Skip to content

Commit

Permalink
Merge pull request #1924 from ably/liveobjects/isolated-create-ops-an…
Browse files Browse the repository at this point in the history
…d-vector-timeserials

[DTP-1076, DTP-1077] Handle isolated create ops and site timeserials vector on the `StateObject`
  • Loading branch information
VeskeR authored Nov 26, 2024
2 parents 928dea7 + 245b6a2 commit 3adafa5
Show file tree
Hide file tree
Showing 11 changed files with 1,004 additions and 546 deletions.
2 changes: 1 addition & 1 deletion src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter {
}
}

this._liveObjects.handleStateMessages(stateMessages, message.channelSerial);
this._liveObjects.handleStateMessages(stateMessages);

break;
}
Expand Down
145 changes: 92 additions & 53 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage';
import { Timeserial } from './timeserial';
import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage';
import { DefaultTimeserial } from './timeserial';

export interface LiveCounterData extends LiveObjectData {
data: number;
Expand All @@ -12,52 +12,35 @@ export interface LiveCounterUpdate extends LiveObjectUpdate {
}

export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate> {
constructor(
liveObjects: LiveObjects,
private _created: boolean,
initialData?: LiveCounterData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveCounter} instance with a 0 value.
*
* @internal
*/
static zeroValue(
liveobjects: LiveObjects,
isCreated: boolean,
objectId?: string,
regionalTimeserial?: Timeserial,
): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId, regionalTimeserial);
}

value(): number {
return this._dataRef.data;
static zeroValue(liveobjects: LiveObjects, objectId: string): LiveCounter {
return new LiveCounter(liveobjects, objectId);
}

/**
* Returns a {@link LiveCounter} instance based on the provided state object.
* The provided state object must hold a valid counter object data.
*
* @internal
*/
isCreated(): boolean {
return this._created;
static fromStateObject(liveobjects: LiveObjects, stateObject: StateObject): LiveCounter {
const obj = new LiveCounter(liveobjects, stateObject.objectId);
obj.overrideWithStateObject(stateObject);
return obj;
}

/**
* @internal
*/
setCreated(created: boolean): void {
this._created = created;
value(): number {
return this._dataRef.data;
}

/**
* @internal
*/
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
applyOperation(op: StateOperation, msg: StateMessage): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
Expand All @@ -66,10 +49,24 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial);
if (!this._canApplyOperation(opOriginTimeserial)) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`,
);
return;
}
// should update stored site timeserial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial;

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.COUNTER_CREATE:
update = this._applyCounterCreate(op.counter);
update = this._applyCounterCreate(op);
break;

case StateOperationAction.COUNTER_INC:
Expand All @@ -90,19 +87,72 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
this.notifyUpdated(update);
}

/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}

if (!this._client.Utils.isNil(stateObject.createOp)) {
// it is expected that create operation can be missing in the state object, so only validate it when it exists
if (stateObject.createOp.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object createOp objectId=${stateObject.createOp?.objectId}; LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}

if (stateObject.createOp.action !== StateOperationAction.COUNTER_CREATE) {
throw new this._client.ErrorInfo(
`Invalid state object: state object createOp action=${stateObject.createOp?.action}; LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials);
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}

return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

protected _getZeroValueData(): LiveCounterData {
return { data: 0 };
}

protected _updateFromDataDiff(currentDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate {
const counterDiff = newDataRef.data - currentDataRef.data;
protected _updateFromDataDiff(prevDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate {
const counterDiff = newDataRef.data - prevDataRef.data;
return { update: { inc: counterDiff } };
}

protected _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): LiveCounterUpdate {
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
// note that it is intentional to SUM the incoming count from the create op.
// if we got here, it means that current counter instance is missing the initial value in its data reference,
// which we're going to add now.
this._dataRef.data += stateOperation.counter?.count ?? 0;
this._createOperationIsMerged = true;

return { update: { inc: stateOperation.counter?.count ?? 0 } };
}

private _throwNoPayloadError(op: StateOperation): void {
throw new this._client.ErrorInfo(
`No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
Expand All @@ -111,32 +161,21 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

private _applyCounterCreate(op: StateCounter | undefined): LiveCounterUpdate | LiveObjectUpdateNoop {
if (this.isCreated()) {
// skip COUNTER_CREATE op if this counter is already created
private _applyCounterCreate(op: StateOperation): LiveCounterUpdate | LiveObjectUpdateNoop {
if (this._createOperationIsMerged) {
// There can't be two different create operation for the same object id, because the object id
// fully encodes that operation. This means we can safely ignore any new incoming create operations
// if we already merged it once.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter._applyCounterCreate()',
`skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`,
`skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this._objectId}`,
);
return { noop: true };
}

if (this._client.Utils.isNil(op)) {
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
// we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation
this.setCreated(true);
return { update: { inc: 0 } };
}

// note that it is intentional to SUM the incoming count from the create op.
// if we get here, it means that current counter instance wasn't initialized from the COUNTER_CREATE op,
// so it is missing the initial value that we're going to add now.
this._dataRef.data += op.count ?? 0;
this.setCreated(true);

return { update: { inc: op.count ?? 0 } };
return this._mergeInitialDataFromCreateOperation(op);
}

private _applyCounterInc(op: StateCounterOp): LiveCounterUpdate {
Expand Down
Loading

0 comments on commit 3adafa5

Please sign in to comment.