Skip to content

Commit

Permalink
Implement buffering and application of operations during a SYNC sequence
Browse files Browse the repository at this point in the history
- state operation messages are buffered while SYNC is in progress
- all buffered operations are discarded when new SYNC starts
- when SYNC ends operations to apply are decided based on the regional
timeserial of the message
- eligible operations are applied via a regular LiveObject operation
application logic

Resolves DTP-955
  • Loading branch information
VeskeR committed Oct 25, 2024
1 parent 372bd1c commit 422a148
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 6 deletions.
29 changes: 24 additions & 5 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import { LiveObject } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';
import { DefaultTimeserial } from './timeserial';
import { DefaultTimeserial, Timeserial } from './timeserial';

enum LiveObjectsEvents {
SyncCompleted = 'SyncCompleted',
}

export interface BufferedStateMessage {
stateMessage: StateMessage;
regionalTimeserial: Timeserial;
}

export class LiveObjects {
private _client: BaseClient;
private _channel: RealtimeChannel;
Expand All @@ -25,6 +30,7 @@ export class LiveObjects {
private _syncInProgress: boolean;
private _currentSyncId: string | undefined;
private _currentSyncCursor: string | undefined;
private _bufferedStateOperations: BufferedStateMessage[];

constructor(channel: RealtimeChannel) {
this._channel = channel;
Expand All @@ -33,6 +39,7 @@ export class LiveObjects {
this._liveObjectsPool = new LiveObjectsPool(this);
this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this);
this._syncInProgress = true;
this._bufferedStateOperations = [];
}

async getRoot(): Promise<LiveMap> {
Expand Down Expand Up @@ -87,8 +94,16 @@ export class LiveObjects {
*/
handleStateMessages(stateMessages: StateMessage[], msgRegionalTimeserial: string | null | undefined): void {
const timeserial = DefaultTimeserial.calculateTimeserial(this._client, msgRegionalTimeserial);

if (this._syncInProgress) {
// TODO: handle buffering of state messages during SYNC
// the client receives state messages in realtime over the channel concurrently with the SYNC sequence,
// which means that some of the state messages may already be applied by the realtime in the SYNC sequence once it's ended.
// to avoid re-applying those state messages (for example double counting COUNTER_INC operation),
// we buffer incoming state operation messsages while SYNC is in progress, and apply them once SYNC has ended.
stateMessages.forEach((x) =>
this._bufferedStateOperations.push({ stateMessage: x, regionalTimeserial: timeserial }),
);
return;
}

this._liveObjectsPool.applyStateMessages(stateMessages, timeserial);
Expand All @@ -102,7 +117,7 @@ export class LiveObjects {
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.onAttached()',
'channel = ' + this._channel.name + ', hasState = ' + hasState,
`channel=${this._channel.name}, hasState=${hasState}`,
);

if (hasState) {
Expand Down Expand Up @@ -137,16 +152,20 @@ export class LiveObjects {
}

private _startNewSync(syncId?: string, syncCursor?: string): void {
// need to discard all buffered state operation messages on new sync start
this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
this._syncInProgress = true;
}

private _endSync(): void {
// TODO: handle applying buffered state messages when SYNC is finished

this._applySync();
// should apply buffered state operations after we applied the SYNC data
this._liveObjectsPool.applyBufferedStateMessages(this._bufferedStateOperations);

this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = undefined;
this._currentSyncCursor = undefined;
Expand Down
42 changes: 41 additions & 1 deletion src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';
import { BufferedStateMessage, LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';
Expand Down Expand Up @@ -126,6 +126,46 @@ export class LiveObjectsPool {
}
}

applyBufferedStateMessages(bufferedStateMessages: BufferedStateMessage[]): void {
// since we receive state operation messages concurrently with the SYNC sequence,
// we must determine which operation messages should be applied to the now local copy of the object pool, and the rest will be skipped.
// since messages are delivered in regional order to the client, we can inspect the regional timeserial
// of each state operation message to know whether it has reached a point in the message stream
// that is no longer included in the state object snapshot we received from SYNC sequence.
for (const { regionalTimeserial, stateMessage } of bufferedStateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()',
`state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const existingObject = this.get(stateMessage.operation.objectId);
if (!existingObject) {
// for object ids we haven't seen yet we can apply operation immediately
this.applyStateMessages([stateMessage], regionalTimeserial);
continue;
}

// otherwise we need to compare regional timeserials
if (regionalTimeserial.before(existingObject.getRegionalTimeserial())) {
// the operation's regional timeserial < the object's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()',
`skipping applying buffered state operation message as existing object has greater regional timeserial: ${existingObject.getRegionalTimeserial().toString()}, than the op: ${regionalTimeserial.toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

this.applyStateMessages([stateMessage], regionalTimeserial);
}
}

private _getInitialPool(): Map<string, LiveObject> {
const pool = new Map<string, LiveObject>();
const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID);
Expand Down

0 comments on commit 422a148

Please sign in to comment.