Skip to content

Commit

Permalink
(cleanups)
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed Nov 22, 2024
1 parent a7410d0 commit b2996aa
Showing 1 changed file with 27 additions and 48 deletions.
75 changes: 27 additions & 48 deletions mupifDB/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,6 @@ async def validation_exception_handler(request: fastapi.Request, exc: fastapi.ex



def fix_id(record):
if record:
if '_id' in record:
record['_id'] = str(record['_id'])
return record


# --------------------------------------------------
# Default
# --------------------------------------------------
Expand Down Expand Up @@ -185,16 +178,12 @@ def get_workflows() -> List[models.Workflow_Model]:
res = db.Workflows.find()
return [models.Workflow_Model.model_validate(r) for r in res]


@app.get("/workflows/{workflow_id}", tags=["Workflows"])
def get_workflow(workflow_id: str) -> models.Workflow_Model:
res = db.Workflows.find_one({"wid": workflow_id})
if res is None: raise NotFoundError(f'Database reports no workflow with wid={workflow_id}.')
return models.Workflow_Model.model_validate(res)




@app.patch("/workflows/", tags=["Workflows"])
def update_workflow(wf: models.Workflow_Model) -> models.Workflow_Model:
res = db.Workflows.find_one_and_update({'wid': wf.wid}, {'$set': wf.model_dump_db()}, return_document=ReturnDocument.AFTER)
Expand All @@ -209,10 +198,7 @@ def insert_workflow(wf: models.Workflow_Model) -> str:

@app.post("/workflows_history/", tags=["Workflows"])
def insert_workflow_history(wf: models.Workflow_Model) -> str:
#print('TTT')
#print_json(data=wf.model_dump())
res = db.WorkflowsHistory.insert_one(wf.model_dump_db())
#print(f'{res=}')
return str(res.inserted_id)


Expand All @@ -227,8 +213,6 @@ def get_workflow_history(workflow_id: str, workflow_version: int) -> models.Work
if res is None: raise NotFoundError(f'Database reports no workflow with wid={workflow_id} and Version={workflow_version}.')
return models.Workflow_Model.model_validate(res)



# --------------------------------------------------
# Executions
# --------------------------------------------------
Expand Down Expand Up @@ -261,13 +245,14 @@ def get_execution(uid: str) -> models.WorkflowExecution_Model:
return models.WorkflowExecution_Model.model_validate(res)



# FIXME: how is this different from get_execution??
@app.get("/edm_execution/{uid}", tags=["Executions"])
def get_edm_execution_uid(uid: str) -> models.WorkflowExecution_Model:
res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})
if res is None: raise NotFoundError(f'Database reports no edm_execution with uid={uid}.')
obj=models.WorkflowExecution_Model.model_validate(res)
return obj
#res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})
#if res is None: raise NotFoundError(f'Database reports no edm_execution with uid={uid}.')
#obj=models.WorkflowExecution_Model.model_validate(res)
#return obj
return get_execution(uid)

@app.get("/edm_execution/{uid}/{entity}/{iotype}", tags=["Executions"])
def get_edm_execution_uid_entity_iotype(uid: str, entity: str, iotype: Literal['input','output']) -> List[str]:
Expand All @@ -292,69 +277,63 @@ def insert_execution(data: models.WorkflowExecution_Model) -> str:
res = db.WorkflowExecutions.insert_one(data.model_dump_db())
return str(res.inserted_id)

### XXX: fix return annotations

@app.get("/executions/{uid}/inputs/", tags=["Executions"])
def get_execution_inputs(uid: str) -> List[models.IODataRecordItem_Model]:
# print(100*'@'+f'\n{uid=}')
res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})
# pprint(res)
if res:
ex=models.WorkflowExecution_Model.model_validate(res)
if ex.Inputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Inputs)})).DataSet
ex = get_execution(uid)
if ex.Inputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Inputs)})).DataSet
return []


