diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 7a80c2de9..f541a56e0 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -8,12 +8,17 @@ import { LiveObject } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; import { StateMessage } from './statemessage'; import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; -import { DefaultTimeserial } from './timeserial'; +import { DefaultTimeserial, Timeserial } from './timeserial'; enum LiveObjectsEvents { SyncCompleted = 'SyncCompleted', } +export interface BufferedStateMessage { + stateMessage: StateMessage; + regionalTimeserial: Timeserial; +} + export class LiveObjects { private _client: BaseClient; private _channel: RealtimeChannel; @@ -25,6 +30,7 @@ export class LiveObjects { private _syncInProgress: boolean; private _currentSyncId: string | undefined; private _currentSyncCursor: string | undefined; + private _bufferedStateOperations: BufferedStateMessage[]; constructor(channel: RealtimeChannel) { this._channel = channel; @@ -33,6 +39,7 @@ export class LiveObjects { this._liveObjectsPool = new LiveObjectsPool(this); this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this); this._syncInProgress = true; + this._bufferedStateOperations = []; } async getRoot(): Promise { @@ -87,8 +94,16 @@ export class LiveObjects { */ handleStateMessages(stateMessages: StateMessage[], msgRegionalTimeserial: string | null | undefined): void { const timeserial = DefaultTimeserial.calculateTimeserial(this._client, msgRegionalTimeserial); + if (this._syncInProgress) { - // TODO: handle buffering of state messages during SYNC + // the client receives state messages in realtime over the channel concurrently with the SYNC sequence, + // which means that some of the state messages may already be applied by the realtime in the SYNC sequence once it's ended. + // to avoid re-applying those state messages (for example double counting COUNTER_INC operation), + // we buffer incoming state operation messsages while SYNC is in progress, and apply them once SYNC has ended. + stateMessages.forEach((x) => + this._bufferedStateOperations.push({ stateMessage: x, regionalTimeserial: timeserial }), + ); + return; } this._liveObjectsPool.applyStateMessages(stateMessages, timeserial); @@ -102,7 +117,7 @@ export class LiveObjects { this._client.logger, this._client.Logger.LOG_MINOR, 'LiveObjects.onAttached()', - 'channel = ' + this._channel.name + ', hasState = ' + hasState, + `channel=${this._channel.name}, hasState=${hasState}`, ); if (hasState) { @@ -137,6 +152,8 @@ export class LiveObjects { } private _startNewSync(syncId?: string, syncCursor?: string): void { + // need to discard all buffered state operation messages on new sync start + this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); this._currentSyncId = syncId; this._currentSyncCursor = syncCursor; @@ -144,9 +161,11 @@ export class LiveObjects { } private _endSync(): void { - // TODO: handle applying buffered state messages when SYNC is finished - this._applySync(); + // should apply buffered state operations after we applied the SYNC data + this._liveObjectsPool.applyBufferedStateMessages(this._bufferedStateOperations); + + this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); this._currentSyncId = undefined; this._currentSyncCursor = undefined; diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index c1466a8fc..3d41e2e23 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -3,7 +3,7 @@ import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; -import { LiveObjects } from './liveobjects'; +import { BufferedStateMessage, LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage'; import { DefaultTimeserial, Timeserial } from './timeserial'; @@ -126,6 +126,46 @@ export class LiveObjectsPool { } } + applyBufferedStateMessages(bufferedStateMessages: BufferedStateMessage[]): void { + // since we receive state operation messages concurrently with the SYNC sequence, + // we must determine which operation messages should be applied to the now local copy of the object pool, and the rest will be skipped. + // since messages are delivered in regional order to the client, we can inspect the regional timeserial + // of each state operation message to know whether it has reached a point in the message stream + // that is no longer included in the state object snapshot we received from SYNC sequence. + for (const { regionalTimeserial, stateMessage } of bufferedStateMessages) { + if (!stateMessage.operation) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()', + `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const existingObject = this.get(stateMessage.operation.objectId); + if (!existingObject) { + // for object ids we haven't seen yet we can apply operation immediately + this.applyStateMessages([stateMessage], regionalTimeserial); + continue; + } + + // otherwise we need to compare regional timeserials + if (regionalTimeserial.before(existingObject.getRegionalTimeserial())) { + // the operation's regional timeserial < the object's timeserial, ignore the operation. + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()', + `skipping applying buffered state operation message as existing object has greater regional timeserial: ${existingObject.getRegionalTimeserial().toString()}, than the op: ${regionalTimeserial.toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + this.applyStateMessages([stateMessage], regionalTimeserial); + } + } + private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID);