Skip to content

Commit

Permalink
Merge branch 'master' into body-parser-error-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
alxndrsn committed Dec 9, 2024
2 parents cffb302 + 38fd622 commit e5f32e5
Show file tree
Hide file tree
Showing 15 changed files with 328 additions and 48 deletions.
4 changes: 3 additions & 1 deletion docs/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,9 @@ tags:
* `submission.delete` when a Submission is soft-deleted.
* `submission.purge` when soft-deleted Submissions are purged.
* `submission.restore` when a Submission is restored.
* `submission.reprocess` when an Entity Submission is held in a processing backlog and then removed.
* `submission.backlog.hold` when an Entity Submission is first held in processing backlog.
* `submission.backlog.reprocess` when an Entity Submission is reprocessed and removed from the backlog.
* `submission.backlog.force` when an Entity Submission is force-processed after being in backlog.
* `dataset.create` when a Dataset is created.
* `dataset.update` when a Dataset is updated.
* `dataset.update.publish` when a Dataset is published.
Expand Down
6 changes: 5 additions & 1 deletion lib/data/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,12 @@ const getWithConflictDetails = (defs, audits, relevantToConflict) => {
if (v.version > 1) { // v.root is false here - can use either
const baseNotContiguousWithTrunk = v.branchId != null &&
branches.get(v.branchId).lastContiguousWithTrunk < v.baseVersion;

// check if it's a create applied as an update, which is also a conflict
const createAsUpdate = def.aux.source?.details?.action === 'create';

const conflict = v.version !== (v.baseVersion + 1) ||
baseNotContiguousWithTrunk;
baseNotContiguousWithTrunk || createAsUpdate;

v.baseDiff = getDiffProp(v.dataReceived, { ...defs[v.baseVersion - 1].data, label: defs[v.baseVersion - 1].label });

Expand Down
4 changes: 3 additions & 1 deletion lib/formats/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,9 @@ const singleRowToOData = (fields, row, domain, originalUrl, query) => {
let { offset } = paging;

if (skipToken) {
offset = filtered.findIndex(s => skipToken.repeatId === s.__id) + 1;
const { repeatId } = skipToken;
if (!repeatId) throw Problem.user.odataInvalidSkiptoken();
offset = filtered.findIndex(s => repeatId === s.__id) + 1;
if (offset === 0) throw Problem.user.odataRepeatIdNotFound();
}

Expand Down
4 changes: 2 additions & 2 deletions lib/http/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ module.exports = (container) => {
// catch body-parser middleware problems. we only ask it to parse JSON, which
// isn't part of OpenRosa, so we can assume a plain JSON response.
switch (error?.type) {
case 'encoding.unsupported': return defaultErrorWriter(Problem.user.unparseable({ format: 'json' }), request, response);
case 'encoding.unsupported': return defaultErrorWriter(Problem.user.encodingNotSupported(), request, response);
case 'entity.parse.failed': return defaultErrorWriter(Problem.user.unparseable({ format: 'json', rawLength: error.body.length }), request, response);
case 'entity.too.large': return defaultErrorWriter(Problem.user.unparseable({ format: 'json', rawLength: error.length }), request, response);
case 'entity.too.large': return defaultErrorWriter(Problem.user.requestTooLarge(), request, response);
default: return defaultErrorWriter(error, request, response);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/model/query/analytics.js
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ FROM duplicateRuns;
const countSubmissionReprocess = () => ({ oneFirst }) => oneFirst(sql`
SELECT COUNT(*)
FROM audits
WHERE "action" = 'submission.reprocess'
WHERE "action" = 'submission.backlog.reprocess'
`);

// Measure how much time entities whose source is a submission.create
Expand Down
6 changes: 2 additions & 4 deletions lib/model/query/audits.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const actionCondition = (action) => {
// The backup action was logged by a backup script that has been removed.
// Even though the script has been removed, the audit log entries it logged
// have not, so we should continue to exclude those.
return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'backup', 'analytics')`;
return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.hold', 'submission.backlog.reprocess', 'submission.backlog.force', 'submission.delete', 'submission.restore', 'backup', 'analytics')`;
else if (action === 'user')
return sql`action in ('user.create', 'user.update', 'user.delete', 'user.assignment.create', 'user.assignment.delete', 'user.session.create')`;
else if (action === 'field_key')
Expand All @@ -50,7 +50,7 @@ const actionCondition = (action) => {
else if (action === 'form')
return sql`action in ('form.create', 'form.update', 'form.delete', 'form.restore', 'form.purge', 'form.attachment.update', 'form.submission.export', 'form.update.draft.set', 'form.update.draft.delete', 'form.update.draft.replace', 'form.update.publish', 'upgrade.process.form.entities_version')`;
else if (action === 'submission')
return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'submission.purge')`;
return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.hold', 'submission.backlog.reprocess', 'submission.backlog.force', 'submission.delete', 'submission.restore', 'submission.purge')`;
else if (action === 'dataset')
return sql`action in ('dataset.create', 'dataset.update')`;
else if (action === 'entity')
Expand Down Expand Up @@ -114,8 +114,6 @@ ${extend|| sql`
LEFT JOIN entity_defs AS current_entity_def ON current_entity_def."entityId" = entities.id AND current
`}
WHERE (audits.details->>'submissionId')::INTEGER = ${submissionId}
-- suppress this one event that is used for offline entity ordering/processing
AND audits.action != 'submission.reprocess'
ORDER BY audits."loggedAt" DESC, audits.id DESC
${page(options)}`);

Expand Down
42 changes: 30 additions & 12 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing, createSubAsUpdate = false) => async ({ Audits, Entities }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
|| event.action === 'submission.backlog.reprocess'))
return null;

// Get client version of entity
Expand All @@ -270,7 +270,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
// Try computing base version.
// But if there is a 404.8 not found error, double-check if the entity never existed or was deleted.
try {
baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate);
baseEntityDef = await Entities._computeBaseVersion(event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate);
} catch (err) {
if (err.problemCode === 404.8) {
// Look up deleted entity by passing deleted as option argData
Expand Down Expand Up @@ -317,7 +317,17 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
conflict = conflictingProperties.length > 0 ? ConflictType.HARD : ConflictType.SOFT;
}
} else if (createSubAsUpdate) {
conflict = ConflictType.SOFT;
const versionDiff = getDiffProp(serverEntity.aux.currentVersion.data, clientEntity.def.dataReceived);
const diffData = pickAll(versionDiff, serverEntity.aux.currentVersion.data);

if (serverEntity.aux.currentVersion.label !== clientEntity.def.label)
diffData.label = serverEntity.aux.currentVersion.label;

conflictingProperties = Object.keys(clientEntity.def.dataReceived).filter(key => key in diffData && clientEntity.def.dataReceived[key] !== diffData[key]);

if (conflict !== ConflictType.HARD) { // We don't want to downgrade conflict here
conflict = conflictingProperties.length > 0 ? ConflictType.HARD : ConflictType.SOFT;
}
} else {
// This may still be a soft conflict if the new version is not contiguous with this branch's trunk version
const interrupted = await Entities._interruptedBranch(serverEntity.id, clientEntity);
Expand All @@ -332,7 +342,10 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// make some kind of source object
const sourceDetails = {
submission: { instanceId: submissionDef.instanceId }
submission: {
instanceId: submissionDef.instanceId,
},
...createSubAsUpdate ? { action: 'create' } : {}
};
const sourceId = await Entities.createSource(sourceDetails, submissionDefId, event.id, forceOutOfOrderProcessing);
const partial = new Entity.Partial(serverEntity.with({ conflict }), {
Expand Down Expand Up @@ -367,7 +380,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Used by _updateVerison to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => {
const _computeBaseVersion = (event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => {
if (createSubAsUpdate) {
// We are in the special case of force-apply create-as-update. get the latest version.
const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid)
Expand Down Expand Up @@ -414,7 +427,7 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forc
return latestEntity.aux.currentVersion;
} else {
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
await Entities._holdSubmission(event, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
}
Expand All @@ -435,7 +448,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql`

// Main submission event processing function, which runs within a transaction
// so any errors can be rolled back and logged as an entity processing error.
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst, run }) => {
const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, oneFirst, run }) => {
const { submissionId, submissionDefId } = event.details;
const forceOutOfOrderProcessing = parentEvent?.details?.force === true;

Expand Down Expand Up @@ -549,8 +562,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
if (nextSub.isDefined() && !forceOutOfOrderProcessing) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get();
await Entities._deleteHeldSubmissionByEventId(auditId);
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
{ submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId });
await Entities.logBacklogEvent('reprocess', event, nextSubmissionId, nextSubmissionDefId);
}
}

