From b2996aa91255857820859c9e4aa36327522e2064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20=C5=A0milauer?= Date: Fri, 22 Nov 2024 12:46:27 +0100 Subject: [PATCH] (cleanups) --- mupifDB/api/main.py | 75 ++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 48 deletions(-) diff --git a/mupifDB/api/main.py b/mupifDB/api/main.py index 619d6dd..a03da10 100644 --- a/mupifDB/api/main.py +++ b/mupifDB/api/main.py @@ -119,13 +119,6 @@ async def validation_exception_handler(request: fastapi.Request, exc: fastapi.ex -def fix_id(record): - if record: - if '_id' in record: - record['_id'] = str(record['_id']) - return record - - # -------------------------------------------------- # Default # -------------------------------------------------- @@ -185,16 +178,12 @@ def get_workflows() -> List[models.Workflow_Model]: res = db.Workflows.find() return [models.Workflow_Model.model_validate(r) for r in res] - @app.get("/workflows/{workflow_id}", tags=["Workflows"]) def get_workflow(workflow_id: str) -> models.Workflow_Model: res = db.Workflows.find_one({"wid": workflow_id}) if res is None: raise NotFoundError(f'Database reports no workflow with wid={workflow_id}.') return models.Workflow_Model.model_validate(res) - - - @app.patch("/workflows/", tags=["Workflows"]) def update_workflow(wf: models.Workflow_Model) -> models.Workflow_Model: res = db.Workflows.find_one_and_update({'wid': wf.wid}, {'$set': wf.model_dump_db()}, return_document=ReturnDocument.AFTER) @@ -209,10 +198,7 @@ def insert_workflow(wf: models.Workflow_Model) -> str: @app.post("/workflows_history/", tags=["Workflows"]) def insert_workflow_history(wf: models.Workflow_Model) -> str: - #print('TTT') - #print_json(data=wf.model_dump()) res = db.WorkflowsHistory.insert_one(wf.model_dump_db()) - #print(f'{res=}') return str(res.inserted_id) @@ -227,8 +213,6 @@ def get_workflow_history(workflow_id: str, workflow_version: int) -> models.Work if res is None: raise NotFoundError(f'Database reports no workflow with wid={workflow_id} and Version={workflow_version}.') return models.Workflow_Model.model_validate(res) - - # -------------------------------------------------- # Executions # -------------------------------------------------- @@ -261,13 +245,14 @@ def get_execution(uid: str) -> models.WorkflowExecution_Model: return models.WorkflowExecution_Model.model_validate(res) - +# FIXME: how is this different from get_execution?? @app.get("/edm_execution/{uid}", tags=["Executions"]) def get_edm_execution_uid(uid: str) -> models.WorkflowExecution_Model: - res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}) - if res is None: raise NotFoundError(f'Database reports no edm_execution with uid={uid}.') - obj=models.WorkflowExecution_Model.model_validate(res) - return obj + #res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}) + #if res is None: raise NotFoundError(f'Database reports no edm_execution with uid={uid}.') + #obj=models.WorkflowExecution_Model.model_validate(res) + #return obj + return get_execution(uid) @app.get("/edm_execution/{uid}/{entity}/{iotype}", tags=["Executions"]) def get_edm_execution_uid_entity_iotype(uid: str, entity: str, iotype: Literal['input','output']) -> List[str]: @@ -292,69 +277,63 @@ def insert_execution(data: models.WorkflowExecution_Model) -> str: res = db.WorkflowExecutions.insert_one(data.model_dump_db()) return str(res.inserted_id) -### XXX: fix return annotations - @app.get("/executions/{uid}/inputs/", tags=["Executions"]) def get_execution_inputs(uid: str) -> List[models.IODataRecordItem_Model]: - # print(100*'@'+f'\n{uid=}') - res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}) - # pprint(res) - if res: - ex=models.WorkflowExecution_Model.model_validate(res) - if ex.Inputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Inputs)})).DataSet + ex = get_execution(uid) + if ex.Inputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Inputs)})).DataSet return [] @app.get("/executions/{uid}/outputs/", tags=["Executions"]) def get_execution_outputs(uid: str) -> List[models.IODataRecordItem_Model]: - res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}) - # pprint(res) - if res: - ex=models.WorkflowExecution_Model.model_validate(res) - if ex.Inputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Outputs)})).DataSet + ex = get_execution(uid) + if ex.Outputs: return models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Outputs)})).DataSet return [] @app.get("/executions/{uid}/livelog/{num}", tags=["Executions"]) -def get_execution_livelog(uid: str, num: int): - if (rec:=db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})) and (uri:=rec.get('loggerURI',None)): +def get_execution_livelog(uid: str, num: int) -> List[str]: + ex = get_execution(uid) + if ex.loggerURI is not None: import Pyro5.api import serpent import pickle fmt=logging.Formatter(fmt='%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s') - proxy=Pyro5.api.Proxy(uri) + proxy=Pyro5.api.Proxy(ex.loggerURI) proxy._pyroTimeout=5 ll=proxy.tail(num,raw=True) if isinstance(ll,dict): ll=serpent.tobytes(ll) ll=pickle.loads(ll) # type: ignore return [fmt.format(rec) for rec in ll] + # perhaps raise exception instead? + return [] -def get_execution_io_item(uid: str, name, obj_id: str, inputs: bool): - we = models.WorkflowExecution_Model.model_validate(db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})) - data = models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(we.Inputs if inputs else we.Outputs)})) +def get_execution_io_item(uid: str, name, obj_id: str, inputs: bool) -> models.IODataRecordItem_Model: + ex = get_execution(uid) + data = models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(ex.Inputs if inputs else ex.Outputs)})) for elem in data.DataSet: if elem.Name == name and elem.ObjID == obj_id: return elem - return None + raise NotFoundError('Execution weid={uid}, {"inputs" it inputs else "outputs"}: no element with {name=} & {obj_id=}.') @app.get("/executions/{uid}/input_item/{name}/{obj_id}/", tags=["Executions"]) -def get_execution_input_item(uid: str, name: str, obj_id: str): +def get_execution_input_item(uid: str, name: str, obj_id: str) -> models.IODataRecordItem_Model: return get_execution_io_item(uid, name, obj_id, inputs=True) @app.get("/executions/{uid}/output_item/{name}/{obj_id}/", tags=["Executions"]) -def get_execution_output_item(uid: str, name: str, obj_id: str): +def get_execution_output_item(uid: str, name: str, obj_id: str) -> models.IODataRecordItem_Model: return get_execution_io_item(uid, name, obj_id, inputs=False) @app.get("/executions/{uid}/input_item/{name}//", tags=["Executions"]) -def _get_execution_input_item(uid: str, name: str): +def _get_execution_input_item(uid: str, name: str) -> models.IODataRecordItem_Model: return get_execution_io_item(uid, name, '', inputs=True) @app.get("/executions/{uid}/output_item/{name}//", tags=["Executions"]) -def _get_execution_output_item(uid: str, name: str): +def _get_execution_output_item(uid: str, name: str) -> models.IODataRecordItem_Model: return get_execution_io_item(uid, name, '', inputs=False) @@ -363,9 +342,9 @@ class M_IODataSetContainer(BaseModel): link: typing.Optional[dict] = None object: typing.Optional[dict] = None - -def set_execution_io_item(uid, name, obj_id, inputs: bool, data_container): - we = models.WorkflowExecution_Model.model_validate(db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})) +# FIXME: validation +def set_execution_io_item(uid: str, name: str, obj_id: str, inputs: bool, data_container): + we = get_execution(uid) if (we.Status == 'Created' and inputs==True) or (we.Status == 'Running' and inputs==False): id_condition = {'_id': bson.objectid.ObjectId(we.Inputs if inputs else we.Outputs)} if data_container.link is not None and inputs==True: