Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend upload endpoint to accept both types - clks and clknblocks #503

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/entityservice/database/selections.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def get_run(db, run_id):


def get_project_column(db, project_id, column):
assert column in {'notes', 'schema', 'parties', 'result_type', 'deleted', 'encoding_size'}
assert column in {'notes', 'schema', 'parties', 'result_type', 'deleted', 'encoding_size', 'uses_blocking'}
sql_query = """
SELECT {}
FROM projects
Expand Down
31 changes: 31 additions & 0 deletions backend/entityservice/views/auth_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,37 @@ def abort_if_invalid_results_token(resource_id, results_token):
safe_fail_request(403, message=INVALID_ACCESS_MSG)


def abort_if_inconsistent_upload(uses_blocking, clk_json):
"""
check if the following combinations are true
- uses_blocking is False AND 'clks' element in upload JSON
- uses_blocking if True AND 'clknblocks' element in upload JSON
otherwise, return safe_fail_request

:param uses_blocking: Boolean that indicates if the project uses blocking
:param clk_json: a json dict
:return: safe_fail_request if conditions are not met
"""
is_valid_clks = not uses_blocking and 'clks' in clk_json
is_valid_clknblocks = uses_blocking and 'clknblocks' in clk_json
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved
if not (is_valid_clks or is_valid_clknblocks):
# fail condition1 - uses_blocking is True but uploaded element is "clks"
if uses_blocking and 'clks' in clk_json:
safe_fail_request(400, message='Uploaded element is "clks" while expecting "clknblocks"')
# fail condition2 - uses_blocking is False but uploaded element is "clknblocks"
if not uses_blocking and 'clknblocks' in clk_json:
safe_fail_request(400, message='Uploaded element is "clknblocks" while expecting "clks"')
# fail condition3 - "clks" exist in JSON but there is no data
if 'clks' in clk_json and len(clk_json['clks']) < 1:
safe_fail_request(400, message='Missing CLKs information')
# fail condition4 - "clknblocks" exist in JSON but there is no data
if 'clknblocks' in clk_json and len(clk_json['clknblocks']) < 1:
safe_fail_request(400, message='Missing CLK and Blocks information')
# fail condition5 - unknown element in JSON
if 'clks' not in clk_json and 'clknblocks' not in clk_json:
safe_fail_request(400, message='Unknown upload element - expect "clks" or "clknblocks"')


def dataprovider_id_if_authorize(resource_id, receipt_token):
logger.debug("checking authorization token to fetch mask data")
if not is_receipt_token_valid(resource_id, receipt_token):
Expand Down
46 changes: 26 additions & 20 deletions backend/entityservice/views/project.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
from io import BytesIO
import tempfile

import minio
from flask import request
Expand All @@ -11,9 +13,9 @@
from entityservice.tracing import serialize_span
from entityservice.utils import safe_fail_request, get_json, generate_code, get_stream, \
clks_uploaded_to_project, fmt_bytes
from entityservice.database import DBConn
from entityservice.database import DBConn, get_project_column
from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \
abort_if_invalid_results_token, get_authorization_token_type_or_abort
abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload
from entityservice import models
from entityservice.object_store import connect_to_object_store
from entityservice.serialization import binary_format
Expand Down Expand Up @@ -198,6 +200,8 @@ def project_clks_post(project_id):
dp_id = db.get_dataprovider_id(conn, token)
project_encoding_size = db.get_project_schema_encoding_size(conn, project_id)
upload_state_updated = db.is_dataprovider_allowed_to_upload_and_lock(conn, dp_id)
# get flag use_blocking from table projects
uses_blocking = get_project_column(conn, project_id, 'uses_blocking')
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved

if not upload_state_updated:
return safe_fail_request(403, "This token has already been used to upload clks.")
Expand All @@ -216,8 +220,7 @@ def project_clks_post(project_id):
# However, as connexion is very, very strict about input validation when it comes to json, it will always
# consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as
# json into memory. -> issue #184

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), uses_blocking, parent_span=span)
# Schedule a task to deserialize the hashes, and carry
# out a pop count.
handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
Expand Down Expand Up @@ -347,40 +350,43 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
return receipt_token


def upload_json_clk_data(dp_id, clk_json, parent_span):
def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span):
"""
Convert user provided encodings from json array of base64 data into
a newline separated file of base64 data.
Take user provided encodings as json dict and save them, as-is, to the object store.

Note this implementation is non-streaming.
"""
if 'clks' not in clk_json or len(clk_json['clks']) < 1:
safe_fail_request(400, message="Missing CLKs information")
abort_if_inconsistent_upload(uses_blocking, clk_json)

# now we need to know element name - clks or clknblocks
is_valid_clks = not uses_blocking and 'clks' in clk_json
element = 'clks' if is_valid_clks else 'clknblocks'

receipt_token = generate_code()

filename = Config.RAW_FILENAME_FMT.format(receipt_token)
logger.info("Storing user {} supplied clks from json".format(dp_id))
logger.info("Storing user {} supplied {} from json".format(dp_id, element))

with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span:
count = len(clk_json['clks'])
span.set_tag("clks", count)
data = b''.join(''.join(clk.split('\n')).encode() + b'\n' for clk in clk_json['clks'])
count = len(clk_json[element])
span.set_tag(element, count)

logger.info(f"Received {count} encodings.")
joyceyuu marked this conversation as resolved.
Show resolved Hide resolved

num_bytes = len(data)
span.set_tag("num_bytes", num_bytes)
buffer = BytesIO(data)
# write clk_json into a temp file
tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)
json.dump(clk_json, tmp)

logger.info(f"Received {count} encodings. Uploading {fmt_bytes(num_bytes)} to object store")
with opentracing.tracer.start_span('save-clk-file-to-quarantine', child_of=parent_span) as span:
span.set_tag('filename', filename)
mc = connect_to_object_store()
mc.put_object(
mc.fput_object(
Config.MINIO_BUCKET,
filename,
data=buffer,
length=num_bytes
tmp.name,
content_type='application/json'
)
logger.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename))

with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
with DBConn() as conn:
Expand Down