Expand Down Expand Up @@ -604,10 +616,13 @@ const _interruptedBranch = (entityId, clientEntity) => async ({ maybeOne }) => {
};

// Used by _computeBaseVersion to hold submissions that are not yet ready to be processed
const _holdSubmission = (eventId, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run }) => run(sql`
const _holdSubmission = (event, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run, Entities }) => {
await Entities.logBacklogEvent('hold', event, submissionId, submissionDefId);
await run(sql`
INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "entityUuid", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
VALUES (${event.id}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);
};

// Check for a currently-held submission by id
const _checkHeldSubmission = (submissionId) => ({ maybeOne }) => maybeOne(sql`
Expand All @@ -629,6 +644,8 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql`
DELETE FROM entity_submission_backlog
WHERE "auditId"=${eventId}`);

const logBacklogEvent = (action, event, submissionId, submissionDefId) => ({ Audits }) =>
Audits.log({ id: event.actorId }, `submission.backlog.${action}`, { acteeId: event.acteeId }, { submissionId, submissionDefId });

////////////////////////////////////////////////////////////////////////////////
// FORCE PROCESSING SUBMISSIONS FROM BACKLOG
Expand All @@ -651,6 +668,7 @@ const _processSingleBacklogEvent = (event) => (container) =>
container.db.transaction(async (trxn) => {
const { Entities } = container.with({ db: trxn });
await Entities._deleteHeldSubmissionByEventId(event.id);
await Entities.logBacklogEvent('force', event, event.details.submissionId, event.details.submissionDefId);
await Entities.processSubmissionEvent(event, { details: { force: true } });
return true;
});
Expand Down Expand Up @@ -841,7 +859,7 @@ module.exports = {
_computeBaseVersion, _interruptedBranch,
_holdSubmission, _checkHeldSubmission,
_getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId,
_getHeldSubmissionsAsEvents,
_getHeldSubmissionsAsEvents, logBacklogEvent,
processBacklog, _processSingleBacklogEvent,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
Expand Down
11 changes: 9 additions & 2 deletions lib/util/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,15 @@ class QueryOptions {
//
// See: https://docs.oasis-open.org/odata/odata/v4.01/odata-v4.01-part1-protocol.html
static parseSkiptoken(token) {
const jsonString = base64ToUtf8(token.substr(2));
return JSON.parse(jsonString);
if (!token.startsWith('01')) throw Problem.user.odataInvalidSkiptoken();

try {
const parsed = JSON.parse(base64ToUtf8(token.substr(2)));
if (typeof parsed !== 'object') throw Problem.user.odataInvalidSkiptoken();
return parsed;
} catch (err) {
throw Problem.user.odataInvalidSkiptoken();
}
}

static getSkiptoken(data) {
Expand Down
6 changes: 5 additions & 1 deletion lib/util/problem.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ const problems = {

odataRepeatIdNotFound: problem(400.34, () => 'Record associated with the provided $skiptoken not found.'),

requestTooLarge: problem(400.35, () => 'Request body too large.'),
odataInvalidSkiptoken: problem(400.35, () => 'Invalid $skiptoken'),

requestTooLarge: problem(400.36, () => 'Request body too large.'),

encodingNotSupported: problem(400.37, () => 'Encoding not supported.'),

// no detail information for security reasons.
authenticationFailed: problem(401.2, () => 'Could not authenticate with the provided credentials.'),
Expand Down
2 changes: 1 addition & 1 deletion lib/worker/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const jobs = {
'submission.update.version': [ require('./submission').submissionUpdateVersion, require('./entity').createOrUpdateEntityFromSubmission ],

'submission.update': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.backlog.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],

'form.create': [ require('./form').create ],
'form.update.draft.set': [ require('./form').updateDraftSet ],
Expand Down
2 changes: 1 addition & 1 deletion test/integration/api/audits.js
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ describe('/audits', () => {
});
}));

it('should filter out offline entity submission reprocessing events given action=nonverbose', testService(async (service, container) => {
it('should filter out offline entity submission backlog events given action=nonverbose', testService(async (service, container) => {
const asAlice = await service.login('alice');

await asAlice.post('/v1/projects/1/forms?publish=true')
Expand Down
Loading

0 comments on commit e5f32e5

Please sign in to comment.