From 57c96cd1878dd4a5e8e21e932a0dd29678fb6b4e Mon Sep 17 00:00:00 2001 From: David Schultz Date: Thu, 17 Oct 2024 15:57:18 -0500 Subject: [PATCH] Ignore condor log error (#394) * ignore condor log errors * remove debugging output from condor job --- iceprod/core/exe.py | 4 - iceprod/server/plugins/condor.py | 145 ++++++++++++++++--------------- 2 files changed, 74 insertions(+), 75 deletions(-) diff --git a/iceprod/core/exe.py b/iceprod/core/exe.py index f77d6e3b..24d0f12b 100644 --- a/iceprod/core/exe.py +++ b/iceprod/core/exe.py @@ -455,10 +455,6 @@ async def convert(self, transfer=False): print('# set some env vars for expansion', file=f) print('OS_ARCH=$(/cvmfs/icecube.opensciencegrid.org/py3-v4.3.0/os_arch.sh)', file=f) print('', file=f) - print('# debugging', file=f) - print('echo "dir contents:"', file=f) - print('ls -al', file=f) - print('', file=f) with scope_env(self.cfgparser, self.task.dataset.config['steering'], logger=self.logger) as globalenv: task = self.task.get_task_config() if self.task.task_files: diff --git a/iceprod/server/plugins/condor.py b/iceprod/server/plugins/condor.py index 3f746d1b..4c3f0e4e 100644 --- a/iceprod/server/plugins/condor.py +++ b/iceprod/server/plugins/condor.py @@ -528,77 +528,80 @@ async def wait(self, timeout): while True: for filename, events in self.jels.items(): - for event in events: - if float(event.timestamp) < self.last_event_timestamp: - continue - self.last_event_timestamp = event.timestamp - - job_id = CondorJobId(cluster_id=event.cluster, proc_id=event.proc) - - if event.type == htcondor.JobEventType.SUBMIT: - self.jobs[job_id] = CondorJob() - continue - elif job_id not in self.jobs: - logger.debug('reloaded job %s', job_id) - self.jobs[job_id] = CondorJob() - - job = self.jobs[job_id] - - if event.type == htcondor.JobEventType.JOB_AD_INFORMATION: - if not job.dataset_id: - job.dataset_id = event['IceProdDatasetId'] - job.task_id = event['IceProdTaskId'] - job.instance_id = event['IceProdTaskInstanceId'] - job.submit_dir = Path(event['Iwd']) - - type_ = event['TriggerEventTypeNumber'] - if type_ == htcondor.JobEventType.JOB_TERMINATED: - logger.info("job %s %s.%s exited on its own", job_id, job.dataset_id, job.task_id) - - # get stats - cpu = event.get('CpusUsage', None) - if not cpu: - cpu = parse_usage(event.get('RunRemoteUsage', '')) - gpu = event.get('GpusUsage', None) - memory = event.get('MemoryUsage', None) # MB - disk = event.get('DiskUsage', None) # KB - time_ = event.get('LastRemoteWallClockTime', None) # seconds - # data_in = event['ReceivedBytes'] # KB - # data_out = event['SentBytes'] # KB - - resources = {} - if cpu is not None: - resources['cpu'] = cpu - if gpu is not None: - resources['gpu'] = gpu - if memory is not None: - resources['memory'] = memory/1000. - if disk is not None: - resources['disk'] = disk/1000000. - if time_ is not None: - resources['time'] = time_/3600. - - success = event.get('ReturnValue', 1) == 0 - job.status = JobStatus.COMPLETED if success else JobStatus.FAILED - - # finish job - await self.finish(job_id, success=success, resources=resources) - - elif type_ == htcondor.JobEventType.JOB_ABORTED: - job.status = JobStatus.FAILED - reason = event.get('Reason', None) - logger.info("job %s %s.%s removed: %r", job_id, job.dataset_id, job.task_id, reason) - await self.finish(job_id, success=False, reason=reason) - - else: - # update status - new_status = JOB_EVENT_STATUS_TRANSITIONS.get(type_, None) - if new_status is not None and job.status != new_status: - job.status = new_status - if new_status == JobStatus.FAILED: - self.submitter.remove(job_id, reason=event.get('HoldReason', None)) - else: - await self.job_update(job) + try: + for event in events: + if float(event.timestamp) < self.last_event_timestamp: + continue + self.last_event_timestamp = event.timestamp + + job_id = CondorJobId(cluster_id=event.cluster, proc_id=event.proc) + + if event.type == htcondor.JobEventType.SUBMIT: + self.jobs[job_id] = CondorJob() + continue + elif job_id not in self.jobs: + logger.debug('reloaded job %s', job_id) + self.jobs[job_id] = CondorJob() + + job = self.jobs[job_id] + + if event.type == htcondor.JobEventType.JOB_AD_INFORMATION: + if not job.dataset_id: + job.dataset_id = event['IceProdDatasetId'] + job.task_id = event['IceProdTaskId'] + job.instance_id = event['IceProdTaskInstanceId'] + job.submit_dir = Path(event['Iwd']) + + type_ = event['TriggerEventTypeNumber'] + if type_ == htcondor.JobEventType.JOB_TERMINATED: + logger.info("job %s %s.%s exited on its own", job_id, job.dataset_id, job.task_id) + + # get stats + cpu = event.get('CpusUsage', None) + if not cpu: + cpu = parse_usage(event.get('RunRemoteUsage', '')) + gpu = event.get('GpusUsage', None) + memory = event.get('MemoryUsage', None) # MB + disk = event.get('DiskUsage', None) # KB + time_ = event.get('LastRemoteWallClockTime', None) # seconds + # data_in = event['ReceivedBytes'] # KB + # data_out = event['SentBytes'] # KB + + resources = {} + if cpu is not None: + resources['cpu'] = cpu + if gpu is not None: + resources['gpu'] = gpu + if memory is not None: + resources['memory'] = memory/1000. + if disk is not None: + resources['disk'] = disk/1000000. + if time_ is not None: + resources['time'] = time_/3600. + + success = event.get('ReturnValue', 1) == 0 + job.status = JobStatus.COMPLETED if success else JobStatus.FAILED + + # finish job + await self.finish(job_id, success=success, resources=resources) + + elif type_ == htcondor.JobEventType.JOB_ABORTED: + job.status = JobStatus.FAILED + reason = event.get('Reason', None) + logger.info("job %s %s.%s removed: %r", job_id, job.dataset_id, job.task_id, reason) + await self.finish(job_id, success=False, reason=reason) + + else: + # update status + new_status = JOB_EVENT_STATUS_TRANSITIONS.get(type_, None) + if new_status is not None and job.status != new_status: + job.status = new_status + if new_status == JobStatus.FAILED: + self.submitter.remove(job_id, reason=event.get('HoldReason', None)) + else: + await self.job_update(job) + except Exception: + logger.warning('error processing condor log', exc_info=True) if time.monotonic() - start >= timeout: break