@app.get("/executions/{uid}/outputs/", tags=["Executions"])
def get_execution_outputs(uid: str) -> List[models.IODataRecordItem_Model]:
res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})
# pprint(res)
if res:
ex=models.WorkflowExecution_Model.model_validate(res)
if ex.Inputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Outputs)})).DataSet
ex = get_execution(uid)
if ex.Outputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Outputs)})).DataSet
return []

@app.get("/executions/{uid}/livelog/{num}", tags=["Executions"])
def get_execution_livelog(uid: str, num: int):
if (rec:=db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})) and (uri:=rec.get('loggerURI',None)):
def get_execution_livelog(uid: str, num: int) -> List[str]:
ex = get_execution(uid)
if ex.loggerURI is not None:
import Pyro5.api
import serpent
import pickle
fmt=logging.Formatter(fmt='%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s')
proxy=Pyro5.api.Proxy(uri)
proxy=Pyro5.api.Proxy(ex.loggerURI)
proxy._pyroTimeout=5
ll=proxy.tail(num,raw=True)
if isinstance(ll,dict): ll=serpent.tobytes(ll)
ll=pickle.loads(ll) # type: ignore
return [fmt.format(rec) for rec in ll]
# perhaps raise exception instead?
return []


def get_execution_io_item(uid: str, name, obj_id: str, inputs: bool):
we = models.WorkflowExecution_Model.model_validate(db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}))
data = models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(we.Inputs if inputs else we.Outputs)}))
def get_execution_io_item(uid: str, name, obj_id: str, inputs: bool) -> models.IODataRecordItem_Model:
ex = get_execution(uid)
data = models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Inputs if inputs else ex.Outputs)}))
for elem in data.DataSet:
if elem.Name == name and elem.ObjID == obj_id:
return elem
return None
raise NotFoundError('Execution weid={uid}, {"inputs" it inputs else "outputs"}: no element with {name=} & {obj_id=}.')


@app.get("/executions/{uid}/input_item/{name}/{obj_id}/", tags=["Executions"])
def get_execution_input_item(uid: str, name: str, obj_id: str):
def get_execution_input_item(uid: str, name: str, obj_id: str) -> models.IODataRecordItem_Model:
return get_execution_io_item(uid, name, obj_id, inputs=True)


@app.get("/executions/{uid}/output_item/{name}/{obj_id}/", tags=["Executions"])
def get_execution_output_item(uid: str, name: str, obj_id: str):
def get_execution_output_item(uid: str, name: str, obj_id: str) -> models.IODataRecordItem_Model:
return get_execution_io_item(uid, name, obj_id, inputs=False)


@app.get("/executions/{uid}/input_item/{name}//", tags=["Executions"])
def _get_execution_input_item(uid: str, name: str):
def _get_execution_input_item(uid: str, name: str) -> models.IODataRecordItem_Model:
return get_execution_io_item(uid, name, '', inputs=True)


@app.get("/executions/{uid}/output_item/{name}//", tags=["Executions"])
def _get_execution_output_item(uid: str, name: str):
def _get_execution_output_item(uid: str, name: str) -> models.IODataRecordItem_Model:
return get_execution_io_item(uid, name, '', inputs=False)


Expand All @@ -363,9 +342,9 @@ class M_IODataSetContainer(BaseModel):
link: typing.Optional[dict] = None
object: typing.Optional[dict] = None


def set_execution_io_item(uid, name, obj_id, inputs: bool, data_container):
we = models.WorkflowExecution_Model.model_validate(db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}))
# FIXME: validation
def set_execution_io_item(uid: str, name: str, obj_id: str, inputs: bool, data_container):
we = get_execution(uid)
if (we.Status == 'Created' and inputs==True) or (we.Status == 'Running' and inputs==False):
id_condition = {'_id': bson.objectid.ObjectId(we.Inputs if inputs else we.Outputs)}
if data_container.link is not None and inputs==True:
Expand Down

0 comments on commit b2996aa

Please sign in to comment.