diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index b7ea4e577..4cdcd2312 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter { } } - this._liveObjects.handleStateMessages(stateMessages, message.channelSerial); + this._liveObjects.handleStateMessages(stateMessages); break; } diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index 5ec413434..bfa3a99cc 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,7 +1,7 @@ import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; -import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage'; -import { Timeserial } from './timeserial'; +import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage'; +import { DefaultTimeserial } from './timeserial'; export interface LiveCounterData extends LiveObjectData { data: number; @@ -12,52 +12,35 @@ export interface LiveCounterUpdate extends LiveObjectUpdate { } export class LiveCounter extends LiveObject { - constructor( - liveObjects: LiveObjects, - private _created: boolean, - initialData?: LiveCounterData | null, - objectId?: string, - regionalTimeserial?: Timeserial, - ) { - super(liveObjects, initialData, objectId, regionalTimeserial); - } - /** * Returns a {@link LiveCounter} instance with a 0 value. * * @internal */ - static zeroValue( - liveobjects: LiveObjects, - isCreated: boolean, - objectId?: string, - regionalTimeserial?: Timeserial, - ): LiveCounter { - return new LiveCounter(liveobjects, isCreated, null, objectId, regionalTimeserial); - } - - value(): number { - return this._dataRef.data; + static zeroValue(liveobjects: LiveObjects, objectId: string): LiveCounter { + return new LiveCounter(liveobjects, objectId); } /** + * Returns a {@link LiveCounter} instance based on the provided state object. + * The provided state object must hold a valid counter object data. + * * @internal */ - isCreated(): boolean { - return this._created; + static fromStateObject(liveobjects: LiveObjects, stateObject: StateObject): LiveCounter { + const obj = new LiveCounter(liveobjects, stateObject.objectId); + obj.overrideWithStateObject(stateObject); + return obj; } - /** - * @internal - */ - setCreated(created: boolean): void { - this._created = created; + value(): number { + return this._dataRef.data; } /** * @internal */ - applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void { + applyOperation(op: StateOperation, msg: StateMessage): void { if (op.objectId !== this.getObjectId()) { throw new this._client.ErrorInfo( `Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`, @@ -66,10 +49,24 @@ export class LiveCounter extends LiveObject ); } + const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial); + if (!this._canApplyOperation(opOriginTimeserial)) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveCounter.applyOperation()', + `skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`, + ); + return; + } + // should update stored site timeserial immediately. doesn't matter if we successfully apply the op, + // as it's important to mark that the op was processed by the object + this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial; + let update: LiveCounterUpdate | LiveObjectUpdateNoop; switch (op.action) { case StateOperationAction.COUNTER_CREATE: - update = this._applyCounterCreate(op.counter); + update = this._applyCounterCreate(op); break; case StateOperationAction.COUNTER_INC: @@ -90,19 +87,72 @@ export class LiveCounter extends LiveObject ); } - this.setRegionalTimeserial(opRegionalTimeserial); this.notifyUpdated(update); } + /** + * @internal + */ + overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate { + if (stateObject.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (!this._client.Utils.isNil(stateObject.createOp)) { + // it is expected that create operation can be missing in the state object, so only validate it when it exists + if (stateObject.createOp.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp objectId=${stateObject.createOp?.objectId}; LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.createOp.action !== StateOperationAction.COUNTER_CREATE) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp action=${stateObject.createOp?.action}; LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + } + + const previousDataRef = this._dataRef; + // override all relevant data for this object with data from the state object + this._createOperationIsMerged = false; + this._dataRef = { data: stateObject.counter?.count ?? 0 }; + this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials); + if (!this._client.Utils.isNil(stateObject.createOp)) { + this._mergeInitialDataFromCreateOperation(stateObject.createOp); + } + + return this._updateFromDataDiff(previousDataRef, this._dataRef); + } + protected _getZeroValueData(): LiveCounterData { return { data: 0 }; } - protected _updateFromDataDiff(currentDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate { - const counterDiff = newDataRef.data - currentDataRef.data; + protected _updateFromDataDiff(prevDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate { + const counterDiff = newDataRef.data - prevDataRef.data; return { update: { inc: counterDiff } }; } + protected _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): LiveCounterUpdate { + // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. + // note that it is intentional to SUM the incoming count from the create op. + // if we got here, it means that current counter instance is missing the initial value in its data reference, + // which we're going to add now. + this._dataRef.data += stateOperation.counter?.count ?? 0; + this._createOperationIsMerged = true; + + return { update: { inc: stateOperation.counter?.count ?? 0 } }; + } + private _throwNoPayloadError(op: StateOperation): void { throw new this._client.ErrorInfo( `No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`, @@ -111,32 +161,21 @@ export class LiveCounter extends LiveObject ); } - private _applyCounterCreate(op: StateCounter | undefined): LiveCounterUpdate | LiveObjectUpdateNoop { - if (this.isCreated()) { - // skip COUNTER_CREATE op if this counter is already created + private _applyCounterCreate(op: StateOperation): LiveCounterUpdate | LiveObjectUpdateNoop { + if (this._createOperationIsMerged) { + // There can't be two different create operation for the same object id, because the object id + // fully encodes that operation. This means we can safely ignore any new incoming create operations + // if we already merged it once. this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, 'LiveCounter._applyCounterCreate()', - `skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`, + `skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this._objectId}`, ); return { noop: true }; } - if (this._client.Utils.isNil(op)) { - // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. - // we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation - this.setCreated(true); - return { update: { inc: 0 } }; - } - - // note that it is intentional to SUM the incoming count from the create op. - // if we get here, it means that current counter instance wasn't initialized from the COUNTER_CREATE op, - // so it is missing the initial value that we're going to add now. - this._dataRef.data += op.count ?? 0; - this.setCreated(true); - - return { update: { inc: op.count ?? 0 } }; + return this._mergeInitialDataFromCreateOperation(op); } private _applyCounterInc(op: StateCounterOp): LiveCounterUpdate { diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 97129f896..b29654c02 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,14 +1,13 @@ import deepEqual from 'deep-equal'; -import type BaseClient from 'common/lib/client/baseclient'; import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, - StateMap, StateMapEntry, StateMapOp, StateMessage, + StateObject, StateOperation, StateOperationAction, StateValue, @@ -50,11 +49,9 @@ export class LiveMap extends LiveObject { constructor( liveObjects: LiveObjects, private _semantics: MapSemantics, - initialData?: LiveMapData | null, - objectId?: string, - regionalTimeserial?: Timeserial, + objectId: string, ) { - super(liveObjects, initialData, objectId, regionalTimeserial); + super(liveObjects, objectId); } /** @@ -62,41 +59,20 @@ export class LiveMap extends LiveObject { * * @internal */ - static zeroValue(liveobjects: LiveObjects, objectId?: string, regionalTimeserial?: Timeserial): LiveMap { - return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId, regionalTimeserial); + static zeroValue(liveobjects: LiveObjects, objectId: string): LiveMap { + return new LiveMap(liveobjects, MapSemantics.LWW, objectId); } /** + * Returns a {@link LiveMap} instance based on the provided state object. + * The provided state object must hold a valid map object data. + * * @internal */ - static liveMapDataFromMapEntries(client: BaseClient, entries: Record): LiveMapData { - const liveMapData: LiveMapData = { - data: new Map(), - }; - - // need to iterate over entries manually to work around optional parameters from state object entries type - Object.entries(entries ?? {}).forEach(([key, entry]) => { - let liveData: StateData; - if (typeof entry.data.objectId !== 'undefined') { - liveData = { objectId: entry.data.objectId } as ObjectIdStateData; - } else { - liveData = { encoding: entry.data.encoding, value: entry.data.value } as ValueStateData; - } - - const liveDataEntry: MapEntry = { - ...entry, - timeserial: entry.timeserial - ? DefaultTimeserial.calculateTimeserial(client, entry.timeserial) - : DefaultTimeserial.zeroValueTimeserial(client), - // true only if we received explicit true. otherwise always false - tombstone: entry.tombstone === true, - data: liveData, - }; - - liveMapData.data.set(key, liveDataEntry); - }); - - return liveMapData; + static fromStateObject(liveobjects: LiveObjects, stateObject: StateObject): LiveMap { + const obj = new LiveMap(liveobjects, stateObject.map?.semantics!, stateObject.objectId); + obj.overrideWithStateObject(stateObject); + return obj; } /** @@ -144,7 +120,7 @@ export class LiveMap extends LiveObject { /** * @internal */ - applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void { + applyOperation(op: StateOperation, msg: StateMessage): void { if (op.objectId !== this.getObjectId()) { throw new this._client.ErrorInfo( `Cannot apply state operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`, @@ -153,10 +129,24 @@ export class LiveMap extends LiveObject { ); } + const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial); + if (!this._canApplyOperation(opOriginTimeserial)) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveMap.applyOperation()', + `skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`, + ); + return; + } + // should update stored site timeserial immediately. doesn't matter if we successfully apply the op, + // as it's important to mark that the op was processed by the object + this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial; + let update: LiveMapUpdate | LiveObjectUpdateNoop; switch (op.action) { case StateOperationAction.MAP_CREATE: - update = this._applyMapCreate(op.map); + update = this._applyMapCreate(op); break; case StateOperationAction.MAP_SET: @@ -165,7 +155,7 @@ export class LiveMap extends LiveObject { // leave an explicit return here, so that TS knows that update object is always set after the switch statement. return; } else { - update = this._applyMapSet(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial)); + update = this._applyMapSet(op.mapOp, opOriginTimeserial); } break; @@ -175,7 +165,7 @@ export class LiveMap extends LiveObject { // leave an explicit return here, so that TS knows that update object is always set after the switch statement. return; } else { - update = this._applyMapRemove(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial)); + update = this._applyMapRemove(op.mapOp, opOriginTimeserial); } break; @@ -187,18 +177,76 @@ export class LiveMap extends LiveObject { ); } - this.setRegionalTimeserial(opRegionalTimeserial); this.notifyUpdated(update); } + /** + * @internal + */ + overrideWithStateObject(stateObject: StateObject): LiveMapUpdate { + if (stateObject.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object objectId=${stateObject.objectId}; LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.map?.semantics !== this._semantics) { + throw new this._client.ErrorInfo( + `Invalid state object: state object map semantics=${stateObject.map?.semantics}; LiveMap semantics=${this._semantics}`, + 50000, + 500, + ); + } + + if (!this._client.Utils.isNil(stateObject.createOp)) { + // it is expected that create operation can be missing in the state object, so only validate it when it exists + if (stateObject.createOp.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp objectId=${stateObject.createOp?.objectId}; LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.createOp.action !== StateOperationAction.MAP_CREATE) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp action=${stateObject.createOp?.action}; LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.createOp.map?.semantics !== this._semantics) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp map semantics=${stateObject.createOp.map?.semantics}; LiveMap semantics=${this._semantics}`, + 50000, + 500, + ); + } + } + + const previousDataRef = this._dataRef; + // override all relevant data for this object with data from the state object + this._createOperationIsMerged = false; + this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {}); + this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials); + if (!this._client.Utils.isNil(stateObject.createOp)) { + this._mergeInitialDataFromCreateOperation(stateObject.createOp); + } + + return this._updateFromDataDiff(previousDataRef, this._dataRef); + } + protected _getZeroValueData(): LiveMapData { return { data: new Map() }; } - protected _updateFromDataDiff(currentDataRef: LiveMapData, newDataRef: LiveMapData): LiveMapUpdate { + protected _updateFromDataDiff(prevDataRef: LiveMapData, newDataRef: LiveMapData): LiveMapUpdate { const update: LiveMapUpdate = { update: {} }; - for (const [key, currentEntry] of currentDataRef.data.entries()) { + for (const [key, currentEntry] of prevDataRef.data.entries()) { // any non-tombstoned properties that exist on a current map, but not in the new data - got removed if (currentEntry.tombstone === false && !newDataRef.data.has(key)) { update.update[key] = 'removed'; @@ -206,7 +254,7 @@ export class LiveMap extends LiveObject { } for (const [key, newEntry] of newDataRef.data.entries()) { - if (!currentDataRef.data.has(key)) { + if (!prevDataRef.data.has(key)) { // if property does not exist in the current map, but new data has it as a non-tombstoned property - got updated if (newEntry.tombstone === false) { update.update[key] = 'updated'; @@ -220,7 +268,7 @@ export class LiveMap extends LiveObject { } // properties that exist both in current and new map data need to have their values compared to decide on the update type - const currentEntry = currentDataRef.data.get(key)!; + const currentEntry = prevDataRef.data.get(key)!; // compare tombstones first if (currentEntry.tombstone === true && newEntry.tombstone === false) { @@ -249,33 +297,17 @@ export class LiveMap extends LiveObject { return update; } - private _throwNoPayloadError(op: StateOperation): void { - throw new this._client.ErrorInfo( - `No payload found for ${op.action} op for LiveMap objectId=${this.getObjectId()}`, - 50000, - 500, - ); - } - - private _applyMapCreate(op: StateMap | undefined): LiveMapUpdate | LiveObjectUpdateNoop { - if (this._client.Utils.isNil(op)) { + protected _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): LiveMapUpdate { + if (this._client.Utils.isNil(stateOperation.map)) { // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. // in this case there is nothing to merge into the current map, so we can just end processing the op. return { update: {} }; } - if (this._semantics !== op.semantics) { - throw new this._client.ErrorInfo( - `Cannot apply MAP_CREATE op on LiveMap objectId=${this.getObjectId()}; map's semantics=${this._semantics}, but op expected ${op.semantics}`, - 50000, - 500, - ); - } - const aggregatedUpdate: LiveMapUpdate | LiveObjectUpdateNoop = { update: {} }; // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. - Object.entries(op.entries ?? {}).forEach(([key, entry]) => { + Object.entries(stateOperation.map.entries ?? {}).forEach(([key, entry]) => { // for MAP_CREATE op we must use dedicated timeserial field available on an entry, instead of a timeserial on a message const opOriginTimeserial = entry.timeserial ? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial) @@ -298,9 +330,44 @@ export class LiveMap extends LiveObject { Object.assign(aggregatedUpdate.update, update.update); }); + this._createOperationIsMerged = true; + return aggregatedUpdate; } + private _throwNoPayloadError(op: StateOperation): void { + throw new this._client.ErrorInfo( + `No payload found for ${op.action} op for LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + private _applyMapCreate(op: StateOperation): LiveMapUpdate | LiveObjectUpdateNoop { + if (this._createOperationIsMerged) { + // There can't be two different create operation for the same object id, because the object id + // fully encodes that operation. This means we can safely ignore any new incoming create operations + // if we already merged it once. + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveMap._applyMapCreate()', + `skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${this._objectId}`, + ); + return { noop: true }; + } + + if (this._semantics !== op.map?.semantics) { + throw new this._client.ErrorInfo( + `Cannot apply MAP_CREATE op on LiveMap objectId=${this.getObjectId()}; map's semantics=${this._semantics}, but op expected ${op.map?.semantics}`, + 50000, + 500, + ); + } + + return this._mergeInitialDataFromCreateOperation(op); + } + private _applyMapSet(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop { const { ErrorInfo, Utils } = this._client; @@ -385,4 +452,34 @@ export class LiveMap extends LiveObject { return { update: { [op.key]: 'removed' } }; } + + private _liveMapDataFromMapEntries(entries: Record): LiveMapData { + const liveMapData: LiveMapData = { + data: new Map(), + }; + + // need to iterate over entries manually to work around optional parameters from state object entries type + Object.entries(entries ?? {}).forEach(([key, entry]) => { + let liveData: StateData; + if (typeof entry.data.objectId !== 'undefined') { + liveData = { objectId: entry.data.objectId } as ObjectIdStateData; + } else { + liveData = { encoding: entry.data.encoding, value: entry.data.value } as ValueStateData; + } + + const liveDataEntry: MapEntry = { + ...entry, + timeserial: entry.timeserial + ? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial) + : DefaultTimeserial.zeroValueTimeserial(this._client), + // true only if we received explicit true. otherwise always false + tombstone: entry.tombstone === true, + data: liveData, + }; + + liveMapData.data.set(key, liveDataEntry); + }); + + return liveMapData; + } } diff --git a/src/plugins/liveobjects/liveobject.ts b/src/plugins/liveobjects/liveobject.ts index 78244ab65..04ae7d4e2 100644 --- a/src/plugins/liveobjects/liveobject.ts +++ b/src/plugins/liveobjects/liveobject.ts @@ -1,7 +1,7 @@ import type BaseClient from 'common/lib/client/baseclient'; import type EventEmitter from 'common/lib/util/eventemitter'; import { LiveObjects } from './liveobjects'; -import { StateMessage, StateOperation } from './statemessage'; +import { StateMessage, StateObject, StateOperation } from './statemessage'; import { DefaultTimeserial, Timeserial } from './timeserial'; enum LiveObjectEvents { @@ -32,22 +32,26 @@ export abstract class LiveObject< > { protected _client: BaseClient; protected _eventEmitter: EventEmitter; - protected _dataRef: TData; protected _objectId: string; - protected _regionalTimeserial: Timeserial; + /** + * Represents an aggregated value for an object, which combines the initial value for an object from the create operation, + * and all state operations applied to the object. + */ + protected _dataRef: TData; + protected _siteTimeserials: Record; + protected _createOperationIsMerged: boolean; - constructor( + protected constructor( protected _liveObjects: LiveObjects, - initialData?: TData | null, - objectId?: string, - regionalTimeserial?: Timeserial, + objectId: string, ) { this._client = this._liveObjects.getClient(); this._eventEmitter = new this._client.EventEmitter(this._client.logger); - this._dataRef = initialData ?? this._getZeroValueData(); - this._objectId = objectId ?? this._createObjectId(); - // use zero value timeserial by default, so any future operation can be applied for this object - this._regionalTimeserial = regionalTimeserial ?? DefaultTimeserial.zeroValueTimeserial(this._client); + this._dataRef = this._getZeroValueData(); + this._createOperationIsMerged = false; + this._objectId = objectId; + // use empty timeserials vector by default, so any future operation can be applied to this object + this._siteTimeserials = {}; } subscribe(listener: (update: TUpdate) => void): SubscribeResponse { @@ -83,33 +87,10 @@ export abstract class LiveObject< } /** - * @internal - */ - getRegionalTimeserial(): Timeserial { - return this._regionalTimeserial; - } - - /** - * Sets a new data reference for the LiveObject and returns an update object that describes the changes applied based on the object's previous value. + * Emits the {@link LiveObjectEvents.Updated} event with provided update object if it isn't a noop. * * @internal */ - setData(newDataRef: TData): TUpdate { - const update = this._updateFromDataDiff(this._dataRef, newDataRef); - this._dataRef = newDataRef; - return update; - } - - /** - * @internal - */ - setRegionalTimeserial(regionalTimeserial: Timeserial): void { - this._regionalTimeserial = regionalTimeserial; - } - - /** - * @internal - */ notifyUpdated(update: TUpdate | LiveObjectUpdateNoop): void { // should not emit update event if update was noop if ((update as LiveObjectUpdateNoop).noop) { @@ -119,18 +100,68 @@ export abstract class LiveObject< this._eventEmitter.emit(LiveObjectEvents.Updated, update); } + /** + * Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object. + * + * An operation should be applied if the origin timeserial is strictly greater than the timeserial in the site timeserials for the same site. + * If the site timeserials do not contain a timeserial for the site of the origin timeserial, the operation should be applied. + */ + protected _canApplyOperation(opOriginTimeserial: Timeserial): boolean { + const siteTimeserial = this._siteTimeserials[opOriginTimeserial.siteCode]; + return !siteTimeserial || opOriginTimeserial.after(siteTimeserial); + } + + protected _timeserialMapFromStringMap(stringTimeserialsMap: Record): Record { + const objTimeserialsMap = Object.entries(stringTimeserialsMap).reduce( + (acc, v) => { + const [key, timeserialString] = v; + acc[key] = DefaultTimeserial.calculateTimeserial(this._client, timeserialString); + return acc; + }, + {} as Record, + ); + + return objTimeserialsMap; + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); } /** + * Apply state operation message on live object. + * + * @internal + */ + abstract applyOperation(op: StateOperation, msg: StateMessage): void; + /** + * Overrides internal data for live object with data from the given state object. + * Provided state object should hold a valid data for current live object, e.g. counter data for LiveCounter, map data for LiveMap. + * + * State objects are received during SYNC sequence, and SYNC sequence is a source of truth for the current state of the objects, + * so we can use the data received from the SYNC sequence directly and override any data values or site timeserials this live object has + * without the need to merge them. + * + * Returns an update object that describes the changes applied based on the object's previous value. + * * @internal */ - abstract applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void; + abstract overrideWithStateObject(stateObject: StateObject): TUpdate; protected abstract _getZeroValueData(): TData; /** * Calculate the update object based on the current Live Object data and incoming new data. */ - protected abstract _updateFromDataDiff(currentDataRef: TData, newDataRef: TData): TUpdate; + protected abstract _updateFromDataDiff(prevDataRef: TData, newDataRef: TData): TUpdate; + /** + * Merges the initial data from the create operation into the live object state. + * + * Client SDKs do not need to keep around the state operation that created the object, + * so we can merge the initial data the first time we receive it for the object, + * and work with aggregated value after that. + * + * This saves us from needing to merge the initial value with operations applied to + * the object every time the object is read. + */ + protected abstract _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): TUpdate; } diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index a5070ec99..5ba586fe4 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -7,18 +7,12 @@ import { LiveMap } from './livemap'; import { LiveObject, LiveObjectUpdate } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; import { StateMessage } from './statemessage'; -import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; -import { DefaultTimeserial, Timeserial } from './timeserial'; +import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; enum LiveObjectsEvents { SyncCompleted = 'SyncCompleted', } -export interface BufferedStateMessage { - stateMessage: StateMessage; - regionalTimeserial: Timeserial; -} - export class LiveObjects { private _client: BaseClient; private _channel: RealtimeChannel; @@ -30,7 +24,7 @@ export class LiveObjects { private _syncInProgress: boolean; private _currentSyncId: string | undefined; private _currentSyncCursor: string | undefined; - private _bufferedStateOperations: BufferedStateMessage[]; + private _bufferedStateOperations: StateMessage[]; constructor(channel: RealtimeChannel) { this._channel = channel; @@ -92,23 +86,17 @@ export class LiveObjects { /** * @internal */ - handleStateMessages(stateMessages: StateMessage[], msgRegionalTimeserial: string | null | undefined): void { - const timeserial = DefaultTimeserial.calculateTimeserial(this._client, msgRegionalTimeserial); - + handleStateMessages(stateMessages: StateMessage[]): void { if (this._syncInProgress) { // The client receives state messages in realtime over the channel concurrently with the SYNC sequence. // Some of the incoming state messages may have already been applied to the state objects described in // the SYNC sequence, but others may not; therefore we must buffer these messages so that we can apply - // them to the state objects once the SYNC is complete. To avoid double-counting, the buffered operations - // are applied according to the state object's regional timeserial, which reflects the regional timeserial - // of the state message that was last applied to that state object. - stateMessages.forEach((x) => - this._bufferedStateOperations.push({ stateMessage: x, regionalTimeserial: timeserial }), - ); + // them to the state objects once the SYNC is complete. + this._bufferedStateOperations.push(...stateMessages); return; } - this._liveObjectsPool.applyStateMessages(stateMessages, timeserial); + this._liveObjectsPool.applyStateMessages(stateMessages); } /** @@ -164,8 +152,9 @@ export class LiveObjects { private _endSync(): void { this._applySync(); - // should apply buffered state operations after we applied the SYNC data - this._liveObjectsPool.applyBufferedStateMessages(this._bufferedStateOperations); + // should apply buffered state operations after we applied the SYNC data. + // can use regular state messages application logic + this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations); this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); @@ -204,16 +193,11 @@ export class LiveObjects { for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) { receivedObjectIds.add(objectId); const existingObject = this._liveObjectsPool.get(objectId); - const regionalTimeserialObj = DefaultTimeserial.calculateTimeserial(this._client, entry.regionalTimeserial); if (existingObject) { - const update = existingObject.setData(entry.objectData); - existingObject.setRegionalTimeserial(regionalTimeserialObj); - if (existingObject instanceof LiveCounter) { - existingObject.setCreated((entry as LiveCounterDataEntry).created); - } - // store updates for existing objects to call subscription callbacks for all of them once the SYNC sequence is completed. - // this will ensure that clients get notified about changes only once everything was applied. + const update = existingObject.overrideWithStateObject(entry.stateObject); + // store updates to call subscription callbacks for all of them once the SYNC sequence is completed. + // this will ensure that clients get notified about the changes only once everything has been applied. existingObjectUpdates.push({ object: existingObject, update }); continue; } @@ -223,11 +207,11 @@ export class LiveObjects { const objectType = entry.objectType; switch (objectType) { case 'LiveCounter': - newObject = new LiveCounter(this, entry.created, entry.objectData, objectId, regionalTimeserialObj); + newObject = LiveCounter.fromStateObject(this, entry.stateObject); break; case 'LiveMap': - newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId, regionalTimeserialObj); + newObject = LiveMap.fromStateObject(this, entry.stateObject); break; default: diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index a4ea8c92e..2c57f1084 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -3,10 +3,9 @@ import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; -import { BufferedStateMessage, LiveObjects } from './liveobjects'; +import { LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; -import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage'; -import { DefaultTimeserial, Timeserial } from './timeserial'; +import { StateMessage, StateOperationAction } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -52,24 +51,22 @@ export class LiveObjectsPool { } const parsedObjectId = ObjectId.fromString(this._client, objectId); - // use zero value timeserial, so any operation can be applied for this object - const regionalTimeserial = DefaultTimeserial.zeroValueTimeserial(this._client); let zeroValueObject: LiveObject; switch (parsedObjectId.type) { case 'map': { - zeroValueObject = LiveMap.zeroValue(this._liveObjects, objectId, regionalTimeserial); + zeroValueObject = LiveMap.zeroValue(this._liveObjects, objectId); break; } case 'counter': - zeroValueObject = LiveCounter.zeroValue(this._liveObjects, false, objectId, regionalTimeserial); + zeroValueObject = LiveCounter.zeroValue(this._liveObjects, objectId); break; } this.set(objectId, zeroValueObject); } - applyStateMessages(stateMessages: StateMessage[], regionalTimeserial: Timeserial): void { + applyStateMessages(stateMessages: StateMessage[]): void { for (const stateMessage of stateMessages) { if (!stateMessage.operation) { this._client.Logger.logAction( @@ -86,31 +83,17 @@ export class LiveObjectsPool { switch (stateOperation.action) { case StateOperationAction.MAP_CREATE: case StateOperationAction.COUNTER_CREATE: - if (this.get(stateOperation.objectId)) { - // object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op), - // so delegate application of the op to that object - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial); - break; - } - - // otherwise we can create new objects in the pool - if (stateOperation.action === StateOperationAction.MAP_CREATE) { - this._handleMapCreate(stateOperation, regionalTimeserial); - } - - if (stateOperation.action === StateOperationAction.COUNTER_CREATE) { - this._handleCounterCreate(stateOperation, regionalTimeserial); - } - break; - case StateOperationAction.MAP_SET: case StateOperationAction.MAP_REMOVE: case StateOperationAction.COUNTER_INC: // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, - // we create a zero-value object for the provided object id, and apply operation for that zero-value object. - // when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object. + // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. + // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, + // since they need to be able to eventually initialize themselves from that *_CREATE op. + // so to simplify operations handling, we always try to create a zero-value object in the pool first, + // and then we can always apply the operation on the existing object in the pool. this.createZeroValueObjectIfNotExists(stateOperation.objectId); - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial); + this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); break; default: @@ -124,90 +107,10 @@ export class LiveObjectsPool { } } - applyBufferedStateMessages(bufferedStateMessages: BufferedStateMessage[]): void { - // since we receive state operation messages concurrently with the SYNC sequence, - // we must determine which operation messages should be applied to the now local copy of the object pool, and the rest will be skipped. - // since messages are delivered in regional order to the client, we can inspect the regional timeserial - // of each state operation message to know whether it has reached a point in the message stream - // that is no longer included in the state object snapshot we received from SYNC sequence. - for (const { regionalTimeserial, stateMessage } of bufferedStateMessages) { - if (!stateMessage.operation) { - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MAJOR, - 'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()', - `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - continue; - } - - const existingObject = this.get(stateMessage.operation.objectId); - if (!existingObject) { - // for object ids we haven't seen yet we can apply operation immediately - this.applyStateMessages([stateMessage], regionalTimeserial); - continue; - } - - // otherwise we need to compare regional timeserials - if ( - regionalTimeserial.before(existingObject.getRegionalTimeserial()) || - regionalTimeserial.equal(existingObject.getRegionalTimeserial()) - ) { - // the operation's regional timeserial <= the object's timeserial, ignore the operation. - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MICRO, - 'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()', - `skipping buffered state operation message: op regional timeserial ${regionalTimeserial.toString()} <= object regional timeserial ${existingObject.getRegionalTimeserial().toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - continue; - } - - this.applyStateMessages([stateMessage], regionalTimeserial); - } - } - private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID); pool.set(root.getObjectId(), root); return pool; } - - private _handleCounterCreate(stateOperation: StateOperation, opRegionalTimeserial: Timeserial): void { - let counter: LiveCounter; - if (this._client.Utils.isNil(stateOperation.counter)) { - // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly a zero-value counter. - counter = LiveCounter.zeroValue(this._liveObjects, true, stateOperation.objectId, opRegionalTimeserial); - } else { - counter = new LiveCounter( - this._liveObjects, - true, - { data: stateOperation.counter.count ?? 0 }, - stateOperation.objectId, - opRegionalTimeserial, - ); - } - - this.set(stateOperation.objectId, counter); - } - - private _handleMapCreate(stateOperation: StateOperation, opRegionalTimeserial: Timeserial): void { - let map: LiveMap; - if (this._client.Utils.isNil(stateOperation.map)) { - // if a map object is missing for the MAP_CREATE op, the initial value is implicitly a zero-value map. - map = LiveMap.zeroValue(this._liveObjects, stateOperation.objectId, opRegionalTimeserial); - } else { - const objectData = LiveMap.liveMapDataFromMapEntries(this._client, stateOperation.map.entries ?? {}); - map = new LiveMap( - this._liveObjects, - stateOperation.map.semantics ?? MapSemantics.LWW, - objectData, - stateOperation.objectId, - opRegionalTimeserial, - ); - } - - this.set(stateOperation.objectId, map); - } } diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index b4eefdadd..cadcccca8 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -71,12 +71,6 @@ export interface StateMap { export interface StateCounter { /** The value of the counter */ count?: number; - /** - * Indicates (true) if the counter has seen an explicit create operation - * and false if the counter was created with a default value when - * processing a regular operation. - */ - created: boolean; } /** A StateOperation describes an operation to be applied to a state object. */ @@ -110,11 +104,23 @@ export interface StateOperation { export interface StateObject { /** The identifier of the state object. */ objectId: string; - /** The *regional* timeserial of the last operation that was applied to this state object. */ - regionalTimeserial: string; - /** The data that represents the state of the object if it is a Map object type. */ + /** A vector of origin timeserials keyed by site code of the last operation that was applied to this state object. */ + siteTimeserials: Record; + /** + * The operation that created the state object. + * + * Can be missing if create operation for the object is not known at this point. + */ + createOp?: StateOperation; + /** + * The data that represents the result of applying all operations to a Map object + * excluding the initial value from the create operation if it is a Map object type. + */ map?: StateMap; - /** The data that represents the state of the object if it is a Counter object type. */ + /** + * The data that represents the result of applying all operations to a Counter object + * excluding the initial value from the create operation if it is a Counter object type. + */ counter?: StateCounter; } @@ -144,41 +150,24 @@ export class StateMessage { ): Promise { // TODO: decide how to handle individual errors from decoding values. currently we throw first ever error we get - const decodeMapEntry = async ( - entry: StateMapEntry, - ctx: ChannelOptions, - decode: typeof decodeData, - ): Promise => { - const { data, encoding, error } = await decode(entry.data.value, entry.data.encoding, ctx); - entry.data.value = data; - entry.data.encoding = encoding ?? undefined; - - if (error) { - throw error; - } - }; - if (message.object?.map?.entries) { - for (const entry of Object.values(message.object.map.entries)) { - await decodeMapEntry(entry, inputContext, decodeDataFn); - } + await StateMessage._decodeMapEntries(message.object.map.entries, inputContext, decodeDataFn); + } + + if (message.object?.createOp?.map?.entries) { + await StateMessage._decodeMapEntries(message.object.createOp.map.entries, inputContext, decodeDataFn); } - if (message.operation?.map) { - for (const entry of Object.values(message.operation.map)) { - await decodeMapEntry(entry, inputContext, decodeDataFn); - } + if (message.object?.createOp?.mapOp?.data && 'value' in message.object.createOp.mapOp.data) { + await StateMessage._decodeStateData(message.object.createOp.mapOp.data, inputContext, decodeDataFn); } - if (message.operation?.mapOp?.data && 'value' in message.operation?.mapOp?.data) { - const mapOpData = message.operation.mapOp.data; - const { data, encoding, error } = await decodeDataFn(mapOpData.value, mapOpData.encoding, inputContext); - mapOpData.value = data; - mapOpData.encoding = encoding ?? undefined; + if (message.operation?.map?.entries) { + await StateMessage._decodeMapEntries(message.operation.map.entries, inputContext, decodeDataFn); + } - if (error) { - throw error; - } + if (message.operation?.mapOp?.data && 'value' in message.operation.mapOp.data) { + await StateMessage._decodeStateData(message.operation.mapOp.data, inputContext, decodeDataFn); } } @@ -191,12 +180,142 @@ export class StateMessage { const result = new Array(count); for (let i = 0; i < count; i++) { - result[i] = this.fromValues(values[i] as Record, platform); + result[i] = StateMessage.fromValues(values[i] as Record, platform); } return result; } + private static async _decodeMapEntries( + mapEntries: Record, + inputContext: ChannelOptions, + decodeDataFn: typeof decodeData, + ): Promise { + for (const entry of Object.values(mapEntries)) { + await StateMessage._decodeStateData(entry.data, inputContext, decodeDataFn); + } + } + + private static async _decodeStateData( + stateData: StateData, + inputContext: ChannelOptions, + decodeDataFn: typeof decodeData, + ): Promise { + const { data, encoding, error } = await decodeDataFn(stateData.value, stateData.encoding, inputContext); + stateData.value = data; + stateData.encoding = encoding ?? undefined; + + if (error) { + throw error; + } + } + + private static _encodeStateOperation( + platform: typeof Platform, + stateOperation: StateOperation, + withBase64Encoding: boolean, + ): StateOperation { + // deep copy "stateOperation" object so we can modify the copy here. + // buffer values won't be correctly copied, so we will need to set them again explictly. + const stateOperationCopy = JSON.parse(JSON.stringify(stateOperation)) as StateOperation; + + if (stateOperationCopy.mapOp?.data && 'value' in stateOperationCopy.mapOp.data) { + // use original "stateOperation" object when encoding values, so we have access to the original buffer values. + stateOperationCopy.mapOp.data = StateMessage._encodeStateData( + platform, + stateOperation.mapOp?.data!, + withBase64Encoding, + ); + } + + if (stateOperationCopy.map?.entries) { + Object.entries(stateOperationCopy.map.entries).forEach(([key, entry]) => { + // use original "stateOperation" object when encoding values, so we have access to original buffer values. + entry.data = StateMessage._encodeStateData( + platform, + stateOperation?.map?.entries?.[key].data!, + withBase64Encoding, + ); + }); + } + + return stateOperationCopy; + } + + private static _encodeStateObject( + platform: typeof Platform, + stateObject: StateObject, + withBase64Encoding: boolean, + ): StateObject { + // deep copy "stateObject" object so we can modify the copy here. + // buffer values won't be correctly copied, so we will need to set them again explictly. + const stateObjectCopy = JSON.parse(JSON.stringify(stateObject)) as StateObject; + + if (stateObjectCopy.map?.entries) { + Object.entries(stateObjectCopy.map.entries).forEach(([key, entry]) => { + // use original "stateObject" object when encoding values, so we have access to original buffer values. + entry.data = StateMessage._encodeStateData( + platform, + stateObject?.map?.entries?.[key].data!, + withBase64Encoding, + ); + }); + } + + if (stateObjectCopy.createOp) { + // use original "stateObject" object when encoding values, so we have access to original buffer values. + stateObjectCopy.createOp = StateMessage._encodeStateOperation( + platform, + stateObject.createOp!, + withBase64Encoding, + ); + } + + return stateObjectCopy; + } + + private static _encodeStateData(platform: typeof Platform, data: StateData, withBase64Encoding: boolean): StateData { + const { value, encoding } = StateMessage._encodeStateValue( + platform, + data?.value, + data?.encoding, + withBase64Encoding, + ); + return { + ...data, + value, + encoding, + }; + } + + private static _encodeStateValue( + platform: typeof Platform, + value: StateValue | undefined, + encoding: string | undefined, + withBase64Encoding: boolean, + ): { + value: StateValue | undefined; + encoding: string | undefined; + } { + if (!value || !platform.BufferUtils.isBuffer(value)) { + return { value, encoding }; + } + + if (withBase64Encoding) { + return { + value: platform.BufferUtils.base64Encode(value), + encoding: encoding ? encoding + '/base64' : 'base64', + }; + } + + // toBuffer returns a datatype understandable by + // that platform's msgpack implementation (Buffer in node, Uint8Array in browsers) + return { + value: platform.BufferUtils.toBuffer(value), + encoding, + }; + } + /** * Overload toJSON() to intercept JSON.stringify() * @return {*} @@ -215,44 +334,18 @@ export class StateMessage { // if withBase64Encoding = false - we were called by msgpack const withBase64Encoding = arguments.length > 0; - let operationCopy: StateOperation | undefined = undefined; - if (this.operation) { - // deep copy "operation" prop so we can modify it here. - // buffer values won't be correctly copied, so we will need to set them again explictly - operationCopy = JSON.parse(JSON.stringify(this.operation)) as StateOperation; - - if (operationCopy.mapOp?.data && 'value' in operationCopy.mapOp.data) { - // use original "operation" prop when encoding values, so we have access to original buffer values. - operationCopy.mapOp.data = this._encodeStateData(this.operation.mapOp?.data!, withBase64Encoding); - } - - if (operationCopy.map?.entries) { - Object.entries(operationCopy.map.entries).forEach(([key, entry]) => { - // use original "operation" prop when encoding values, so we have access to original buffer values. - entry.data = this._encodeStateData(this.operation?.map?.entries?.[key].data!, withBase64Encoding); - }); - } - } - - let object: StateObject | undefined = undefined; - if (this.object) { - // deep copy "object" prop so we can modify it here. - // buffer values won't be correctly copied, so we will need to set them again explictly - object = JSON.parse(JSON.stringify(this.object)) as StateObject; - - if (object.map?.entries) { - Object.entries(object.map.entries).forEach(([key, entry]) => { - // use original "object" prop when encoding values, so we have access to original buffer values. - entry.data = this._encodeStateData(this.object?.map?.entries?.[key].data!, withBase64Encoding); - }); - } - } + const encodedOperation = this.operation + ? StateMessage._encodeStateOperation(this._platform, this.operation, withBase64Encoding) + : undefined; + const encodedObject = this.object + ? StateMessage._encodeStateObject(this._platform, this.object, withBase64Encoding) + : undefined; return { id: this.id, clientId: this.clientId, - operation: operationCopy, - object: object, + operation: encodedOperation, + object: encodedObject, extras: this.extras, }; } @@ -275,40 +368,4 @@ export class StateMessage { return result; } - - private _encodeStateData(data: StateData, withBase64Encoding: boolean): StateData { - const { value, encoding } = this._encodeStateValue(data?.value, data?.encoding, withBase64Encoding); - return { - ...data, - value, - encoding, - }; - } - - private _encodeStateValue( - value: StateValue | undefined, - encoding: string | undefined, - withBase64Encoding: boolean, - ): { - value: StateValue | undefined; - encoding: string | undefined; - } { - if (!value || !this._platform.BufferUtils.isBuffer(value)) { - return { value, encoding }; - } - - if (withBase64Encoding) { - return { - value: this._platform.BufferUtils.base64Encode(value), - encoding: encoding ? encoding + '/base64' : 'base64', - }; - } - - // toBuffer returns a datatype understandable by - // that platform's msgpack implementation (Buffer in node, Uint8Array in browsers) - return { - value: this._platform.BufferUtils.toBuffer(value), - encoding, - }; - } } diff --git a/src/plugins/liveobjects/syncliveobjectsdatapool.ts b/src/plugins/liveobjects/syncliveobjectsdatapool.ts index 8f0f8d648..663a1bb21 100644 --- a/src/plugins/liveobjects/syncliveobjectsdatapool.ts +++ b/src/plugins/liveobjects/syncliveobjectsdatapool.ts @@ -1,29 +1,24 @@ import type BaseClient from 'common/lib/client/baseclient'; import type RealtimeChannel from 'common/lib/client/realtimechannel'; -import { LiveCounterData } from './livecounter'; -import { LiveMap } from './livemap'; -import { LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; -import { MapSemantics, StateMessage, StateObject } from './statemessage'; +import { StateMessage, StateObject } from './statemessage'; export interface LiveObjectDataEntry { - objectData: LiveObjectData; - regionalTimeserial: string; + stateObject: StateObject; objectType: 'LiveMap' | 'LiveCounter'; } export interface LiveCounterDataEntry extends LiveObjectDataEntry { - created: boolean; objectType: 'LiveCounter'; } export interface LiveMapDataEntry extends LiveObjectDataEntry { objectType: 'LiveMap'; - semantics: MapSemantics; } export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry; +// TODO: investigate if this class is still needed after changes with createOp. objects are now initialized from the stateObject and this class does minimal processing /** * @internal */ @@ -84,30 +79,18 @@ export class SyncLiveObjectsDataPool { } private _createLiveCounterDataEntry(stateObject: StateObject): LiveCounterDataEntry { - const counter = stateObject.counter!; - - const objectData: LiveCounterData = { - data: counter.count ?? 0, - }; const newEntry: LiveCounterDataEntry = { - objectData, + stateObject, objectType: 'LiveCounter', - regionalTimeserial: stateObject.regionalTimeserial, - created: counter.created, }; return newEntry; } private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry { - const map = stateObject.map!; - const objectData = LiveMap.liveMapDataFromMapEntries(this._client, map.entries ?? {}); - const newEntry: LiveMapDataEntry = { - objectData, + stateObject, objectType: 'LiveMap', - regionalTimeserial: stateObject.regionalTimeserial, - semantics: map.semantics ?? MapSemantics.LWW, }; return newEntry; diff --git a/src/plugins/liveobjects/timeserial.ts b/src/plugins/liveobjects/timeserial.ts index 0accdc81b..bc5c53550 100644 --- a/src/plugins/liveobjects/timeserial.ts +++ b/src/plugins/liveobjects/timeserial.ts @@ -9,6 +9,11 @@ export interface Timeserial { */ readonly seriesId: string; + /** + * The site code of the timeserial. + */ + readonly siteCode: string; + /** * The timestamp of the timeserial. */ @@ -40,6 +45,7 @@ export interface Timeserial { */ export class DefaultTimeserial implements Timeserial { public readonly seriesId: string; + public readonly siteCode: string; public readonly timestamp: number; public readonly counter: number; public readonly index?: number; @@ -55,6 +61,8 @@ export class DefaultTimeserial implements Timeserial { this.timestamp = timestamp; this.counter = counter; this.index = index; + // TODO: will be removed once https://ably.atlassian.net/browse/DTP-1078 is implemented on the realtime + this.siteCode = this.seriesId.slice(0, 3); // site code is stored in the first 3 letters of the epoch, which is stored in the series id field } /** diff --git a/test/common/modules/live_objects_helper.js b/test/common/modules/live_objects_helper.js index 82c65b737..273a2f499 100644 --- a/test/common/modules/live_objects_helper.js +++ b/test/common/modules/live_objects_helper.js @@ -101,11 +101,17 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb action: ACTIONS.MAP_CREATE, nonce: nonce(), objectId, + map: { + semantics: 0, + }, }, }; if (entries) { - op.operation.map = { entries }; + op.operation.map = { + ...op.operation.map, + entries, + }; } return op; @@ -175,31 +181,41 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb } mapObject(opts) { - const { objectId, regionalTimeserial, entries } = opts; + const { objectId, siteTimeserials, initialEntries, materialisedEntries } = opts; const obj = { object: { objectId, - regionalTimeserial, - map: { entries }, + siteTimeserials, + map: { + semantics: 0, + entries: materialisedEntries, + }, }, }; + if (initialEntries) { + obj.object.createOp = this.mapCreateOp({ objectId, entries: initialEntries }).operation; + } + return obj; } counterObject(opts) { - const { objectId, regionalTimeserial, count } = opts; + const { objectId, siteTimeserials, initialCount, materialisedCount } = opts; const obj = { object: { objectId, - regionalTimeserial, + siteTimeserials, counter: { - created: true, - count, + count: materialisedCount, }, }, }; + if (initialCount != null) { + obj.object.createOp = this.counterCreateOp({ objectId, count: initialCount }).operation; + } + return obj; } diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 935324527..ead316add 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -30,6 +30,21 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], expect(object.constructor.name).to.match(new RegExp(`_?${className}`), msg); } + function forScenarios(scenarios, testFn) { + // if there are scenarios marked as "only", run only them. + // otherwise go over every scenario + const onlyScenarios = scenarios.filter((x) => x.only === true); + const scenariosToRun = onlyScenarios.length > 0 ? onlyScenarios : scenarios; + + for (const scenario of scenariosToRun) { + if (scenario.skip === true) { + continue; + } + + testFn(scenario); + } + } + describe('realtime/live_objects', function () { this.timeout(60 * 1000); @@ -110,7 +125,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await liveObjectsHelper.processStateObjectMessageOnChannel({ channel: testChannel, syncSerial: 'serial:', - state: [liveObjectsHelper.mapObject({ objectId: 'root', regionalTimeserial: '@0-0' })], + state: [liveObjectsHelper.mapObject({ objectId: 'root', siteTimeserials: { '000': '000@0-0' } })], }); const publishChannel = publishClient.channels.get('channel'); @@ -277,8 +292,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], state: [ liveObjectsHelper.mapObject({ objectId: 'root', - regionalTimeserial: '@0-0', - entries: { key: { timeserial: '@0-0', data: { value: 1 } } }, + siteTimeserials: { '000': '000@0-0' }, + initialEntries: { key: { timeserial: '000@0-0', data: { value: 1 } } }, }), ], }); @@ -486,7 +501,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], ]; const applyOperationsScenarios = [ { - description: 'MAP_CREATE with primitives', + description: 'can apply MAP_CREATE with primitives state operation messages', action: async (ctx) => { const { root, liveObjectsHelper, channelName } = ctx; @@ -545,7 +560,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, { - description: 'MAP_CREATE with object ids', + description: 'can apply MAP_CREATE with object ids state operation messages', action: async (ctx) => { const { root, liveObjectsHelper, channelName } = ctx; const withReferencesMapKey = 'withReferencesMap'; @@ -620,7 +635,86 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, { - description: 'MAP_SET with primitives', + description: + 'MAP_CREATE state operation messages are applied based on the site timeserials vector of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // need to use multiple maps as MAP_CREATE op can only be applied once to a map object + const mapIds = [ + liveObjectsHelper.fakeMapObjectId(), + liveObjectsHelper.fakeMapObjectId(), + liveObjectsHelper.fakeMapObjectId(), + liveObjectsHelper.fakeMapObjectId(), + liveObjectsHelper.fakeMapObjectId(), + ]; + await Promise.all( + mapIds.map(async (mapId, i) => { + // send a MAP_SET op first to create a zero-value map with forged site timeserials vector (from the op), and set it on a root. + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'bbb@1-0', + state: [liveObjectsHelper.mapSetOp({ objectId: mapId, key: 'foo', data: { value: 'bar' } })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: `aaa@${i}-0`, + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: mapId, data: { objectId: mapId } })], + }); + }), + ); + + // inject operations with various timeserial values + for (const [i, serial] of [ + 'bbb@0-0', // existing site, earlier CGO, not applied + 'bbb@1-0', // existing site, same CGO, not applied + 'bbb@2-0', // existing site, later CGO, applied + 'aaa@0-0', // different site, earlier CGO, applied + 'ccc@9-0', // different site, later CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [ + liveObjectsHelper.mapCreateOp({ + objectId: mapIds[i], + entries: { + baz: { timeserial: serial, data: { value: 'qux' } }, + }, + }), + ], + }); + } + + // check only operations with correct timeserials were applied + const expectedMapValues = [ + { foo: 'bar' }, + { foo: 'bar' }, + { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE + { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE + { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE + ]; + + for (const [i, mapId] of mapIds.entries()) { + const expectedMapValue = expectedMapValues[i]; + const expectedKeysCount = Object.keys(expectedMapValue).length; + + expect(root.get(mapId).size()).to.equal( + expectedKeysCount, + `Check map #${i + 1} has expected number of keys after MAP_CREATE ops`, + ); + Object.entries(expectedMapValue).forEach(([key, value]) => { + expect(root.get(mapId).get(key)).to.equal( + value, + `Check map #${i + 1} has expected value for "${key}" key after MAP_CREATE ops`, + ); + }); + } + }, + }, + + { + description: 'can apply MAP_SET with primitives state operation messages', action: async (ctx) => { const { root, liveObjectsHelper, channelName } = ctx; @@ -663,7 +757,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, { - description: 'MAP_SET with object ids', + description: 'can apply MAP_SET with object ids state operation messages', action: async (ctx) => { const { root, liveObjectsHelper, channelName } = ctx; @@ -714,7 +808,73 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, { - description: 'MAP_REMOVE', + description: + 'MAP_SET state operation messages are applied based on the site timeserials vector of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // create new map and set it on a root with forged timeserials + const mapId = liveObjectsHelper.fakeMapObjectId(); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'bbb@1-0', + state: [ + liveObjectsHelper.mapCreateOp({ + objectId: mapId, + entries: { + foo1: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo2: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo3: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo4: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo5: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo6: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + }, + }), + ], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'aaa@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], + }); + + // inject operations with various timeserial values + for (const [i, serial] of [ + 'bbb@0-0', // existing site, earlier site CGO, not applied + 'bbb@1-0', // existing site, same site CGO, not applied + 'bbb@2-0', // existing site, later site CGO, applied, site timeserials updated + 'bbb@2-0', // existing site, same site CGO (updated from last op), not applied + 'aaa@0-0', // different site, earlier entry CGO, not applied + 'ccc@9-0', // different site, later entry CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.mapSetOp({ objectId: mapId, key: `foo${i + 1}`, data: { value: 'baz' } })], + }); + } + + // check only operations with correct timeserials were applied + const expectedMapKeys = [ + { key: 'foo1', value: 'bar' }, + { key: 'foo2', value: 'bar' }, + { key: 'foo3', value: 'baz' }, // updated + { key: 'foo4', value: 'bar' }, + { key: 'foo5', value: 'bar' }, + { key: 'foo6', value: 'baz' }, // updated + ]; + + expectedMapKeys.forEach(({ key, value }) => { + expect(root.get('map').get(key)).to.equal( + value, + `Check "${key}" key on map has expected value after MAP_SET ops`, + ); + }); + }, + }, + + { + description: 'can apply MAP_REMOVE state operation messages', action: async (ctx) => { const { root, liveObjectsHelper, channelName } = ctx; const mapKey = 'map'; @@ -772,7 +932,76 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, { - description: 'COUNTER_CREATE', + description: + 'MAP_REMOVE state operation messages are applied based on the site timeserials vector of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // create new map and set it on a root with forged timeserials + const mapId = liveObjectsHelper.fakeMapObjectId(); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'bbb@1-0', + state: [ + liveObjectsHelper.mapCreateOp({ + objectId: mapId, + entries: { + foo1: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo2: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo3: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo4: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo5: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo6: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + }, + }), + ], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'aaa@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], + }); + + // inject operations with various timeserial values + for (const [i, serial] of [ + 'bbb@0-0', // existing site, earlier site CGO, not applied + 'bbb@1-0', // existing site, same site CGO, not applied + 'bbb@2-0', // existing site, later site CGO, applied, site timeserials updated + 'bbb@2-0', // existing site, same site CGO (updated from last op), not applied + 'aaa@0-0', // different site, earlier entry CGO, not applied + 'ccc@9-0', // different site, later entry CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.mapRemoveOp({ objectId: mapId, key: `foo${i + 1}` })], + }); + } + + // check only operations with correct timeserials were applied + const expectedMapKeys = [ + { key: 'foo1', exists: true }, + { key: 'foo2', exists: true }, + { key: 'foo3', exists: false }, // removed + { key: 'foo4', exists: true }, + { key: 'foo5', exists: true }, + { key: 'foo6', exists: false }, // removed + ]; + + expectedMapKeys.forEach(({ key, exists }) => { + if (exists) { + expect(root.get('map').get(key), `Check "${key}" key on map still exists after MAP_REMOVE ops`).to + .exist; + } else { + expect(root.get('map').get(key), `Check "${key}" key on map does not exist after MAP_REMOVE ops`).to.not + .exist; + } + }); + }, + }, + + { + description: 'can apply COUNTER_CREATE state operation messages', action: async (ctx) => { const { root, liveObjectsHelper, channelName } = ctx; @@ -822,7 +1051,74 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, { - description: 'COUNTER_INC', + description: + 'COUNTER_CREATE state operation messages are applied based on the site timeserials vector of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // need to use multiple counters as COUNTER_CREATE op can only be applied once to a counter object + const counterIds = [ + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + ]; + await Promise.all( + counterIds.map(async (counterId, i) => { + // send a COUNTER_INC op first to create a zero-value counter with forged site timeserials vector (from the op), and set it on a root. + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'bbb@1-0', + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: `aaa@${i}-0`, + state: [ + liveObjectsHelper.mapSetOp({ objectId: 'root', key: counterId, data: { objectId: counterId } }), + ], + }); + }), + ); + + // inject operations with various timeserial values + for (const [i, serial] of [ + 'bbb@0-0', // existing site, earlier CGO, not applied + 'bbb@1-0', // existing site, same CGO, not applied + 'bbb@2-0', // existing site, later CGO, applied + 'aaa@0-0', // different site, earlier CGO, applied + 'ccc@9-0', // different site, later CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.counterCreateOp({ objectId: counterIds[i], count: 10 })], + }); + } + + // check only operations with correct timeserials were applied + const expectedCounterValues = [ + 1, + 1, + 11, // applied COUNTER_CREATE + 11, // applied COUNTER_CREATE + 11, // applied COUNTER_CREATE + ]; + + for (const [i, counterId] of counterIds.entries()) { + const expectedValue = expectedCounterValues[i]; + + expect(root.get(counterId).value()).to.equal( + expectedValue, + `Check counter #${i + 1} has expected value after COUNTER_CREATE ops`, + ); + } + }, + }, + + { + description: 'can apply COUNTER_INC state operation messages', action: async (ctx) => { const { root, liveObjectsHelper, channelName } = ctx; const counterKey = 'counter'; @@ -879,31 +1175,70 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], } }, }, - ]; - for (const scenario of applyOperationsScenarios) { - if (scenario.skip === true) { - continue; - } + { + description: + 'COUNTER_INC state operation messages are applied based on the site timeserials vector of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + // create new counter and set it on a root with forged timeserials + const counterId = liveObjectsHelper.fakeCounterObjectId(); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'bbb@1-0', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId, count: 1 })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: 'aaa@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + // inject operations with various timeserial values + for (const [i, serial] of [ + 'bbb@0-0', // +10 existing site, earlier CGO, not applied + 'bbb@1-0', // +100 existing site, same CGO, not applied + 'bbb@2-0', // +1000 existing site, later CGO, applied, site timeserials updated + 'bbb@2-0', // +10000 existing site, same CGO (updated from last op), not applied + 'aaa@0-0', // +100000 different site, earlier CGO, applied + 'ccc@9-0', // +1000000 different site, later CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: Math.pow(10, i + 1) })], + }); + } + + // check only operations with correct timeserials were applied + expect(root.get('counter').value()).to.equal( + 1 + 1000 + 100000 + 1000000, // sum of passing operations and the initial value + `Check counter has expected value after COUNTER_INC ops`, + ); + }, + }, + ]; + + forScenarios(applyOperationsScenarios, (scenario) => /** @nospec */ - it(`can apply ${scenario.description} state operation messages`, async function () { + it(scenario.description, async function () { const helper = this.test.helper; const liveObjectsHelper = new LiveObjectsHelper(helper); const client = RealtimeWithLiveObjects(helper); await helper.monitorConnectionThenCloseAndFinish(async () => { - const channelName = `channel_can_apply_${scenario.description}`; + const channelName = scenario.description; const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); const liveObjects = channel.liveObjects; await channel.attach(); const root = await liveObjects.getRoot(); - await scenario.action({ root, liveObjectsHelper, channelName }); + await scenario.action({ root, liveObjectsHelper, channelName, channel }); }, client); - }); - } + }), + ); const applyOperationsDuringSyncScenarios = [ { @@ -949,10 +1284,10 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // inject operations, they should be applied when sync ends await Promise.all( - primitiveKeyData.map((keyData) => + primitiveKeyData.map((keyData, i) => liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: '@0-0', + serial: `aaa@${i}-0`, state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], }), ), @@ -1039,7 +1374,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, { - description: 'buffered state operation messages are applied based on regional timeserial of the object', + description: + 'buffered state operation messages are applied based on the site timeserials vector of the object', action: async (ctx) => { const { root, liveObjectsHelper, channel } = ctx; @@ -1049,64 +1385,81 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await liveObjectsHelper.processStateObjectMessageOnChannel({ channel, syncSerial: 'serial:cursor', - // add state object messages with non-zero regional timeserials + // add state object messages with non-empty site timeserials state: [ - liveObjectsHelper.mapObject({ - objectId: 'root', - regionalTimeserial: '@1-0', - entries: { - map: { timeserial: '@0-0', data: { objectId: mapId } }, - counter: { timeserial: '@0-0', data: { objectId: counterId } }, - }, - }), + // next map and counter objects will be checked to have correct operations applied on them based on site timeserials liveObjectsHelper.mapObject({ objectId: mapId, - regionalTimeserial: '@1-0', + siteTimeserials: { + bbb: 'bbb@2-0', + ccc: 'ccc@5-0', + }, + materialisedEntries: { + foo1: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo2: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo3: { timeserial: 'ccc@5-0', data: { value: 'bar' } }, + foo4: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo5: { timeserial: 'bbb@2-0', data: { value: 'bar' } }, + foo6: { timeserial: 'ccc@2-0', data: { value: 'bar' } }, + foo7: { timeserial: 'ccc@0-0', data: { value: 'bar' } }, + foo8: { timeserial: 'ccc@0-0', data: { value: 'bar' } }, + }, }), liveObjectsHelper.counterObject({ objectId: counterId, - regionalTimeserial: '@1-0', + siteTimeserials: { + bbb: 'bbb@1-0', + }, + initialCount: 1, + }), + // add objects to the root so they're discoverable in the state tree + liveObjectsHelper.mapObject({ + objectId: 'root', + siteTimeserials: { '000': '000@0-0' }, + initialEntries: { + map: { timeserial: '000@0-0', data: { objectId: mapId } }, + counter: { timeserial: '000@0-0', data: { objectId: counterId } }, + }, }), ], }); - // inject operations with older or equal regional timeserial, expect them not to be applied when sync ends - await Promise.all( - ['@0-0', '@1-0'].map(async (serial) => { - await Promise.all( - ['root', mapId].flatMap((objectId) => - primitiveKeyData.map((keyData) => - liveObjectsHelper.processStateOperationMessageOnChannel({ - channel, - serial, - state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })], - }), - ), - ), - ); - await liveObjectsHelper.processStateOperationMessageOnChannel({ - channel, - serial, - state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], - }); - }), - ); + // inject operations with various timeserial values + // Map: + for (const [i, serial] of [ + 'bbb@1-0', // existing site, earlier site CGO, not applied + 'bbb@2-0', // existing site, same site CGO, not applied + 'bbb@3-0', // existing site, later site CGO, earlier entry CGO, not applied but site timeserial updated + // message with later site CGO, same entry CGO case is not possible, as timeserial from entry would be set for the corresponding site code or be less than that + 'bbb@3-0', // existing site, same site CGO (updated from last op), later entry CGO, not applied + 'bbb@4-0', // existing site, later site CGO, later entry CGO, applied + 'aaa@1-0', // different site, earlier entry CGO, not applied but site timeserial updated + 'aaa@1-0', // different site, same site CGO (updated from last op), later entry CGO, not applied + // different site with matching entry CGO case is not possible, as matching entry timeserial means that that timeserial is in the site timeserials vector + 'ddd@1-0', // different site, later entry CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.mapSetOp({ objectId: mapId, key: `foo${i + 1}`, data: { value: 'baz' } })], + }); + } - // inject operations with greater regional timeserial, expect them to be applied when sync ends - await Promise.all( - ['root', mapId].map((objectId) => - liveObjectsHelper.processStateOperationMessageOnChannel({ - channel, - serial: '@2-0', - state: [liveObjectsHelper.mapSetOp({ objectId, key: 'foo', data: { value: 'bar' } })], - }), - ), - ); - await liveObjectsHelper.processStateOperationMessageOnChannel({ - channel, - serial: '@2-0', - state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], - }); + // Counter: + for (const [i, serial] of [ + 'bbb@0-0', // +10 existing site, earlier CGO, not applied + 'bbb@1-0', // +100 existing site, same CGO, not applied + 'bbb@2-0', // +1000 existing site, later CGO, applied, site timeserials updated + 'bbb@2-0', // +10000 existing site, same CGO (updated from last op), not applied + 'aaa@0-0', // +100000 different site, earlier CGO, applied + 'ccc@9-0', // +1000000 different site, later CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: Math.pow(10, i + 1) })], + }); + } // end sync await liveObjectsHelper.processStateObjectMessageOnChannel({ @@ -1114,33 +1467,28 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], syncSerial: 'serial:', }); - // check operations with older or equal regional timeserial are not applied - // counter will be checked to match an expected value explicitly, so no need to check that it doesn't equal a sum of operations - primitiveKeyData.forEach((keyData) => { - expect( - root.get(keyData.key), - `Check "${keyData.key}" key doesn't exist on root when STATE_SYNC has ended`, - ).to.not.exist; - }); - primitiveKeyData.forEach((keyData) => { - expect( - root.get('map').get(keyData.key), - `Check "${keyData.key}" key doesn't exist on inner map when STATE_SYNC has ended`, - ).to.not.exist; + // check only operations with correct timeserials were applied + const expectedMapKeys = [ + { key: 'foo1', value: 'bar' }, + { key: 'foo2', value: 'bar' }, + { key: 'foo3', value: 'bar' }, + { key: 'foo4', value: 'bar' }, + { key: 'foo5', value: 'baz' }, // updated + { key: 'foo6', value: 'bar' }, + { key: 'foo7', value: 'bar' }, + { key: 'foo8', value: 'baz' }, // updated + ]; + + expectedMapKeys.forEach(({ key, value }) => { + expect(root.get('map').get(key)).to.equal( + value, + `Check "${key}" key on map has expected value after STATE_SYNC has ended`, + ); }); - // check operations with greater regional timeserial are applied - expect(root.get('foo')).to.equal( - 'bar', - 'Check only data from operations with greater regional timeserial exists on root after STATE_SYNC', - ); - expect(root.get('map').get('foo')).to.equal( - 'bar', - 'Check only data from operations with greater regional timeserial exists on inner map after STATE_SYNC', - ); expect(root.get('counter').value()).to.equal( - 1, - 'Check only increment operations with greater regional timeserial were applied to counter after STATE_SYNC', + 1 + 1000 + 100000 + 1000000, // sum of passing operations and the initial value + `Check counter has expected value after STATE_SYNC has ended`, ); }, }, @@ -1159,10 +1507,10 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // inject operations, they should be applied when sync ends await Promise.all( - primitiveKeyData.map((keyData) => + primitiveKeyData.map((keyData, i) => liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: '@0-0', + serial: `aaa@${i}-0`, state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], }), ), @@ -1206,11 +1554,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, ]; - for (const scenario of applyOperationsDuringSyncScenarios) { - if (scenario.skip === true) { - continue; - } - + forScenarios(applyOperationsDuringSyncScenarios, (scenario) => /** @nospec */ it(scenario.description, async function () { const helper = this.test.helper; @@ -1229,8 +1573,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await scenario.action({ root, liveObjectsHelper, channelName, channel }); }, client); - }); - } + }), + ); const subscriptionCallbacksScenarios = [ { @@ -1351,7 +1695,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], map.subscribe((update) => { try { expect(update).to.deep.equal( - { update: { stringKey: 'deleted' } }, + { update: { stringKey: 'removed' } }, 'Check map subscription callback is called with an expected update object for MAP_REMOVE operation', ); resolve(); @@ -1382,9 +1726,9 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const expectedMapUpdates = [ { update: { foo: 'updated' } }, { update: { bar: 'updated' } }, - { update: { foo: 'deleted' } }, + { update: { foo: 'removed' } }, { update: { baz: 'updated' } }, - { update: { bar: 'deleted' } }, + { update: { bar: 'removed' } }, ]; let currentUpdateIndex = 0; @@ -1699,11 +2043,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, ]; - for (const scenario of subscriptionCallbacksScenarios) { - if (scenario.skip === true) { - continue; - } - + forScenarios(subscriptionCallbacksScenarios, (scenario) => /** @nospec */ it(scenario.description, async function () { const helper = this.test.helper; @@ -1744,8 +2084,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], sampleCounterObjectId, }); }, client); - }); - } + }), + ); }); /** @nospec */