Skip to content

Commit

Permalink
Add experimental (disabled by default) support for re-validation afte…
Browse files Browse the repository at this point in the history
…r update via transactions
  • Loading branch information
eudoxos committed Nov 26, 2024
1 parent 3f220e0 commit cfd9436
Showing 1 changed file with 51 additions and 16 deletions.
67 changes: 51 additions & 16 deletions mupifDB/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,35 @@
def NotFoundError(detail):
return HTTPException(status_code=404, detail=detail)


from mupifDB import models


client = MongoClient("mongodb://localhost:"+os.environ.get('MUPIFDB_MONGODB_PORT','27017'))
db = client.MuPIF


# Transaction support for MongoDB, used to re-validate a record after validation,
# aborting the transaction automatically if it does not validate.
#
# Transactions need to be enabled in the db (via startup setting, plus a short setup through mongosh — see Makefile in the root of the repo)
# but we might avoid them just as well by setting the schema on the collection (mongo has provisions for that) and then
# the validation would (hopefully) happen automatically
#
import contextlib
from typing import Generator
from pymongo.client_session import ClientSession

@contextlib.contextmanager
def db_transaction() -> Generator[ClientSession|None]:
if 1:
# return None as session object, this makes the context no-op
yield None
else:
with client.start_session() as session:
with session.start_transaction():
yield session


