Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend upload endpoint to accept both types - clks and clknblocks #503

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions backend/entityservice/database/selections.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def check_project_exists(db, resource_id):
return query_result['count'] == 1


def get_uses_blocking(db, project_id):
    """
    Fetch the ``uses_blocking`` value recorded for a project.

    Queries the ``projects`` table for the row whose ``project_id`` matches
    and returns that row's ``uses_blocking`` column.

    :param db: Database connection handed through to ``query_db``.
    :param project_id: Identifier of the project to look up.
    :return: The ``uses_blocking`` entry of the query result.
    """
    # NOTE(review): presumably query_db returns None when no row matches, in
    # which case the subscript below would raise - confirm against query_db.
    row = query_db(
        db,
        'select uses_blocking from projects WHERE project_id = %s',
        [project_id],
        one=True,
    )
    return row['uses_blocking']
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved


def check_run_exists(db, project_id, run_id):
sql_query = '''
SELECT count(*)
Expand Down
62 changes: 47 additions & 15 deletions backend/entityservice/views/project.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
from io import BytesIO
import tempfile

import minio
from flask import request
Expand Down Expand Up @@ -217,7 +219,11 @@ def project_clks_post(project_id):
# consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as
# json into memory. -> issue #184

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
# get the uses_blocking flag from the projects table
with DBConn() as conn:
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved
uses_blocking = db.get_uses_blocking(conn, project_id)

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span, uses_blocking)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another nitpick. Utility arguments such as span should come after the useful ones.

# Schedule a task to deserialize the hashes, and carry
# out a pop count.
handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
Expand Down Expand Up @@ -347,39 +353,65 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
return receipt_token


def upload_json_clk_data(dp_id, clk_json, parent_span):
def upload_json_clk_data(dp_id, clk_json, parent_span, uses_blocking):
"""
Convert user provided encodings from json array of base64 data into
a newline separated file of base64 data.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this docstring is now out of date?


Note this implementation is non-streaming.
"""
if 'clks' not in clk_json or len(clk_json['clks']) < 1:
safe_fail_request(400, message="Missing CLKs information")
# check if the following combinations are true
# - uses_blocking is False AND 'clks' element in upload JSON
# - uses_blocking is True AND 'clknblocks' element in upload JSON
# otherwise, return safe_fail_request
hold_condition1 = not uses_blocking and 'clks' in clk_json
hold_condition2 = uses_blocking and 'clknblocks' in clk_json

if not (hold_condition1 or hold_condition2):
# fail condition1 - uses_blocking is True but uploaded element is "clks"
if uses_blocking and 'clks' in clk_json:
safe_fail_request(400, message='Uploaded element is "clks" while expecting "clknblocks"')
# fail condition2 - uses_blocking is False but uploaded element is "clknblocks"
if not uses_blocking and 'clknblocks' in clk_json:
safe_fail_request(400, message='Uploaded element is "clknblocks" while expecting "clks"')
# fail condition3 - "clks" exist in JSON but there is no data
if 'clks' in clk_json and len(clk_json['clks']) < 1:
safe_fail_request(400, message='Missing CLKs information')
# fail condition4 - "clknblocks" exist in JSON but there is no data
if 'clknblocks' in clk_json and len(clk_json['clknblocks']) < 1:
safe_fail_request(400, message='Missing CLK and Blocks information')
# fail condition5 - unknown element in JSON
if 'clks' not in clk_json and 'clknblocks' not in clk_json:
safe_fail_request(400, message='Unknown upload element - expect "clks" or "clknblocks"')
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved

# now we need to know element name - clks or clknblocks
element = 'clks' if hold_condition1 else 'clknblocks'

receipt_token = generate_code()

filename = Config.RAW_FILENAME_FMT.format(receipt_token)
logger.info("Storing user {} supplied clks from json".format(dp_id))
logger.info("Storing user {} supplied {} from json".format(dp_id, element))

with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span:
count = len(clk_json['clks'])
span.set_tag("clks", count)
data = b''.join(''.join(clk.split('\n')).encode() + b'\n' for clk in clk_json['clks'])
count = len(clk_json[element])
span.set_tag(element, count)

logger.info(f"Received {count} encodings.")
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved

# write clk_json into a temp file
_, tmp_filename = tempfile.mkstemp()

num_bytes = len(data)
span.set_tag("num_bytes", num_bytes)
buffer = BytesIO(data)
logger.info('Writing uploaded {} JSON to {}'.format(element.upper(), tmp_filename))
with open(tmp_filename, 'w') as f:
json.dump(clk_json, f)
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved

logger.info(f"Received {count} encodings. Uploading {fmt_bytes(num_bytes)} to object store")
with opentracing.tracer.start_span('save-clk-file-to-quarantine', child_of=parent_span) as span:
span.set_tag('filename', filename)
mc = connect_to_object_store()
mc.put_object(
mc.fput_object(
Config.MINIO_BUCKET,
filename,
data=buffer,
length=num_bytes
tmp_filename
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved
)

with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
Expand Down