Merge pull request #11913 from vkuznet/fix-issue-11899
New DBSConcurrency module for concurrent execution of HTTP queries to DBS via pycurl manager
amaltaro authored Feb 27, 2024
2 parents edf92c0 + 6a6d5ae commit 9a703d6
Showing 3 changed files with 112 additions and 32 deletions.
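
For orientation, below is a minimal caller sketch for the new helper introduced by this commit. The getBlockInfo4PU signature and the returned {'FileList': ..., 'NumberOfEvents': ...} structure come from the diff below; the block name and the reading of X509 credentials from environment variables are illustrative assumptions (the WorkflowUpdater code passes credentials via its ckey()/cert() helpers instead, as shown in the diff).

#!/usr/bin/env python
# Hypothetical usage sketch for getBlockInfo4PU; not part of this commit.
import os

from WMCore.Services.DBS.DBSConcurrency import getBlockInfo4PU

# assumption: X509 credentials are taken from the environment; the WorkflowUpdater
# component obtains them via its ckey()/cert() helpers instead
ckey = os.getenv('X509_USER_KEY')
cert = os.getenv('X509_USER_CERT')

# made-up pileup block names to resolve against DBS
blocks = ['/Primary/Processed-v1/AODSIM#0123456789abcdef']

# one concurrent HTTP call per block via pycurl_manager, results aggregated per block
blockInfo = getBlockInfo4PU(blocks, ckey, cert)
for blockName, info in blockInfo.items():
    print(blockName, len(info['FileList']), info['NumberOfEvents'])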
35 changes: 3 additions & 32 deletions src/python/WMComponent/WorkflowUpdater/WorkflowUpdaterPoller.py
@@ -29,6 +29,7 @@
from WMCore.WMSpec.WMWorkload import WMWorkloadHelper
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread
from WMCore.DAOFactory import DAOFactory
from WMCore.Services.DBS.DBSConcurrency import getBlockInfo4PU


def findJsonSandboxFiles(tfile):
@@ -142,6 +143,7 @@ def updateBlockInfo(jdoc, msPUBlockLoc, logger):
:param jdoc: JSON sandbox dictionary
:param msPUBlockLoc: dict of blocks with RSEs from the MSPileup service, i.e. {'block': [rses], ... }
:param logger: logger object
:return: newly constructed dict
"""
returnDict = {}
@@ -176,7 +178,7 @@ def updateBlockInfo(jdoc, msPUBlockLoc, logger):
if len(blocksToUpdate) > 0:
logger.info("Adding %s blocks from MSPileup which are not present in pileupconf.json",
len(blocksToUpdate))
binfo = getBlockInfo(blocksToUpdate)
binfo = getBlockInfo4PU(blocksToUpdate, ckey(), cert())
for puType in returnDict.keys():
for blockName in blocksToUpdate:
# update block record in-place
@@ -185,37 +187,6 @@ def updateBlockInfo(jdoc, msPUBlockLoc, logger):
return returnDict


def getBlockInfo(blockNames):
"""
Fetch block information details, file list and number of events, from DBS
server.
:param blockNames: list of block names
:return dict: dictionary of {block: {"FileList": file_list, "NumberOfEvents": number_events}, ...}
"""
# TODO: the logic of this function should be implemented concurrently:
# - for every given block get information from DBS: list of files and number of events in a block
# - please use pycurl_manager.py module and the following logic:
# urls = []
# for blk in blockNames:
# # need to encode block name properly
# block = urllib.parse.quote_plus(blk)
# url = f"https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader/datasetsummary?block_name={block}"
# urls.append(url)
# # place concurrent calls to DBS
# results = getdata(urls, ckey, cert, cookie=cookie)
# # parse output of getdata in some form
# for row in results:
# blockInfo[blk] = ...

# so far we put placeholder codebase which does nothing
fileList = []
numEvents = 0
blockInfo = {}
for blk in blockNames:
blockInfo[blk] = {"FileList": fileList, "NumberOfEvents": numEvents}
return blockInfo


def writePileupJson(tfile, jdict, logger, dest=None):
"""
Write pileup JSON sandbox files back to file system
41 changes: 41 additions & 0 deletions src/python/WMCore/Services/DBS/DBSConcurrency.py
@@ -0,0 +1,41 @@
#!/usr/bin/env python
"""
File : DBSConcurrency.py
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: dedicated module holding DBS-related functions that execute
             concurrent calls to DBS APIs.
"""

import json
import urllib.parse
from WMCore.Services.pycurl_manager import getdata as multi_getdata


def getBlockInfo4PU(blockNames, ckey, cert):
"""
Fetch block information details, file list and number of events, from DBS
server. Here we use concrete set of parameters for DBS to use in this case, i.e.
we must look-up only valid files and get full details from the DBS API (in order
to get number of events).
:param blockNames: list of block names
:return: dictionary of {block: {"FileList": list of strings, "NumberOfEvents": integer}, ...}
"""
urls = []
for blk in blockNames:
# need to encode block name properly
block = urllib.parse.quote_plus(blk)
url = f"https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader/files?detail=true&validFileOnly=1&block_name={block}"
urls.append(url)
# place concurrent calls to DBS, please note that multi_getdata is generator, therefore
# it does not put DBS results into the memory until this generator is iterated
results = multi_getdata(urls, ckey, cert)
# parse output of getdata in some form
blockInfo = {}
for row in results:
blk = row['url'].split('block_name=')[-1]
block = urllib.parse.unquote_plus(blk)
data = json.loads(row['data'])
files = [r['logical_file_name'] for r in data]
nevents = sum([r['event_count'] for r in data])
blockInfo[block] = {'FileList': files, 'NumberOfEvents': nevents}
return blockInfo
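
As a sanity check of the parsing step above, here is a small self-contained sketch that runs the same aggregation against a mocked multi_getdata row, with no DBS call involved; the payload keys ('logical_file_name', 'event_count') and the row's 'url'/'data' fields follow the code above, while the block name and numbers are made up.

# Offline sketch of the aggregation done in getBlockInfo4PU, using mocked input.
import json
import urllib.parse

fakeBlock = '/Primary/Processed-v1/AODSIM#0123456789abcdef'  # made-up block name
fakeFiles = [
    {'logical_file_name': '/store/mc/fake/file1.root', 'event_count': 1000},
    {'logical_file_name': '/store/mc/fake/file2.root', 'event_count': 500},
]
# a row shaped like the ones yielded by multi_getdata: the URL carries the encoded
# block name and the 'data' field carries the JSON body of the DBS files API
url = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader/files?detail=true&validFileOnly=1&block_name="
row = {'url': url + urllib.parse.quote_plus(fakeBlock), 'data': json.dumps(fakeFiles)}

block = urllib.parse.unquote_plus(row['url'].split('block_name=')[-1])
data = json.loads(row['data'])
blockInfo = {block: {'FileList': [r['logical_file_name'] for r in data],
                     'NumberOfEvents': sum(r['event_count'] for r in data)}}
print(blockInfo)  # two files, 1500 events for the made-up block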
68 changes: 68 additions & 0 deletions test/python/WMCore_t/Services_t/DBS_t/DBSConcurrency_t.py
@@ -0,0 +1,68 @@
#!/usr/bin/env python
"""
_DBSConcurrency_t_
Unit test for the DBSConcurrency module
"""

import os
import json
import time
import logging
import unittest

# WMCore modules
from WMCore.Services.DBS.DBSConcurrency import getBlockInfo4PU
from WMCore.Services.pycurl_manager import getdata as multi_getdata
from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase


class DBSConcurrencyTest(EmulatedUnitTestCase):
"""
DBSConcurrencyTest class defines unit tests for DBS concurrent codebase
"""

def setUp(self):
"""
Initialization function
"""

self.dbs = 'https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader'
self.ckey = os.getenv('X509_USER_KEY')
self.cert = os.getenv('X509_USER_CERT')
logging.basicConfig()
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)

    def testGetBlockInfoList(self):
        """
        Unit test for the getBlockInfo4PU function
        """
        time0 = time.time()
        dataset = '/ZMM_13TeV_TuneCP5-pythia8/RunIIAutumn18DR-SNB_102X_upgrade2018_realistic_v17-v2/AODSIM'
        url = f"{self.dbs}/blocks?dataset={dataset}"
        self.logger.info(url)
        results = multi_getdata([url], self.ckey, self.cert)
        blocks = []
        for row in results:
            data = json.loads(row['data'])
            blocks = [r['block_name'] for r in data]
        elapsedTime = time.time() - time0
        self.logger.debug("for %s got %d blocks in %s seconds", dataset, len(blocks), elapsedTime)
        self.assertTrue(len(blocks) > 0)
        # the call to DBS should be resolved within a few seconds
        self.assertTrue(elapsedTime < 3)

        time0 = time.time()
        blockInfoList = getBlockInfo4PU(blocks, self.ckey, self.cert)
        for blk, row in blockInfoList.items():
            self.logger.debug("block %s, nfiles=%d, nevents=%d", blk, len(row['FileList']), row['NumberOfEvents'])
        elapsedTime = time.time() - time0
        self.logger.debug("Elapsed time: %s seconds", elapsedTime)
        # NOTE: the calls are placed concurrently, so even if every DBS call took ~1 second,
        # 10 calls should still finish in about 1 second; hence we test a fairly low threshold
        self.assertTrue(elapsedTime < 3)


if __name__ == '__main__':
    unittest.main()
