diff --git a/docs/api.yaml b/docs/api.yaml index 12409a6fb..102516ada 100644 --- a/docs/api.yaml +++ b/docs/api.yaml @@ -868,7 +868,9 @@ tags: * `submission.delete` when a Submission is soft-deleted. * `submission.purge` when soft-deleted Submissions are purged. * `submission.restore` when a Submission is restored. - * `submission.reprocess` when an Entity Submission is held in a processing backlog and then removed. + * `submission.backlog.hold` when an Entity Submission is first held in processing backlog. + * `submission.backlog.reprocess` when an Entity Submission is reprocessed and removed from the backlog. + * `submission.backlog.force` when an Entity Submission is force-processed after being in backlog. * `dataset.create` when a Dataset is created. * `dataset.update` when a Dataset is updated. * `dataset.update.publish` when a Dataset is published. diff --git a/lib/model/query/analytics.js b/lib/model/query/analytics.js index 27ecc53b4..d35ccd5cb 100644 --- a/lib/model/query/analytics.js +++ b/lib/model/query/analytics.js @@ -525,7 +525,7 @@ FROM duplicateRuns; const countSubmissionReprocess = () => ({ oneFirst }) => oneFirst(sql` SELECT COUNT(*) FROM audits - WHERE "action" = 'submission.reprocess' + WHERE "action" = 'submission.backlog.reprocess' `); // Measure how much time entities whose source is a submission.create diff --git a/lib/model/query/audits.js b/lib/model/query/audits.js index ab9300a04..2384dd93f 100644 --- a/lib/model/query/audits.js +++ b/lib/model/query/audits.js @@ -38,7 +38,7 @@ const actionCondition = (action) => { // The backup action was logged by a backup script that has been removed. // Even though the script has been removed, the audit log entries it logged // have not, so we should continue to exclude those. - return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'backup', 'analytics')`; + return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.hold', 'submission.backlog.reprocess', 'submission.backlog.force', 'submission.delete', 'submission.restore', 'backup', 'analytics')`; else if (action === 'user') return sql`action in ('user.create', 'user.update', 'user.delete', 'user.assignment.create', 'user.assignment.delete', 'user.session.create')`; else if (action === 'field_key') @@ -50,7 +50,7 @@ const actionCondition = (action) => { else if (action === 'form') return sql`action in ('form.create', 'form.update', 'form.delete', 'form.restore', 'form.purge', 'form.attachment.update', 'form.submission.export', 'form.update.draft.set', 'form.update.draft.delete', 'form.update.draft.replace', 'form.update.publish', 'upgrade.process.form.entities_version')`; else if (action === 'submission') - return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'submission.purge')`; + return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.hold', 'submission.backlog.reprocess', 'submission.backlog.force', 'submission.delete', 'submission.restore', 'submission.purge')`; else if (action === 'dataset') return sql`action in ('dataset.create', 'dataset.update')`; else if (action === 'entity') @@ -114,8 +114,6 @@ ${extend|| sql` LEFT JOIN entity_defs AS current_entity_def ON current_entity_def."entityId" = entities.id AND current `} WHERE (audits.details->>'submissionId')::INTEGER = ${submissionId} - -- suppress this one event that is used for offline entity ordering/processing - AND audits.action != 'submission.reprocess' ORDER BY audits."loggedAt" DESC, audits.id DESC ${page(options)}`); diff --git a/lib/model/query/entities.js b/lib/model/query/entities.js index 9d20c7321..12272f3e6 100644 --- a/lib/model/query/entities.js +++ b/lib/model/query/entities.js @@ -257,7 +257,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing, createSubAsUpdate = false) => async ({ Audits, Entities }) => { if (!(event.action === 'submission.create' || event.action === 'submission.update.version' - || event.action === 'submission.reprocess')) + || event.action === 'submission.backlog.reprocess')) return null; // Get client version of entity @@ -270,7 +270,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss // Try computing base version. // But if there is a 404.8 not found error, double-check if the entity never existed or was deleted. try { - baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate); + baseEntityDef = await Entities._computeBaseVersion(event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate); } catch (err) { if (err.problemCode === 404.8) { // Look up deleted entity by passing deleted as option argData @@ -380,7 +380,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss // Used by _updateVerison to figure out the intended base version in Central // based on the branchId, trunkVersion, and baseVersion in the submission -const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => { +const _computeBaseVersion = (event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => { if (createSubAsUpdate) { // We are in the special case of force-apply create-as-update. get the latest version. const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid) @@ -427,7 +427,7 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forc return latestEntity.aux.currentVersion; } else { // If there is no base version and we are not forcing the processing, hold submission in the backlog. - await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion); + await Entities._holdSubmission(event, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion); return null; } } @@ -448,7 +448,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql` // Main submission event processing function, which runs within a transaction // so any errors can be rolled back and logged as an entity processing error. -const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst, run }) => { +const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, oneFirst, run }) => { const { submissionId, submissionDefId } = event.details; const forceOutOfOrderProcessing = parentEvent?.details?.force === true; @@ -562,8 +562,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset if (nextSub.isDefined() && !forceOutOfOrderProcessing) { const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get(); await Entities._deleteHeldSubmissionByEventId(auditId); - await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId }, - { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId }); + await Entities.logBacklogEvent('reprocess', event, nextSubmissionId, nextSubmissionDefId); } } @@ -617,10 +616,13 @@ const _interruptedBranch = (entityId, clientEntity) => async ({ maybeOne }) => { }; // Used by _computeBaseVersion to hold submissions that are not yet ready to be processed -const _holdSubmission = (eventId, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run }) => run(sql` +const _holdSubmission = (event, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run, Entities }) => { + await Entities.logBacklogEvent('hold', event, submissionId, submissionDefId); + await run(sql` INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "entityUuid", "branchId", "branchBaseVersion", "loggedAt") - VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP()) + VALUES (${event.id}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP()) `); +}; // Check for a currently-held submission by id const _checkHeldSubmission = (submissionId) => ({ maybeOne }) => maybeOne(sql` @@ -642,6 +644,8 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql` DELETE FROM entity_submission_backlog WHERE "auditId"=${eventId}`); +const logBacklogEvent = (action, event, submissionId, submissionDefId) => ({ Audits }) => + Audits.log({ id: event.actorId }, `submission.backlog.${action}`, { acteeId: event.acteeId }, { submissionId, submissionDefId }); //////////////////////////////////////////////////////////////////////////////// // FORCE PROCESSING SUBMISSIONS FROM BACKLOG @@ -664,6 +668,7 @@ const _processSingleBacklogEvent = (event) => (container) => container.db.transaction(async (trxn) => { const { Entities } = container.with({ db: trxn }); await Entities._deleteHeldSubmissionByEventId(event.id); + await Entities.logBacklogEvent('force', event, event.details.submissionId, event.details.submissionDefId); await Entities.processSubmissionEvent(event, { details: { force: true } }); return true; }); @@ -854,7 +859,7 @@ module.exports = { _computeBaseVersion, _interruptedBranch, _holdSubmission, _checkHeldSubmission, _getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId, - _getHeldSubmissionsAsEvents, + _getHeldSubmissionsAsEvents, logBacklogEvent, processBacklog, _processSingleBacklogEvent, processSubmissionEvent, streamForExport, getDefBySubmissionId, diff --git a/lib/worker/jobs.js b/lib/worker/jobs.js index 906ad6d91..1a35a08fd 100644 --- a/lib/worker/jobs.js +++ b/lib/worker/jobs.js @@ -18,7 +18,7 @@ const jobs = { 'submission.update.version': [ require('./submission').submissionUpdateVersion, require('./entity').createOrUpdateEntityFromSubmission ], 'submission.update': [ require('./entity').createOrUpdateEntityFromSubmission ], - 'submission.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ], + 'submission.backlog.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ], 'form.create': [ require('./form').create ], 'form.update.draft.set': [ require('./form').updateDraftSet ], diff --git a/test/integration/api/audits.js b/test/integration/api/audits.js index 3cbc205a6..732216ebe 100644 --- a/test/integration/api/audits.js +++ b/test/integration/api/audits.js @@ -645,7 +645,7 @@ describe('/audits', () => { }); })); - it('should filter out offline entity submission reprocessing events given action=nonverbose', testService(async (service, container) => { + it('should filter out offline entity submission backlog events given action=nonverbose', testService(async (service, container) => { const asAlice = await service.login('alice'); await asAlice.post('/v1/projects/1/forms?publish=true') diff --git a/test/integration/api/offline-entities.js b/test/integration/api/offline-entities.js index 7a132e020..dc6cf1362 100644 --- a/test/integration/api/offline-entities.js +++ b/test/integration/api/offline-entities.js @@ -649,7 +649,104 @@ describe('Offline Entities', () => { }); })); - it('should not include submission.reprocess event in audit log of held submission', testOfflineEntities(async (service, container) => { + it('should log an event when holding submission in backlog (force update)', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Send second update in first + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('one', 'one-update1') + .replace('baseVersion="1"', 'baseVersion="2"') + .replace('arrived', 'working') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits') + .expect(200) + .then(({ body }) => { + body.length.should.equal(2); + body.map(a => a.action).should.eql([ + 'submission.backlog.hold', + 'submission.create' + ]); + }); + + // force process the backlog + await container.Entities.processBacklog(true); + + await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits') + .expect(200) + .then(({ body }) => { + body.length.should.equal(4); + body.map(a => a.action).should.eql([ + 'entity.update.version', + 'submission.backlog.force', + 'submission.backlog.hold', + 'submission.create' + ]); + }); + })); + + it('should log an event when holding submission in backlog (force update-as-create, then create-as-update)', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // send update to entity that hasn't been created yet + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.two + .replace('create="1"', 'update="1"') + .replace('branchId=""', `branchId="${branchId}"`) + .replace('two', 'two-update') + .replace('baseVersion=""', 'baseVersion="1"') + .replace('new', 'checked in') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + // force process the backlog + await container.Entities.processBacklog(true); + + await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits') + .expect(200) + .then(({ body }) => { + body.length.should.equal(4); + body.map(a => a.action).should.eql([ + 'entity.create', + 'submission.backlog.force', + 'submission.backlog.hold', + 'submission.create' + ]); + }); + + + // Finally send create + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.two) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + // The create doesn't go thorugh the backlog so there's no backlog events here + await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two/audits') + .expect(200) + .then(({ body }) => { + body.length.should.equal(2); + body.map(a => a.action).should.eql([ + 'entity.update.version', + 'submission.create' + ]); + }); + })); + + it('should include submission.backlog.reprocess event in audit log of held submission', testOfflineEntities(async (service, container) => { const asAlice = await service.login('alice'); const branchId = uuid(); @@ -677,11 +774,14 @@ describe('Offline Entities', () => { await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits') .expect(200) .then(({ body }) => { - body.length.should.equal(2); body.map(a => a.action).should.eql([ 'entity.update.version', + 'submission.backlog.reprocess', + 'submission.backlog.hold', 'submission.create' ]); + // actor for update should be the same as submission create actor + body[0].actorId.should.equal(body[3].actorId); }); })); }); @@ -740,8 +840,11 @@ describe('Offline Entities', () => { await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits') .expect(200) .then(({ body }) => { - body.length.should.equal(1); - should.not.exist(body[0].details.problem); + body.map(a => a.action).should.eql([ + 'submission.backlog.hold', + 'submission.create' + ]); + should.not.exist(body[1].details.problem); }); // Observe that the update was still not applied. @@ -820,8 +923,11 @@ describe('Offline Entities', () => { await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits') .expect(200) .then(({ body }) => { - body.length.should.equal(1); - should.not.exist(body[0].details.problem); + body.map(a => a.action).should.eql([ + 'submission.backlog.hold', + 'submission.create' + ]); + should.not.exist(body[1].details.problem); }); // There should be one submission (the second one) in the held submissions queue @@ -896,9 +1002,13 @@ describe('Offline Entities', () => { await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits') .expect(200) .then(({ body }) => { - body.length.should.equal(1); - body[0].action.should.eql('submission.create'); - should.not.exist(body[0].details.problem); + body.map(a => a.action).should.eql([ + 'submission.backlog.force', + 'submission.backlog.hold', + 'submission.create' + ]); + body[2].action.should.eql('submission.create'); + should.not.exist(body[2].details.problem); }); })); }); diff --git a/test/integration/other/analytics-queries.js b/test/integration/other/analytics-queries.js index a0948c821..aa02493a9 100644 --- a/test/integration/other/analytics-queries.js +++ b/test/integration/other/analytics-queries.js @@ -1580,7 +1580,7 @@ describe('analytics task queries', function () { countInterruptedBranches.should.equal(4); })); - it('should count number of submission.reprocess events (submissions temporarily in the backlog)', testService(async (service, container) => { + it('should count number of submission.backlog.reprocess events (submissions temporarily in the backlog)', testService(async (service, container) => { await createTestForm(service, container, testData.forms.offlineEntity, 1); const asAlice = await service.login('alice'); @@ -1915,7 +1915,7 @@ describe('analytics task queries', function () { .expect(200); // switching the order of these updates triggers the - // submission.reprocess count + // submission.backlog.reprocess count await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') .send(testData.instances.offlineEntity.one .replace('one', 'one-update2')