diff --git a/mupifDB/models.py b/mupifDB/models.py index 64cc6e9..6a5da28 100644 --- a/mupifDB/models.py +++ b/mupifDB/models.py @@ -147,7 +147,7 @@ class Link_Model(StrictBase): # links to fs.files FileID: Optional[str]=None Compulsory: Optional[bool]=False # deema: allow None - Object: dict[str,Any] + Object: dict[str,Any]={} def TEMP_getChildren(self) -> List[Tuple[str,DbRef_Model]]: return [('FileID',DbRef_Model(where=where,id=id)) for where,id in [('fs.files',self.FileID)] if id is not None and id!=''] diff --git a/mupifDB/mupifdbRestApi.py b/mupifDB/mupifdbRestApi.py index 1cb19ca..19e9840 100644 --- a/mupifDB/mupifdbRestApi.py +++ b/mupifDB/mupifdbRestApi.py @@ -574,7 +574,7 @@ def get_status(): schedulerStatus = 'Failed' # get some scheduler stats - stat = mupifDB.schedulerstat.getGlobalStat() + stat = mupifDB.schedulerstat.getGlobalStat().model_dump(mode='json') schedulerstat = mongo.db.Stat.find_one()['scheduler'] output = {'mupifDBStatus': mupifDBStatus, 'schedulerStatus': schedulerStatus, 'totalStat': stat, 'schedulerStat': schedulerstat} return jsonify({'result': output}) diff --git a/mupifDB/workflowmanager.py b/mupifDB/workflowmanager.py index 0c9c231..f30dcd0 100644 --- a/mupifDB/workflowmanager.py +++ b/mupifDB/workflowmanager.py @@ -127,7 +127,7 @@ def create(workflowID: str, type: Literal['Inputs','Outputs'], workflowVer=-1, n data.append( models.IODataRecordItem_Model.model_validate( # upcast to common base class (InputOutputBase_Model), filtering only inherited keys - dict([(k,v) for k,v in io.model_dump() if k in models.InputOutputBase_Model.model_fields.keys()]) + dict([(k,v) for k,v in io.model_dump().items() if k in models.InputOutputBase_Model.model_fields.keys()]) | dict(Compulsory=io.Compulsory if type=='Inputs' else False) ) @@ -307,7 +307,8 @@ def checkInput(eid, name, obj_id, object_type, data_id, linked_output=False, ont else: # check value from database record if object_type == 'mupif.Property': - file_id = 
inp_record.Object['FileID'] + # FIXME: mismatch between mupif and mupifDBs models here? + file_id = inp_record.Object.get('FileID',None) if file_id is None: # property from dict try: diff --git a/mupifDB/workflowscheduler.py b/mupifDB/workflowscheduler.py index 82adb96..d41589b 100644 --- a/mupifDB/workflowscheduler.py +++ b/mupifDB/workflowscheduler.py @@ -65,7 +65,7 @@ import pydantic -from typing import Literal,List +from typing import Literal,List,Optional import threading import multiprocessing @@ -83,7 +83,8 @@ class JobInfo(mp.BareData): we_id: str wid: str status: Literal['Running','Finished','Failed'] - time: datetime.datetime + started: datetime.datetime + finished: Optional[datetime.datetime] lastJobs: List[JobInfo]=[] class Hist(mp.BareData): interval: int=60*60 @@ -113,15 +114,18 @@ def advance(self): def advanceTime(self): self.hist48h.advance() - def updateLastJobs(self, job: JobInfo, max=5) -> None: + def lastJobNew(self, job: JobInfo, max=5) -> None: + self.lastJobs.append(job) + self.lastJobs=self.lastJobs[-max:] + def lastJobDone(self, we_id, status: Literal['Finished','Failed'], finished: datetime.datetime): match=[j for j in self.lastJobs if j.we_id==we_id] - if len(match)==0: - self.lastJobs.append(job) - self.lastJobs=self.lastJobs[-max:] - elif len(match)==1: - match[0].status=job.status - match[0].time=job.time - else: log.error('Multiple lastJobs with {we_id=}??') + if not match: return # job already gone + if len(match)>1: + log.error(f'Multiple lastJobs with {we_id=}??') + return + match[0].finished=finished + match[0].status=status + def updateLoad(self) -> None: self.load=int(100*self.tasks.running*1./poolsize) self.hist48h.load[-1]=self.load @@ -149,8 +153,13 @@ def inner(*args, **kwargs): if not Pyro5.callcontext.current_context.client: raise PermissionError('Must only be called via Pyro') if no_remote: import ipaddress + import psutil + import socket addr=ipaddress.ip_address(Pyro5.callcontext.current_context.client_sock_addr[0]) 
# type: ignore - if not addr.is_loopback: raise PermissionError(f'Must only be called locally (not from {addr})') + if addr.is_loopback: pass + else: + ips=set([ipaddress.ip_address(rec.address) for rec in sum(psutil.net_if_addrs().values(),[]) if rec.family in (socket.AF_INET,socket.AF_INET6)]) + if addr not in ips: raise PermissionError(f'Must only be called locally (not from {addr})') return fn(*args,**kwargs) return inner return deco @@ -189,7 +198,7 @@ def getStatistics(self,raw=False): # the default raw=False will return data translated to the old format self.advanceTime() s=self.stat - if raw: return s + if raw: return s.model_dump(mode='json') return dict( runningTasks = s.tasks.running, scheduledTasks = s.tasks.scheduled, @@ -224,7 +233,7 @@ def updateRunning(self,we_id,wid): self.advanceTime() self.stat.tasks.scheduled-=1 self.stat.tasks.running+=1 - self.stat.updateLastJobs(SchedulerStat.JobInfo(we_id=we_id,wid=wid,status='Running',time=datetime.datetime.now())) + self.stat.lastJobNew(SchedulerStat.JobInfo(we_id=we_id,wid=wid,status='Running',started=datetime.datetime.now(),finished=None)) self.stat.sync() @pyro_only(no_remote=True) @@ -246,8 +255,7 @@ def updateFinished(self,retCode,we_id): else: self.stat.tasks.failed+=1 self.stat.hist48h.failed[-1]+=1 - # wid is not passed in here, use "?" 
in case the job is not found anymore - self.stat.updateLastJobs(SchedulerStat.JobInfo(we_id=we_id,wid='?',status=('Finished' if retCode==0 else 'Failed'),time=datetime.datetime.now())) + self.stat.lastJobDone(we_id=we_id,status=('Finished' if retCode==0 else 'Failed'),finished=datetime.datetime.now()) self.stat.sync() @pyro_only(no_remote=True) @@ -555,8 +563,7 @@ def scheduler_schedule_pending(pool): if api_type != 'granta': try: stat=SchedulerStat.model_validate(monitor.getStatistics(raw=True)) - log.info(str(lt.tm_mday)+"."+str(lt.tm_mon)+"."+str(lt.tm_year)+" "+str(lt.tm_hour)+":"+str(lt.tm_min)+":"+str(lt.tm_sec)+" Scheduled/Running/Load:" + - str(stat.tasks.scheduled)+"/"+str(stat.tasks.running)+"/"+str(stat.load)) + log.warning(f'Scheduled/Running/Load: {stat.tasks.scheduled}/{stat.tasks.running}/{stat.load}') except Exception as e: log.exception('')