tags_metadata = [
{
"name": "Users",
Expand Down Expand Up @@ -188,8 +211,10 @@ def get_workflow(workflow_id: str) -> models.Workflow_Model:

@app.patch("/workflows/", tags=["Workflows"])
def update_workflow(wf: models.Workflow_Model) -> models.Workflow_Model:
res = db.Workflows.find_one_and_update({'wid': wf.wid}, {'$set': wf.model_dump_db()}, return_document=ReturnDocument.AFTER)
return models.Workflow_Model.model_validate(res)
# don't write the result if the result after the update does not validate
with db_transaction() as session:
res = db.Workflows.find_one_and_update({'wid': wf.wid}, {'$set': wf.model_dump_db()}, return_document=ReturnDocument.AFTER, session=session)
return models.Workflow_Model.model_validate(res)


@app.post("/workflows/", tags=["Workflows"])
Expand Down Expand Up @@ -347,14 +372,19 @@ class M_IODataSetContainer(BaseModel):
# FIXME: validation
def set_execution_io_item(uid: str, name: str, obj_id: str, inputs: bool, data_container):
we = get_execution(uid)

if (we.Status == 'Created' and inputs==True) or (we.Status == 'Running' and inputs==False):
id_condition = {'_id': bson.objectid.ObjectId(we.Inputs if inputs else we.Outputs)}
if data_container.link is not None and inputs==True:
res = db.IOData.update_one(id_condition, {'$set': {"DataSet.$[r].Link": data_container.link}}, array_filters=[{"r.Name": name, "r.ObjID": str(obj_id)}])
return res.matched_count > 0
if data_container.object is not None:
res = db.IOData.update_one(id_condition, {'$set': {"DataSet.$[r].Object": data_container.object}}, array_filters=[{"r.Name": name, "r.ObjID": str(obj_id)}])
return res.matched_count > 0
with db_transaction() as session:
_id=we.Inputs if inputs else we.Outputs
id_condition = {'_id': bson.objectid.ObjectId(_id)}
if data_container.link is not None and inputs==True:
rec = db.IOData.find_one_and_update(id_condition, {'$set': {"DataSet.$[r].Link": data_container.link}}, array_filters=[{"r.Name": name, "r.ObjID": str(obj_id)}], return_document=ReturnDocument.AFTER, session=session)
elif data_container.object is not None:
rec = db.IOData.find_one_and_update(id_condition, {'$set': {"DataSet.$[r].Object": data_container.object}}, array_filters=[{"r.Name": name, "r.ObjID": str(obj_id)}], return_document=ReturnDocument.AFTER, session=session)
else: return False # raise exception??
if rec is None: raise NotFoundError(f'Database reports no IOData with {_id=}.')
models.IODataRecord_Model.model_validate(rec) # if not validated, transaction is aborted
return True
return False


Expand Down Expand Up @@ -384,26 +414,28 @@ class M_ModifyExecutionOntoBaseObjectID(BaseModel):

@app.patch("/executions/{uid}/set_onto_base_object_id/", tags=["Executions"])
def modify_execution_id(uid: str, data: M_ModifyExecutionOntoBaseObjectID):
db.WorkflowExecutions.update_one({'_id': bson.objectid.ObjectId(uid), "EDMMapping.Name": data.name}, {"$set": {"EDMMapping.$.id": data.value}})
with db_transaction() as session:
rec = db.WorkflowExecutions.find_one_and_update({'_id': bson.objectid.ObjectId(uid), "EDMMapping.Name": data.name}, {"$set": {"EDMMapping.$.id": data.value}}, return_document=ReturnDocument.AFTER, session=session)
models.WorkflowExecution_Model.model_validate(rec)
return get_execution(uid)

class M_ModifyExecutionOntoBaseObjectIDMultiple(BaseModel):
data: list[dict]

@app.patch("/executions/{uid}/set_onto_base_object_id_multiple/", tags=["Executions"])
def modify_execution_id_multiple(uid: str, data: List[M_ModifyExecutionOntoBaseObjectID]):
for d in data:
db.WorkflowExecutions.update_one({'_id': bson.objectid.ObjectId(uid), "EDMMapping.Name": d.name}, {"$set": {"EDMMapping.$.id": d.value}})
for d in data: modify_execution_id(uid,d)
return get_execution(uid)


class M_ModifyExecutionOntoBaseObjectIDs(BaseModel):
name: str
value: list[str]

@app.patch("/executions/{uid}/set_onto_base_object_ids/", tags=["Executions"])
def modify_execution_ids(uid: str, data: M_ModifyExecutionOntoBaseObjectIDs):
db.WorkflowExecutions.update_one({'_id': bson.objectid.ObjectId(uid), "EDMMapping.Name": data.name}, {"$set": {"EDMMapping.$.ids": data.value}})
with db_transaction() as session:
rec=db.WorkflowExecutions.find_one_and_update({'_id': bson.objectid.ObjectId(uid), "EDMMapping.Name": data.name}, {"$set": {"EDMMapping.$.ids": data.value}}, return_document=ReturnDocument.AFTER, session=session)
models.WorkflowExecution_Model.model_validate(rec)
return get_execution(uid)


Expand All @@ -413,7 +445,9 @@ class M_ModifyExecution(BaseModel):

@app.patch("/executions/{uid}", tags=["Executions"])
def modify_execution(uid: str, data: M_ModifyExecution):
db.WorkflowExecutions.update_one({'_id': bson.objectid.ObjectId(uid)}, {"$set": {data.key: data.value}})
with db_transaction() as session:
rec=db.WorkflowExecutions.find_one_and_update({'_id': bson.objectid.ObjectId(uid)}, {"$set": {data.key: data.value}}, return_document=ReturnDocument.AFTER, session=session)
models.WorkflowExecution_Model.model_validate(rec)
return get_execution(uid)


Expand Down Expand Up @@ -466,6 +500,7 @@ async def get_temp_dir():
def get_file(uid: str, tdir=Depends(get_temp_dir)):
fs = gridfs.GridFS(db)
foundfile = fs.get(bson.objectid.ObjectId(uid))
if not foundfile: raise NotFoundError('Database reports no file with {uid=}.')
wfile = io.BytesIO(foundfile.read())
fn = foundfile.filename
return StreamingResponse(wfile, headers={"Content-Disposition": "attachment; filename=" + fn})
Expand Down

0 comments on commit cfd9436

Please sign in to comment.