Implement support for gzipped stream for RucioConMon
vkuznet committed May 12, 2022
1 parent 8357f0e commit d239878
Showing 3 changed files with 76 additions and 19 deletions.
42 changes: 28 additions & 14 deletions src/python/WMCore/Services/RucioConMon/RucioConMon.py
@@ -11,6 +11,7 @@

from urllib.parse import urlencode

import gzip
import json
import logging

@@ -36,10 +37,14 @@ def __init__(self, url, logger=None, configDict=None):
super(RucioConMon, self).__init__(configDict)
self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint'])

def _getResult(self, uri, callname="", clearCache=False, args=None):
def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False):
"""
Either fetch data from the cache file or query the data-service
:param uri: The endpoint uri
:param callname: alias for caller function
:param clearCache: parameter to control the cache behavior
:param args: additional parameters to HTTP request call
:param binary: specifies a request for a binary object from the HTTP request (e.g. zipped content)
:return: A dictionary
"""

@@ -59,9 +64,17 @@ def _getResult(self, uri, callname="", clearCache=False, args=None):

if clearCache:
self.clearCache(cachedApi, args)
data = self.refreshCache(cachedApi, apiUrl)
results = data.read()
data.close()
if binary:
data = self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True)
results = gzip.decompress(data.read())
data.close()
else:
data = self.refreshCache(cachedApi, apiUrl)
results = data.read()
data.close()

if binary:
return results

results = json.loads(results)
return results
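
For reference, a minimal, self-contained sketch of the decompression step introduced in this hunk; the sample payload below is illustrative only, not actual service output.

import gzip

# Stand-in for the compressed payload returned when binary content is
# requested: a newline-separated list of unmerged LFNs.
compressed = gzip.compress(b"/store/unmerged/fileA.root\n/store/unmerged/fileB.root\n")

# Mirrors the binary branch above: decompress the raw bytes before any
# further decoding or JSON parsing takes place.
results = gzip.decompress(compressed)
print(results.decode("utf-8").split("\n"))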
@@ -71,9 +84,15 @@ def _getResultZipped(self, uri, callname="", clearCache=True, args=None):
This method retrieves a zipped file from the provided uri, instead
of the normal json
:param uri: The endpoint uri
:return: A dictionary
:param callname: alias for caller function
:param clearCache: parameter to control the cache behavior
:param args: additional parameters to HTTP request call
:return: a list of LFNs
"""
raise NotImplementedError
data = self._getResult(uri, callname, clearCache, args, binary=True)
# convert bytes which we received upstream to string
data = data.decode("utf-8")
return [f for f in data.split('\n') if f]

def getRSEStats(self):
"""
@@ -103,11 +122,6 @@ def getRSEUnmerged(self, rseName, zipped=False):
rseUnmerged = self._getResult(uri, callname=rseName)
return rseUnmerged
else:
pass
# TODO: To implement the _getResultZipped() method
# NOTE: An alternative uri - providing the file with .zip extension:
# uri = "WM/files/files.gz?rse=%s&format=raw" % rseName
# The uri from below provides the file zipped but with no extension
# uri = "WM/files?rse=%s&format=raw" % rseName
# rseUnmerged = self._getResultZipped(rseName, callname='unmerged.zipped', clearCache=True)
# return rseUnmerged
uri = "WM/files?rse=%s&format=raw" % rseName
rseUnmerged = self._getResultZipped(uri, callname='unmerged.zipped', clearCache=True)
return rseUnmerged
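
A hedged usage sketch of the new code path; the endpoint URL and RSE name are the ones used in the unit test below and may differ in other deployments.

from WMCore.Services.RucioConMon.RucioConMon import RucioConMon

# Assumes network access to the CMSWEB instance used by the unit test below.
mgr = RucioConMon("https://cmsweb.cern.ch/rucioconmon")

# zipped=True exercises the gzipped stream: the client fetches
# "WM/files?rse=<rse>&format=raw", decompresses it and returns a list of LFNs.
lfns = mgr.getRSEUnmerged("T2_TR_METU", zipped=True)
print("%d unmerged LFNs found" % len(lfns))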
18 changes: 13 additions & 5 deletions src/python/WMCore/Services/Service.py
@@ -190,7 +190,7 @@ def cacheFileName(self, cachefile, verb='GET', inputdata=None):
return cachefile

def refreshCache(self, cachefile, url='', inputdata=None, openfile=True,
encoder=True, decoder=True, verb='GET', contentType=None, incoming_headers=None):
encoder=True, decoder=True, verb='GET', contentType=None, incoming_headers=None, binary=False):
"""
See if the cache has expired. If it has make a new request to the
service for the input data. Return the cachefile as an open file object.
@@ -203,19 +203,21 @@ def refreshCache(self, cachefile, url='', inputdata=None, openfile=True,
cachefile = self.cacheFileName(cachefile, verb, inputdata)

if cache_expired(cachefile, self["cacheduration"]):
self.getData(cachefile, url, inputdata, incoming_headers, encoder, decoder, verb, contentType)
self.getData(cachefile, url, inputdata, incoming_headers, encoder, decoder, verb, contentType, binary=binary)
else:
self['logger'].debug('Data is from the Service cache')

# cachefile may be filename or file object
if openfile and not isfile(cachefile):
if binary:
return open(cachefile, 'rb')
return open(cachefile, 'r')
else:
return cachefile

def forceRefresh(self, cachefile, url='', inputdata=None, openfile=True,
encoder=True, decoder=True, verb='GET',
contentType=None, incoming_headers=None):
contentType=None, incoming_headers=None, binary=False):
"""
Make a new request to the service for the input data, regardless of the
cache state. Return the cachefile as an open file object.
@@ -230,8 +232,10 @@ def forceRefresh(self, cachefile, url='', inputdata=None, openfile=True,
self['logger'].debug("Forcing cache refresh of %s" % cachefile)
incoming_headers.update({'cache-control': 'no-cache'})
self.getData(cachefile, url, inputdata, incoming_headers,
encoder, decoder, verb, contentType, force_refresh=True, )
encoder, decoder, verb, contentType, force_refresh=True, binary=binary)
if openfile and not isfile(cachefile):
if binary:
return open(cachefile, 'rb')
return open(cachefile, 'r')
else:
return cachefile
@@ -256,7 +260,7 @@ def clearCache(self, cachefile, inputdata=None, verb='GET'):

def getData(self, cachefile, url, inputdata=None, incoming_headers=None,
encoder=True, decoder=True,
verb='GET', contentType=None, force_refresh=False):
verb='GET', contentType=None, force_refresh=False, binary=False):
"""
Takes the already generated *full* path to cachefile and the url of the
resource. Don't need to call self.cacheFileName(cachefile, verb, inputdata)
@@ -294,6 +298,10 @@ def getData(self, cachefile, url, inputdata=None, incoming_headers=None,
cachefile.write(data)
cachefile.seek(0, 0) # return to beginning of file
else:
if binary:
with open(cachefile, 'wb') as f:
f.write(data)
return
with open(cachefile, 'w') as f:
if isinstance(data, dict) or isinstance(data, list):
f.write(json.dumps(data))
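
The Service.py changes boil down to choosing between text and bytes when the cache file is written and reopened. A small illustrative sketch of that pattern, assuming a gzipped JSON payload; the file names are placeholders.

import gzip
import json

payload = gzip.compress(b'{"rse": "T2_TR_METU"}')

# binary=True path: keep the payload as raw bytes on disk and let the
# caller decompress it after reopening the cache in 'rb' mode.
with open("cache.bin", "wb") as fobj:
    fobj.write(payload)
with open("cache.bin", "rb") as fobj:
    assert gzip.decompress(fobj.read()) == b'{"rse": "T2_TR_METU"}'

# Default (text) path: JSON-encode dictionaries or lists and read the
# cache back in 'r' mode, as getData() already does for regular calls.
with open("cache.txt", "w") as fobj:
    fobj.write(json.dumps({"rse": "T2_TR_METU"}))
with open("cache.txt", "r") as fobj:
    assert json.load(fobj)["rse"] == "T2_TR_METU"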
35 changes: 35 additions & 0 deletions test/python/WMCore_t/Services_t/Rucio_t/RucioConMon_t.py
@@ -0,0 +1,35 @@
#!/usr/bin/env python
"""
_RucioConMon_t_
Unit tests for RucioConMon WMCore Service class
"""

import unittest

from nose.plugins.attrib import attr

from WMCore.Services.RucioConMon.RucioConMon import RucioConMon


class RucioConMonTest(unittest.TestCase):
"""
Unit tests for RucioConMon Service module
"""

@attr("integration")
def testGetRSEUnmerged(self):
"""
Test getRSEUnmerged method using both zipped and unzipped requests
This test uses a specific RSE name, which can be changed to any other RSE.
"""
url = "https://cmsweb.cern.ch/rucioconmon/WM/files?rse=T2_TR_METU&format=raw"
mgr = RucioConMon("https://cmsweb.cern.ch/rucioconmon")
rseName = "T2_TR_METU"
dataUnzipped = mgr.getRSEUnmerged(rseName, zipped=False)
dataZipped = mgr.getRSEUnmerged(rseName, zipped=True)
self.assertTrue(dataUnzipped == dataZipped)


if __name__ == '__main__':
unittest.main()
