Skip to content

Commit

Permalink
various runtime error fixes (mostly related to models)
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed Dec 11, 2024
1 parent 605db10 commit 6167203
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion mupifDB/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class Link_Model(StrictBase):
# links to fs.files
FileID: Optional[str]=None
Compulsory: Optional[bool]=False # deema: allow None
Object: dict[str,Any]
Object: dict[str,Any]={}
def TEMP_getChildren(self) -> List[Tuple[str,DbRef_Model]]:
return [('FileID',DbRef_Model(where=where,id=id)) for where,id in [('fs.files',self.FileID)] if id is not None and id!='']

Expand Down
2 changes: 1 addition & 1 deletion mupifDB/mupifdbRestApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ def get_status():
schedulerStatus = 'Failed'

# get some scheduler stats
stat = mupifDB.schedulerstat.getGlobalStat()
stat = mupifDB.schedulerstat.getGlobalStat().model_dump(mode='json')
schedulerstat = mongo.db.Stat.find_one()['scheduler']
output = {'mupifDBStatus': mupifDBStatus, 'schedulerStatus': schedulerStatus, 'totalStat': stat, 'schedulerStat': schedulerstat}
return jsonify({'result': output})
Expand Down
5 changes: 3 additions & 2 deletions mupifDB/workflowmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def create(workflowID: str, type: Literal['Inputs','Outputs'], workflowVer=-1, n
data.append(
models.IODataRecordItem_Model.model_validate(
# upcast to common base class (InputOutputBase_Model), filtering only inherited keys
dict([(k,v) for k,v in io.model_dump() if k in models.InputOutputBase_Model.model_fields.keys()])
dict([(k,v) for k,v in io.model_dump().items() if k in models.InputOutputBase_Model.model_fields.keys()])
|
dict(Compulsory=io.Compulsory if type=='Inputs' else False)
)
Expand Down Expand Up @@ -307,7 +307,8 @@ def checkInput(eid, name, obj_id, object_type, data_id, linked_output=False, ont
else:
# check value from database record
if object_type == 'mupif.Property':
file_id = inp_record.Object['FileID']
# FIXME: mismatch between mupif and mupifDBs models here?
file_id = inp_record.Object.get('FileID',None)
if file_id is None:
# property from dict
try:
Expand Down
41 changes: 24 additions & 17 deletions mupifDB/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@


import pydantic
from typing import Literal,List
from typing import Literal,List,Optional
import threading
import multiprocessing

Expand All @@ -83,7 +83,8 @@ class JobInfo(mp.BareData):
we_id: str
wid: str
status: Literal['Running','Finished','Failed']
time: datetime.datetime
started: datetime.datetime
finished: Optional[datetime.datetime]
lastJobs: List[JobInfo]=[]
class Hist(mp.BareData):
interval: int=60*60
Expand Down Expand Up @@ -113,15 +114,18 @@ def advance(self):

def advanceTime(self):
    """Advance the ``hist48h`` history by calling its ``advance()`` method."""
    self.hist48h.advance()

def lastJobNew(self, job: 'JobInfo', max=5) -> None:
    """Record a newly started job in ``lastJobs``.

    The job is appended and the list is then trimmed so that only the
    *max* most recently added entries remain.

    :param job: job record to append.
    :param max: maximum number of entries kept in ``lastJobs``.
    """
    self.lastJobs.append(job)
    # A bare ``[-max:]`` slice keeps the whole list when max == 0 (because -0 == 0),
    # so a non-positive limit is handled explicitly.
    self.lastJobs = self.lastJobs[-max:] if max > 0 else []

def lastJobDone(self, we_id, status: Literal['Finished','Failed'], finished: datetime.datetime):
    """Mark the ``lastJobs`` entry with the given ``we_id`` as completed.

    Sets ``finished`` and ``status`` on the matching entry. Nothing happens
    when no entry matches (it may already have been trimmed by
    ``lastJobNew``). When several entries match, an error is logged and
    none of them is modified.

    :param we_id: execution id used to look up the entry (compared with ``JobInfo.we_id``).
    :param status: final status to store.
    :param finished: completion timestamp to store.
    """
    # Compare against the ``we_id`` argument; there is no ``job`` object in this scope.
    match = [j for j in self.lastJobs if j.we_id == we_id]
    if not match:
        return  # job already gone
    if len(match) > 1:
        # f-string so that the offending id actually appears in the message
        log.error(f'Multiple lastJobs with {we_id=}??')
        return
    match[0].finished = finished
    match[0].status = status

def updateLoad(self) -> None:
self.load=int(100*self.tasks.running*1./poolsize)
self.hist48h.load[-1]=self.load
Expand Down Expand Up @@ -149,8 +153,13 @@ def inner(*args, **kwargs):
if not Pyro5.callcontext.current_context.client: raise PermissionError('Must only be called via Pyro')
if no_remote:
import ipaddress
import psutil
import socket
addr=ipaddress.ip_address(Pyro5.callcontext.current_context.client_sock_addr[0]) # type: ignore
if not addr.is_loopback: raise PermissionError(f'Must only be called locally (not from {addr})')
if addr.is_loopback: pass
else:
ips=set([ipaddress.ip_address(rec.address) for rec in sum(psutil.net_if_addrs().values(),[]) if rec.family in (socket.AF_INET,socket,socket.AF_INET6)])
if addr not in ips: raise PermissionError(f'Must only be called locally (not from {addr})')
return fn(*args,**kwargs)
return inner
return deco
Expand Down Expand Up @@ -189,7 +198,7 @@ def getStatistics(self,raw=False):
# the default raw=False will return data translated to the old format
self.advanceTime()
s=self.stat
if raw: return s
if raw: return s.model_dump(mode='json')
return dict(
runningTasks = s.tasks.running,
scheduledTasks = s.tasks.scheduled,
Expand Down Expand Up @@ -224,7 +233,7 @@ def updateRunning(self,we_id,wid):
self.advanceTime()
self.stat.tasks.scheduled-=1
self.stat.tasks.running+=1
self.stat.updateLastJobs(SchedulerStat.JobInfo(we_id=we_id,wid=wid,status='Running',time=datetime.datetime.now()))
self.stat.lastJobNew(SchedulerStat.JobInfo(we_id=we_id,wid=wid,status='Running',started=datetime.datetime.now(),finished=None))
self.stat.sync()

@pyro_only(no_remote=True)
Expand All @@ -246,8 +255,7 @@ def updateFinished(self,retCode,we_id):
else:
self.stat.tasks.failed+=1
self.stat.hist48h.failed[-1]+=1
# wid is not passed in here, use "?" in case the job is not found anymore
self.stat.updateLastJobs(SchedulerStat.JobInfo(we_id=we_id,wid='?',status=('Finished' if retCode==0 else 'Failed'),time=datetime.datetime.now()))
self.stat.lastJobDone(we_id=we_id,status=('Finished' if retCode==0 else 'Failed'),finished=datetime.datetime.now())
self.stat.sync()

@pyro_only(no_remote=True)
Expand Down Expand Up @@ -555,8 +563,7 @@ def scheduler_schedule_pending(pool):
if api_type != 'granta':
try:
stat=SchedulerStat.model_validate(monitor.getStatistics(raw=True))
log.info(str(lt.tm_mday)+"."+str(lt.tm_mon)+"."+str(lt.tm_year)+" "+str(lt.tm_hour)+":"+str(lt.tm_min)+":"+str(lt.tm_sec)+" Scheduled/Running/Load:" +
str(stat.tasks.scheduled)+"/"+str(stat.tasks.running)+"/"+str(stat.load))
log.warning(f'Scheduled/Running/Load: {stat.tasks.scheduled}/{stat.tasks.running}/{stat.load}')
except Exception as e:
log.exception('')

Expand Down

0 comments on commit 6167203

Please sign in to comment.