Skip to content

Commit

Permalink
GC tombstoned map entries for LiveMap and objects in the global pool
Browse files Browse the repository at this point in the history
Resolves DTP-1024
  • Loading branch information
VeskeR committed Dec 11, 2024
1 parent f370f62 commit e99520c
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 2 deletions.
10 changes: 10 additions & 0 deletions src/plugins/liveobjects/defaults.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const DEFAULTS = {
gcInterval: 1000 * 60 * 5, // 5 minutes
/**
* Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation
* with an earlier origin timeserial that would not have been applied if the tombstone still existed.
*
* Applies both for map entries tombstones and object tombstones.
*/
gcGracePeriod: 1000 * 60 * 2.5, // 2.5 minutes
};
8 changes: 8 additions & 0 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

/**
* @internal
*/
onGCInterval(): void {
// nothing to GC for a counter object
return;
}

protected _getZeroValueData(): LiveCounterData {
return { data: 0 };
}
Expand Down
30 changes: 28 additions & 2 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import deepEqual from 'deep-equal';

import type * as API from '../../../ably';
import { DEFAULTS } from './defaults';
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import {
Expand Down Expand Up @@ -33,6 +34,10 @@ export type StateData = ObjectIdStateData | ValueStateData;

export interface MapEntry {
tombstone: boolean;
/**
* Can't use timeserial from the operation that deleted the entry for the same reason as for {@link LiveObject} tombstones, see explanation there.
*/
tombstonedAt: number | undefined;
timeserial: string | undefined;
data: StateData | undefined;
}
Expand Down Expand Up @@ -291,6 +296,22 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

/**
* @internal
*/
onGCInterval(): void {
// should remove any tombstoned entries from the underlying map data that have exceeded the GC grace period

const keysToDelete: string[] = [];
for (const [key, value] of this._dataRef.data.entries()) {
if (value.tombstone === true && Date.now() - value.tombstonedAt! >= DEFAULTS.gcGracePeriod) {
keysToDelete.push(key);
}
}

keysToDelete.forEach((x) => this._dataRef.data.delete(x));
}

protected _getZeroValueData(): LiveMapData {
return { data: new Map<string, MapEntry>() };
}
Expand Down Expand Up @@ -455,11 +476,13 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

if (existingEntry) {
existingEntry.tombstone = false;
existingEntry.tombstonedAt = undefined;
existingEntry.timeserial = opOriginTimeserial;
existingEntry.data = liveData;
} else {
const newEntry: MapEntry = {
tombstone: false,
tombstonedAt: undefined,
timeserial: opOriginTimeserial,
data: liveData,
};
Expand All @@ -486,11 +509,13 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

if (existingEntry) {
existingEntry.tombstone = true;
existingEntry.tombstonedAt = Date.now();
existingEntry.timeserial = opOriginTimeserial;
existingEntry.data = undefined;
} else {
const newEntry: MapEntry = {
tombstone: true,
tombstonedAt: Date.now(),
timeserial: opOriginTimeserial,
data: undefined,
};
Expand Down Expand Up @@ -544,9 +569,10 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

const liveDataEntry: MapEntry = {
timeserial: entry.timeserial,
// true only if we received explicit true. otherwise always false
tombstone: entry.tombstone === true,
data: liveData,
// consider object as tombstoned only if we received an explicit flag stating that. otherwise it exists
tombstone: entry.tombstone === true,
tombstonedAt: entry.tombstone === true ? Date.now() : undefined,
};

liveMapData.data.set(key, liveDataEntry);
Expand Down
21 changes: 21 additions & 0 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ export abstract class LiveObject<
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;
private _tombstone: boolean;
/**
* Even though the `timeserial` from the operation that deleted the object contains the timestamp value,
* the `timeserial` should be treated as an opaque string on the client, meaning we should not attempt to parse it.
*
* Therefore, we need to set our own timestamp when the object is deleted client-side. Strictly speaking, this is
* slightly less precise, as we will GC the object later than the server, but it is an acceptable compromise.
*/
private _tombstonedAt: number | undefined;

protected constructor(
protected _liveObjects: LiveObjects,
Expand Down Expand Up @@ -108,6 +116,7 @@ export abstract class LiveObject<
*/
tombstone(): void {
this._tombstone = true;
this._tombstonedAt = Date.now();
this._dataRef = this._getZeroValueData();
// TODO: emit "deleted" event so that end users get notified about this object getting deleted
}
Expand All @@ -119,6 +128,13 @@ export abstract class LiveObject<
return this._tombstone;
}

/**
* @internal
*/
tombstonedAt(): number | undefined {
return this._tombstonedAt;
}

/**
* Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object.
*
Expand Down Expand Up @@ -168,6 +184,11 @@ export abstract class LiveObject<
* @internal
*/
abstract overrideWithStateObject(stateObject: StateObject): TUpdate | LiveObjectUpdateNoop;
/**
* @internal
*/
abstract onGCInterval(): void;

protected abstract _getZeroValueData(): TData;
/**
* Calculate the update object based on the current Live Object data and incoming new data.
Expand Down
4 changes: 4 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type BaseClient from 'common/lib/client/baseclient';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import type EventEmitter from 'common/lib/util/eventemitter';
import type * as API from '../../../ably';
import { DEFAULTS } from './defaults';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
Expand All @@ -26,6 +27,9 @@ export class LiveObjects {
private _currentSyncCursor: string | undefined;
private _bufferedStateOperations: StateMessage[];

// Used by tests
static _DEFAULTS = DEFAULTS;

constructor(channel: RealtimeChannel) {
this._channel = channel;
this._client = channel.client;
Expand Down
24 changes: 24 additions & 0 deletions src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type BaseClient from 'common/lib/client/baseclient';
import { DEFAULTS } from './defaults';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
Expand All @@ -13,10 +14,16 @@ export const ROOT_OBJECT_ID = 'root';
export class LiveObjectsPool {
private _client: BaseClient;
private _pool: Map<string, LiveObject>;
private _gcInterval: ReturnType<typeof setInterval>;

constructor(private _liveObjects: LiveObjects) {
this._client = this._liveObjects.getClient();
this._pool = this._getInitialPool();
this._gcInterval = setInterval(() => {
this._onGCInterval();
}, DEFAULTS.gcInterval);
// call nodejs's Timeout.unref to not require Node.js event loop to remain active due to this interval. see https://nodejs.org/api/timers.html#timeoutunref
this._gcInterval.unref?.();
}

get(objectId: string): LiveObject | undefined {
Expand Down Expand Up @@ -68,4 +75,21 @@ export class LiveObjectsPool {
pool.set(root.getObjectId(), root);
return pool;
}

private _onGCInterval(): void {
const toDelete: string[] = [];
for (const [objectId, obj] of this._pool.entries()) {
// tombstoned objects should be removed from the pool if they have been tombstoned for longer than grace period.
// by removing them from the local pool, LiveObjects plugin no longer keeps a reference to those objects, allowing JS's
// Garbage Collection to eventually free the memory for those objects, provided the user no longer references them either.
if (obj.isTombstoned() && Date.now() - obj.tombstonedAt()! >= DEFAULTS.gcGracePeriod) {
toDelete.push(objectId);
continue;
}

obj.onGCInterval();
}

toDelete.forEach((x) => this._pool.delete(x));
}
}
7 changes: 7 additions & 0 deletions test/common/modules/private_api_recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.Defaults.getPort',
'call.Defaults.normaliseOptions',
'call.EventEmitter.emit',
'call.LiveObject.isTombstoned',
'call.LiveObjects._liveObjectsPool._onGCInterval',
'call.LiveObjects._liveObjectsPool.get',
'call.Message.decode',
'call.Message.encode',
'call.Platform.Config.push.storage.clear',
Expand Down Expand Up @@ -72,6 +75,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'pass.clientOption.webSocketSlowTimeout',
'pass.clientOption.wsConnectivityCheckUrl', // actually ably-js public API (i.e. it’s in the TypeScript typings) but no other SDK has it. At the same time it's not entirely clear if websocket connectivity check should be considered an ably-js-specific functionality (as for other params above), so for the time being we consider it as private API
'read.Defaults.version',
'read.LiveMap._dataRef.data',
'read.EventEmitter.events',
'read.Platform.Config.push',
'read.Realtime._transports',
Expand Down Expand Up @@ -112,6 +116,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'read.transport.params.mode',
'read.transport.recvRequest.recvUri',
'read.transport.uri',
'replace.LiveObjects._liveObjectsPool._onGCInterval',
'replace.channel.attachImpl',
'replace.channel.processMessage',
'replace.channel.sendMessage',
Expand All @@ -128,6 +133,8 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'serialize.recoveryKey',
'write.Defaults.ENVIRONMENT',
'write.Defaults.wsConnectivityCheckUrl',
'write.LiveObjects._DEFAULTS.gcGracePeriod',
'write.LiveObjects._DEFAULTS.gcInterval',
'write.Platform.Config.push', // This implies using a mock implementation of the internal IPlatformPushConfig interface. Our mock (in push_channel_transport.js) then interacts with internal objects and private APIs of public objects to implement this interface; I haven’t added annotations for that private API usage, since there wasn’t an easy way to pass test context information into the mock. I think that for now we can just say that if we wanted to get rid of this private API usage, then we’d need to remove this mock entirely.
'write.auth.authOptions.requestHeaders',
'write.auth.key',
Expand Down
Loading

0 comments on commit e99520c

Please sign in to comment.