Ignore condor log error (#394)
* ignore condor log errors

* remove debugging output from condor job
dsschult authored Oct 17, 2024
1 parent 0ec4020 commit 57c96cd
Showing 2 changed files with 74 additions and 75 deletions.
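
The substance of the condor.py change is defensive: the per-log-file event loop is wrapped in a try/except so that one malformed or truncated job event log is logged and skipped instead of aborting the whole wait() loop, and the remaining logs still get processed. A minimal, self-contained sketch of that pattern (simplified names and a stub handler, not the actual plugin code):

import logging

logging.basicConfig()
logger = logging.getLogger('condor')


def process_logs(job_event_logs):
    """Drain events from several condor job event logs.

    An error while reading one log only produces a warning (with a
    traceback, via exc_info=True); the other logs are unaffected.
    """
    processed = 0
    for filename, events in job_event_logs.items():
        try:
            for event in events:
                processed += 1  # the real code updates job state here
        except Exception:
            logger.warning('error processing condor log %s', filename, exc_info=True)
    return processed


def bad_log():
    """Simulate a log whose iterator blows up partway through."""
    yield {'type': 'SUBMIT'}
    raise OSError('truncated event log')


if __name__ == '__main__':
    logs = {
        'job1.log': iter([{'type': 'SUBMIT'}, {'type': 'EXECUTE'}]),
        'job2.log': bad_log(),
    }
    print(process_logs(logs))  # 3 -- job2's failure is a warning, not fatal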
iceprod/core/exe.py: 0 additions & 4 deletions

@@ -455,10 +455,6 @@ async def convert(self, transfer=False):
             print('# set some env vars for expansion', file=f)
             print('OS_ARCH=$(/cvmfs/icecube.opensciencegrid.org/py3-v4.3.0/os_arch.sh)', file=f)
             print('', file=f)
-            print('# debugging', file=f)
-            print('echo "dir contents:"', file=f)
-            print('ls -al', file=f)
-            print('', file=f)
             with scope_env(self.cfgparser, self.task.dataset.config['steering'], logger=self.logger) as globalenv:
                 task = self.task.get_task_config()
                 if self.task.task_files:
iceprod/server/plugins/condor.py: 74 additions & 71 deletions

@@ -528,77 +528,80 @@ async def wait(self, timeout):

         while True:
             for filename, events in self.jels.items():
-                for event in events:
-                    if float(event.timestamp) < self.last_event_timestamp:
-                        continue
-                    self.last_event_timestamp = event.timestamp
-
-                    job_id = CondorJobId(cluster_id=event.cluster, proc_id=event.proc)
-
-                    if event.type == htcondor.JobEventType.SUBMIT:
-                        self.jobs[job_id] = CondorJob()
-                        continue
-                    elif job_id not in self.jobs:
-                        logger.debug('reloaded job %s', job_id)
-                        self.jobs[job_id] = CondorJob()
-
-                    job = self.jobs[job_id]
-
-                    if event.type == htcondor.JobEventType.JOB_AD_INFORMATION:
-                        if not job.dataset_id:
-                            job.dataset_id = event['IceProdDatasetId']
-                            job.task_id = event['IceProdTaskId']
-                            job.instance_id = event['IceProdTaskInstanceId']
-                            job.submit_dir = Path(event['Iwd'])
-
-                        type_ = event['TriggerEventTypeNumber']
-                        if type_ == htcondor.JobEventType.JOB_TERMINATED:
-                            logger.info("job %s %s.%s exited on its own", job_id, job.dataset_id, job.task_id)
-
-                            # get stats
-                            cpu = event.get('CpusUsage', None)
-                            if not cpu:
-                                cpu = parse_usage(event.get('RunRemoteUsage', ''))
-                            gpu = event.get('GpusUsage', None)
-                            memory = event.get('MemoryUsage', None)  # MB
-                            disk = event.get('DiskUsage', None)  # KB
-                            time_ = event.get('LastRemoteWallClockTime', None)  # seconds
-                            # data_in = event['ReceivedBytes']  # KB
-                            # data_out = event['SentBytes']  # KB
-
-                            resources = {}
-                            if cpu is not None:
-                                resources['cpu'] = cpu
-                            if gpu is not None:
-                                resources['gpu'] = gpu
-                            if memory is not None:
-                                resources['memory'] = memory/1000.
-                            if disk is not None:
-                                resources['disk'] = disk/1000000.
-                            if time_ is not None:
-                                resources['time'] = time_/3600.
-
-                            success = event.get('ReturnValue', 1) == 0
-                            job.status = JobStatus.COMPLETED if success else JobStatus.FAILED
-
-                            # finish job
-                            await self.finish(job_id, success=success, resources=resources)
-
-                        elif type_ == htcondor.JobEventType.JOB_ABORTED:
-                            job.status = JobStatus.FAILED
-                            reason = event.get('Reason', None)
-                            logger.info("job %s %s.%s removed: %r", job_id, job.dataset_id, job.task_id, reason)
-                            await self.finish(job_id, success=False, reason=reason)
-
-                        else:
-                            # update status
-                            new_status = JOB_EVENT_STATUS_TRANSITIONS.get(type_, None)
-                            if new_status is not None and job.status != new_status:
-                                job.status = new_status
-                                if new_status == JobStatus.FAILED:
-                                    self.submitter.remove(job_id, reason=event.get('HoldReason', None))
-                                else:
-                                    await self.job_update(job)
+                try:
+                    for event in events:
+                        if float(event.timestamp) < self.last_event_timestamp:
+                            continue
+                        self.last_event_timestamp = event.timestamp
+
+                        job_id = CondorJobId(cluster_id=event.cluster, proc_id=event.proc)
+
+                        if event.type == htcondor.JobEventType.SUBMIT:
+                            self.jobs[job_id] = CondorJob()
+                            continue
+                        elif job_id not in self.jobs:
+                            logger.debug('reloaded job %s', job_id)
+                            self.jobs[job_id] = CondorJob()
+
+                        job = self.jobs[job_id]
+
+                        if event.type == htcondor.JobEventType.JOB_AD_INFORMATION:
+                            if not job.dataset_id:
+                                job.dataset_id = event['IceProdDatasetId']
+                                job.task_id = event['IceProdTaskId']
+                                job.instance_id = event['IceProdTaskInstanceId']
+                                job.submit_dir = Path(event['Iwd'])
+
+                            type_ = event['TriggerEventTypeNumber']
+                            if type_ == htcondor.JobEventType.JOB_TERMINATED:
+                                logger.info("job %s %s.%s exited on its own", job_id, job.dataset_id, job.task_id)
+
+                                # get stats
+                                cpu = event.get('CpusUsage', None)
+                                if not cpu:
+                                    cpu = parse_usage(event.get('RunRemoteUsage', ''))
+                                gpu = event.get('GpusUsage', None)
+                                memory = event.get('MemoryUsage', None)  # MB
+                                disk = event.get('DiskUsage', None)  # KB
+                                time_ = event.get('LastRemoteWallClockTime', None)  # seconds
+                                # data_in = event['ReceivedBytes']  # KB
+                                # data_out = event['SentBytes']  # KB
+
+                                resources = {}
+                                if cpu is not None:
+                                    resources['cpu'] = cpu
+                                if gpu is not None:
+                                    resources['gpu'] = gpu
+                                if memory is not None:
+                                    resources['memory'] = memory/1000.
+                                if disk is not None:
+                                    resources['disk'] = disk/1000000.
+                                if time_ is not None:
+                                    resources['time'] = time_/3600.
+
+                                success = event.get('ReturnValue', 1) == 0
+                                job.status = JobStatus.COMPLETED if success else JobStatus.FAILED
+
+                                # finish job
+                                await self.finish(job_id, success=success, resources=resources)
+
+                            elif type_ == htcondor.JobEventType.JOB_ABORTED:
+                                job.status = JobStatus.FAILED
+                                reason = event.get('Reason', None)
+                                logger.info("job %s %s.%s removed: %r", job_id, job.dataset_id, job.task_id, reason)
+                                await self.finish(job_id, success=False, reason=reason)
+
+                            else:
+                                # update status
+                                new_status = JOB_EVENT_STATUS_TRANSITIONS.get(type_, None)
+                                if new_status is not None and job.status != new_status:
+                                    job.status = new_status
+                                    if new_status == JobStatus.FAILED:
+                                        self.submitter.remove(job_id, reason=event.get('HoldReason', None))
+                                    else:
+                                        await self.job_update(job)
+                except Exception:
+                    logger.warning('error processing condor log', exc_info=True)

             if time.monotonic() - start >= timeout:
                 break
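
One detail worth noting in the JOB_TERMINATED branch above: condor reports usage in mixed units (MemoryUsage in MB, DiskUsage in KB, LastRemoteWallClockTime in seconds, per the inline comments), and the plugin normalizes everything to GB and hours before passing it to finish(). A standalone sketch of just that conversion, with hypothetical sample values:

def normalize_resources(cpu=None, gpu=None, memory=None, disk=None, time_=None):
    """Convert condor usage numbers to the units reported upstream.

    Mirrors the conversions in the diff above: memory MB -> GB,
    disk KB -> GB, time seconds -> hours. Keys are only included
    when condor actually reported a value.
    """
    resources = {}
    if cpu is not None:
        resources['cpu'] = cpu
    if gpu is not None:
        resources['gpu'] = gpu
    if memory is not None:
        resources['memory'] = memory / 1000.      # MB -> GB
    if disk is not None:
        resources['disk'] = disk / 1000000.       # KB -> GB
    if time_ is not None:
        resources['time'] = time_ / 3600.         # seconds -> hours
    return resources


print(normalize_resources(cpu=0.9, memory=4000, disk=2500000, time_=7200))
# {'cpu': 0.9, 'memory': 4.0, 'disk': 2.5, 'time': 2.0}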
