Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Retry Project: Modifiers and MemoryModifier #11928

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/python/WMComponent/ErrorHandler/ErrorHandlerPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

config.ErrorHandler.maxRetries

If the max retries value should be set by exitCode:

config.ErrorHandler.exitCodesRetry = {50660: 10}

However, it can also be used to handle jobs based on properties in the FWJR.
In order to engage any of this behavior you have to set the config option:
config.ErrorHandler.readFWJR = True
Expand Down Expand Up @@ -82,6 +86,8 @@ def __init__(self, config):
self.readFWJR = getattr(self.config.ErrorHandler, 'readFWJR', False)
self.passCodes = getattr(self.config.ErrorHandler, 'passExitCodes', [])

self.exitCodesRetry = getattr(self.config.ErrorHandler, 'exitCodesRetry', {})

self.getJobs = self.daoFactory(classname="Jobs.GetAllJobs")
self.idLoad = self.daoFactory(classname="Jobs.LoadFromIDWithType")
self.loadAction = self.daoFactory(classname="Jobs.LoadForErrorHandler")
Expand Down Expand Up @@ -173,7 +179,16 @@ def processRetries(self, jobList, state):

# Retries < max retry count
for job in jobList:
allowedRetries = self.maxRetries.get(job['type'], self.maxRetries['default'])

report = Report()
reportPath = os.path.join(job['cache_dir'], "Report.%i.pkl" % job['retry_count'])
report.load(reportPath)
exitCode = report.getExitCode()

if exitCode in self.exitCodesRetry:
allowedRetries = self.exitCodesRetry[exitCode]
else:
allowedRetries = self.maxRetries.get(job['type'], self.maxRetries['default'])
# Retries < allowed max retry count
if job['retry_count'] < allowedRetries and state != 'create':
cooloffJobs.append(job)
Expand Down
8 changes: 6 additions & 2 deletions src/python/WMComponent/JobCreator/JobCreatorPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,15 @@ def pollSubscriptions(self):
"""
logging.info("Beginning JobCreator.pollSubscriptions() cycle.")
myThread = threading.currentThread()

logging.info('TEST: jobCreator.pollSubscriptions 1')
# First, get list of Subscriptions
subscriptions = self.subscriptionList.execute()

logging.info('TEST: jobCreator.pollSubscriptions 2')
# Okay, now we have a list of subscriptions
for subscriptionID in subscriptions:
logging.info('TEST: jobCreator.pollSubscriptions 3. subscriptionID is {}'.format(subscriptionID))
wmbsSubscription = Subscription(id=subscriptionID)
logging.info('TEST: jobCreator.pollSubscriptions 4. Subscription is {}'.format(wmbsSubscription))
try:
wmbsSubscription.load()
except IndexError:
Expand All @@ -431,8 +433,10 @@ def pollSubscriptions(self):
continue

workflow = Workflow(id=wmbsSubscription["workflow"].id)
logging.info('TEST: jobCreator.pollSubscriptions 5. workflow is {}'.format(workflow))
workflow.load()
wmbsSubscription['workflow'] = workflow
logging.info('TEST: jobCreator.pollSubscriptions 6. wmbsSubscription is {}'.format(wmbsSubscription))
wmWorkload = retrieveWMSpec(workflow=workflow)

if not workflow.task or not wmWorkload:
Expand Down
184 changes: 184 additions & 0 deletions src/python/WMComponent/RetryManager/Modifier/BaseModifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#!/usr/bin/env python

"""
_BaseModifier_


