Skip to content

Commit

Permalink
Make injecting state messages in LiveObjects tests simpler
Browse files Browse the repository at this point in the history
This refactoring is needed for upcoming tests for buffering of state
operation messages.
  • Loading branch information
VeskeR committed Nov 8, 2024
1 parent ad0311e commit a5f8667
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 81 deletions.
96 changes: 95 additions & 1 deletion test/common/modules/live_objects_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
/**
* LiveObjects helper to create pre-determined state tree on channels
*/
define(['shared_helper'], function (Helper) {
define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveObjectsPlugin) {
const createPM = Ably.makeProtocolMessageFromDeserialized({ LiveObjectsPlugin });

const ACTIONS = {
MAP_CREATE: 0,
MAP_SET: 1,
Expand All @@ -18,6 +20,7 @@ define(['shared_helper'], function (Helper) {

class LiveObjectsHelper {
constructor(helper) {
this._helper = helper;
this._rest = helper.AblyRest({ useBinaryProtocol: false });
}

Expand Down Expand Up @@ -171,6 +174,97 @@ define(['shared_helper'], function (Helper) {
return op;
}

mapObject(opts) {
const { objectId, regionalTimeserial, entries } = opts;
const obj = {
object: {
objectId,
regionalTimeserial,
map: { entries },
},
};

return obj;
}

counterObject(opts) {
const { objectId, regionalTimeserial, count } = opts;
const obj = {
object: {
objectId,
regionalTimeserial,
counter: {
created: true,
count,
},
},
};

return obj;
}

stateOperationMessage(opts) {
const { channelName, serial, state } = opts;

state?.forEach((x, i) => (x.serial = `${serial}:${i}`));

return {
action: 19, // STATE
channel: channelName,
channelSerial: serial,
state: state ?? [],
};
}

stateObjectMessage(opts) {
const { channelName, syncSerial, state } = opts;

return {
action: 20, // STATE_SYNC
channel: channelName,
channelSerial: syncSerial,
state: state ?? [],
};
}

async processStateOperationMessageOnChannel(opts) {
const { channel, ...rest } = opts;

this._helper.recordPrivateApi('call.channel.processMessage');
this._helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized');
await channel.processMessage(
createPM(
this.stateOperationMessage({
...rest,
channelName: channel.name,
}),
),
);
}

async processStateObjectMessageOnChannel(opts) {
const { channel, ...rest } = opts;

this._helper.recordPrivateApi('call.channel.processMessage');
this._helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized');
await channel.processMessage(
createPM(
this.stateObjectMessage({
...rest,
channelName: channel.name,
}),
),
);
}

fakeMapObjectId() {
return `map:${Helper.randomString()}`;
}

fakeCounterObjectId() {
return `counter:${Helper.randomString()}`;
}

async stateRequest(channelName, opBody) {
if (Array.isArray(opBody)) {
throw new Error(`Only single object state requests are supported`);
Expand Down
116 changes: 36 additions & 80 deletions test/realtime/live_objects.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
/** @nospec */
it(`doesn't break when it receives a STATE ProtocolMessage`, async function () {
const helper = this.test.helper;
const liveObjectsHelper = new LiveObjectsHelper(helper);
const testClient = helper.AblyRealtime();

await helper.monitorConnectionThenCloseAndFinish(async () => {
Expand All @@ -73,25 +74,13 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],

await helper.monitorConnectionThenCloseAndFinish(async () => {
// inject STATE message that should be ignored and not break anything without LiveObjects plugin
helper.recordPrivateApi('call.channel.processMessage');
helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized');
await testChannel.processMessage(
createPM({
action: 19,
channel: 'channel',
channelSerial: 'serial:',
state: [
{
operation: {
action: 1,
objectId: 'root',
mapOp: { key: 'stringKey', data: { value: 'stringValue' } },
},
serial: 'a@0-0',
},
],
}),
);
await liveObjectsHelper.processStateOperationMessageOnChannel({
channel: testChannel,
serial: '@0-0',
state: [
liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'stringKey', data: { value: 'stringValue' } }),
],
});

const publishChannel = publishClient.channels.get('channel');
await publishChannel.publish(null, 'test');
Expand All @@ -105,6 +94,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
/** @nospec */
it(`doesn't break when it receives a STATE_SYNC ProtocolMessage`, async function () {
const helper = this.test.helper;
const liveObjectsHelper = new LiveObjectsHelper(helper);
const testClient = helper.AblyRealtime();

await helper.monitorConnectionThenCloseAndFinish(async () => {
Expand All @@ -117,24 +107,11 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],

await helper.monitorConnectionThenCloseAndFinish(async () => {
// inject STATE_SYNC message that should be ignored and not break anything without LiveObjects plugin
helper.recordPrivateApi('call.channel.processMessage');
helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized');
await testChannel.processMessage(
createPM({
action: 20,
channel: 'channel',
channelSerial: 'serial:',
state: [
{
object: {
objectId: 'root',
regionalTimeserial: 'a@0-0',
map: {},
},
},
],
}),
);
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel: testChannel,
syncSerial: 'serial:',
state: [liveObjectsHelper.mapObject({ objectId: 'root', regionalTimeserial: '@0-0' })],
});

const publishChannel = publishClient.channels.get('channel');
await publishChannel.publish(null, 'test');
Expand Down Expand Up @@ -261,6 +238,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
/** @nospec */
it('getRoot() waits for subsequent STATE_SYNC to finish before resolving', async function () {
const helper = this.test.helper;
const liveObjectsHelper = new LiveObjectsHelper(helper);
const client = RealtimeWithLiveObjects(helper);

await helper.monitorConnectionThenCloseAndFinish(async () => {
Expand All @@ -272,23 +250,17 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
await liveObjects.getRoot();

// inject STATE_SYNC message to emulate start of a new sequence
helper.recordPrivateApi('call.channel.processMessage');
helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized');
await channel.processMessage(
createPM({
action: 20,
channel: 'channel',
// have cursor so client awaits for additional STATE_SYNC messages
channelSerial: 'serial:cursor',
state: [],
}),
);
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
// have cursor so client awaits for additional STATE_SYNC messages
syncSerial: 'serial:cursor',
});

let getRootResolved = false;
let newRoot;
let root;
liveObjects.getRoot().then((value) => {
getRootResolved = true;
newRoot = value;
root = value;
});

// wait for next tick to check that getRoot() promise handler didn't proc
Expand All @@ -297,42 +269,26 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],

expect(getRootResolved, 'Check getRoot() is not resolved while STATE_SYNC is in progress').to.be.false;

// inject next STATE_SYNC message
helper.recordPrivateApi('call.channel.processMessage');
helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized');
await channel.processMessage(
createPM({
action: 20,
channel: 'channel',
// no cursor to indicate the end of STATE_SYNC messages
channelSerial: 'serial:',
state: [
{
object: {
objectId: 'root',
regionalTimeserial: 'a@0-0',
map: {
entries: {
key: {
timeserial: 'a@0-0',
data: {
value: 1,
},
},
},
},
},
},
],
}),
);
// inject final STATE_SYNC message
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
// no cursor to indicate the end of STATE_SYNC messages
syncSerial: 'serial:',
state: [
liveObjectsHelper.mapObject({
objectId: 'root',
regionalTimeserial: '@0-0',
entries: { key: { timeserial: '@0-0', data: { value: 1 } } },
}),
],
});

// wait for next tick for getRoot() handler to process
helper.recordPrivateApi('call.Platform.nextTick');
await new Promise((res) => nextTick(res));

expect(getRootResolved, 'Check getRoot() is resolved when STATE_SYNC sequence has ended').to.be.true;
expect(newRoot.get('key')).to.equal(1, 'Check new root after STATE_SYNC sequence has expected key');
expect(root.get('key')).to.equal(1, 'Check new root after STATE_SYNC sequence has expected key');
}, client);
});

Expand Down

0 comments on commit a5f8667

Please sign in to comment.