From e763a303aea8f6aedf532c69bb1a0c4f44852ed8 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 25 Oct 2024 11:52:55 +0100 Subject: [PATCH] Apply timeserial dependant operations only if op timeserial is > than object/entry timeserial --- src/plugins/liveobjects/livemap.ts | 18 +++++++---- src/plugins/liveobjects/liveobjectspool.ts | 9 ++++-- test/realtime/live_objects.test.js | 36 ++++++++++++---------- 3 files changed, 38 insertions(+), 25 deletions(-) diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 42d8248e7..b0427a9a0 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -224,13 +224,16 @@ export class LiveMap extends LiveObject { const { ErrorInfo, Utils } = this._client; const existingEntry = this._dataRef.data.get(op.key); - if (existingEntry && opOriginTimeserial.before(existingEntry.timeserial)) { - // the operation's origin timeserial < the entry's timeserial, ignore the operation. + if ( + existingEntry && + (opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial)) + ) { + // the operation's origin timeserial <= the entry's timeserial, ignore the operation. this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, 'LiveMap._applyMapSet()', - `skipping updating key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserial.toString()}; objectId=${this._objectId}`, + `skipping update for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`, ); return; } @@ -271,13 +274,16 @@ export class LiveMap extends LiveObject { private _applyMapRemove(op: StateMapOp, opOriginTimeserial: Timeserial): void { const existingEntry = this._dataRef.data.get(op.key); - if (existingEntry && opOriginTimeserial.before(existingEntry.timeserial)) { - // the operation's origin timeserial < the entry's timeserial, ignore the operation. + if ( + existingEntry && + (opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial)) + ) { + // the operation's origin timeserial <= the entry's timeserial, ignore the operation. this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, 'LiveMap._applyMapRemove()', - `skipping removing key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserial.toString()}; objectId=${this._objectId}`, + `skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`, ); return; } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 3d41e2e23..aef14cc8d 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -151,13 +151,16 @@ export class LiveObjectsPool { } // otherwise we need to compare regional timeserials - if (regionalTimeserial.before(existingObject.getRegionalTimeserial())) { - // the operation's regional timeserial < the object's timeserial, ignore the operation. + if ( + regionalTimeserial.before(existingObject.getRegionalTimeserial()) || + regionalTimeserial.equal(existingObject.getRegionalTimeserial()) + ) { + // the operation's regional timeserial <= the object's timeserial, ignore the operation. this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, 'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()', - `skipping applying buffered state operation message as existing object has greater regional timeserial: ${existingObject.getRegionalTimeserial().toString()}, than the op: ${regionalTimeserial.toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`, + `skipping buffered state operation message: op regional timeserial ${regionalTimeserial.toString()} <= object regional timeserial ${existingObject.getRegionalTimeserial().toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`, ); continue; } diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 4d472d1a4..a1a9ffaef 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -1070,23 +1070,27 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], ], }); - // inject operations with older regional timeserial, expect them not to be applied when sync ends + // inject operations with older or equal regional timeserial, expect them not to be applied when sync ends await Promise.all( - ['root', mapId].flatMap((objectId) => - primitiveKeyData.map((keyData) => - liveObjectsHelper.processStateOperationMessageOnChannel({ - channel, - serial: '@0-0', - state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })], - }), - ), - ), + ['@0-0', '@1-0'].map(async (serial) => { + await Promise.all( + ['root', mapId].flatMap((objectId) => + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })], + }), + ), + ), + ); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], + }); + }), ); - await liveObjectsHelper.processStateOperationMessageOnChannel({ - channel, - serial: '@0-0', - state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], - }); // inject operations with greater regional timeserial, expect them to be applied when sync ends await Promise.all( @@ -1110,7 +1114,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], syncSerial: 'serial:', }); - // check operations with older regional timeserial are not applied + // check operations with older or equal regional timeserial are not applied // counter will be checked to match an expected value explicitly, so no need to check that it doesn't equal a sum of operations primitiveKeyData.forEach((keyData) => { expect(