"""

import pickle, os
import json
import logging
import tarfile
import sys
import shutil
from tempfile import TemporaryDirectory
import datetime
from WMCore.WMRuntime.SandboxCreator import SandboxCreator
import pickle


class BaseModifier(object):
    """
    _BaseModifier_

    Base class for RetryManager job modifiers.  A modifier inspects a failed
    job and alters its workload/sandbox (e.g. raising the memory limit)
    before the job is retried.  Subclasses implement modifyJob().

    A JSON record of the modifications performed (self.dataDict) is kept in
    the RetryManager component directory so state survives component
    restarts.
    """

    def __init__(self, config):
        object.__init__(self)
        # Directory where pre-modification sandbox tarballs are archived
        self.backupPath = "oldSandboxes/"
        self.sandboxPath = None
        self.config = config

        # Keep the modification record next to the RetryManager component logs
        self.logDir = getattr(config.RetryManager, 'componentDir')
        self.dataDictJson = "%s/%s" % (self.logDir, 'dataDict.json')

        if os.path.exists(self.dataDictJson):
            self.dataDict = self.readDataDict(self.dataDictJson)
        else:
            # First run: start with an empty record and persist it
            self.dataDict = {}
            self.writeDataDict(self.dataDictJson, self.dataDict)

    def loadPKL(self, pklFile):
        """
        __loadPKL__

        Load and return the object stored in a pickle file.
        Used for job.pkl.

        :param pklFile: path to the pickle file
        :return: the unpickled object
        """
        # NOTE(review): pickle.load executes arbitrary code on a tampered
        # file; these files are agent-local, so this is accepted here.
        with open(pklFile, 'rb') as pklHandle:
            return pickle.load(pklHandle)

    def savePKL(self, pklFile, data):
        """
        __savePKL__

        Save new job data into a pickle file.
        Used for job.pkl.

        :param pklFile: path of the pickle file to (over)write
        :param data: object to pickle
        """
        with open(pklFile, 'wb') as pklHandle:
            pickle.dump(data, pklHandle)

    def writeDataDict(self, jsonPath, jsonData):
        """
        __writeDataDict__

        Write the updated dataDict into a JSON file in the component
        directory.  The JSON file serves as record keeping of job
        modifications that have taken place by a modifier.

        :param jsonPath: destination JSON file path
        :param jsonData: JSON-serializable record to store
        """
        with open(jsonPath, 'w') as jsonDataDict:
            json.dump(jsonData, jsonDataDict, indent=4)

    def readDataDict(self, jsonPath):
        """
        __readDataDict__

        Retrieve the dataDict from its JSON file.

        :param jsonPath: JSON file path to read
        :return: the decoded record
        """
        with open(jsonPath, 'r') as jsonDataDict:
            return json.load(jsonDataDict)

    def updateSandbox(self, jobPKL):
        """
        __updateSandbox__

        Rebuild the job sandbox tarball so it ships the (modified) workload
        pickle.  The previous sandbox is archived under self.backupPath with
        a timestamped name before being overwritten.

        :param jobPKL: job dictionary with 'sandbox', 'spec' and 'workflow'
        """
        date = datetime.datetime.now().strftime("%y%m%d%H%M%S")
        os.makedirs(self.backupPath, exist_ok=True)
        backupFile = os.path.join(self.backupPath,
                                  "%s_%s.tar.bz2" % (jobPKL['workflow'], date))

        # Keep a copy of the current sandbox before modifying it
        shutil.copyfile(jobPKL['sandbox'], backupFile)

        with TemporaryDirectory() as tempDirName:
            # Unpack the sandbox, drop in the new workload, then re-pack it
            with tarfile.open(jobPKL['sandbox'], "r") as tFile:
                tFile.extractall(tempDirName)

            shutil.copyfile(jobPKL['spec'],
                            os.path.join(tempDirName, 'WMSandbox', 'WMWorkload.pkl'))

            with tarfile.open(jobPKL['sandbox'], "w:bz2") as tar:
                for folder in os.listdir(tempDirName):
                    tar.add(os.path.join(tempDirName, folder), arcname=folder)

        return

    def getTaskPath(self, jobPKL):
        """
        _getTaskPath_

        :param jobPKL: job dictionary
        :return: the task path stored in the job
        """
        return jobPKL['task']

    def getWorkload(self, jobPKL):
        """
        _getWorkload_

        Load the workload object from the job's spec pickle.

        :param jobPKL: job dictionary with a 'spec' path
        :return: the unpickled workload object
        """
        with open(jobPKL['spec'], "rb") as configHandle:
            return pickle.load(configHandle)

    def setWorkload(self, workload, jobPKL):
        """
        _setWorkload_

        Pickle the modified workload back to the job's spec path and refresh
        the sandbox so the change is shipped with the retried job.

        :param workload: modified workload object
        :param jobPKL: job dictionary with a 'spec' path
        """
        # Pkl the modified object
        with open(jobPKL['spec'], 'wb') as pf:
            pickle.dump(workload, pf)

        self.updateSandbox(jobPKL)

        return

    def getModifierParam(self, jobType, param, defaultReturn=None):
        """
        _getModifierParam_

        Get a parameter from the config for the current modifier and the
        given job type, falling back to the modifier's 'default' section.

        :param jobType: job type section name, e.g. 'Processing'
        :param param: parameter name to look up
        :param defaultReturn: value returned when the parameter is missing
                              (defaults to an empty dict)
        :return: the configured value, or defaultReturn
        """
        if defaultReturn is None:
            # Avoid a shared mutable default argument
            defaultReturn = {}
        modName = self.__class__.__name__
        modArgs = getattr(self.config.RetryManager, modName)

        # e.g. config.RetryManager.MemoryModifier.Processing = modifierParams
        if hasattr(modArgs, jobType):
            modifierParams = getattr(modArgs, jobType)
        else:
            # e.g. config.RetryManager.MemoryModifier.default = modifierParams
            modifierParams = modArgs.default

        if hasattr(modifierParams, param):
            return getattr(modifierParams, param)

        # Lazy %-args so formatting only happens when the record is emitted
        logging.error("No %s for %s algorithm and %s job type", param, modName, jobType)
        return defaultReturn

    def modifyJob(self, job):
        """
        Execute the functions to modify the job.  Subclasses must override.

        :param job: job dictionary to modify
        """
        pass
Loading