Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Room lifecycle in progress flag #412

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions src/core/room-lifecycle-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ export interface ContributesToRoomLifecycle extends HandlesDiscontinuity {
/**
* The order of precedence for lifecycle operations, passed to the mutex which allows
* us to ensure that internal operations take precedence over user-driven operations.
*
* The higher the number, the higher the priority.
*/
enum LifecycleOperationPrecedence {
Internal = 0,
Internal = 2,
Release = 1,
AttachOrDetach = 2,
AttachOrDetach = 0,
}

/**
Expand Down Expand Up @@ -127,12 +129,6 @@ export class RoomLifecycleManager {
this._transientDetachTimeouts = new Map();
this._lifecycle = lifecycle;

// This shouldn't be the case except in testing, but if we're already attached, then we should consider
// ourselves not in the middle of an operation and thus consider channel events.
if (this._lifecycle.status !== RoomStatus.Attached) {
this._operationInProgress = true;
}

this._setupContributorListeners(transientDetachTimeout);
}

Expand Down Expand Up @@ -229,7 +225,7 @@ export class RoomLifecycleManager {
channel: contributor.channel.name,
});
this._clearAllTransientDetachTimeouts();
this._operationInProgress = true;
this._startLifecycleOperation();
this._lifecycle.setStatus({
status: RoomStatus.Failed,
error: change.reason,
Expand Down Expand Up @@ -321,7 +317,7 @@ export class RoomLifecycleManager {
// We freeze our state, so that individual channel state changes do not affect the room status
// We also set our room state to the state of the contributor
// We clear all the transient detach timeouts, because we're closing all the channels
this._operationInProgress = true;
this._startLifecycleOperation();
this._clearAllTransientDetachTimeouts();

// We enter the protected block with priority Internal, so take precedence over user-driven actions
Expand Down Expand Up @@ -361,6 +357,7 @@ export class RoomLifecycleManager {
*/
private async _doRetry(contributor: ContributesToRoomLifecycle): Promise<void> {
// A helper that allows us to retry the attach operation
// eslint-disable-next-line unicorn/consistent-function-scoping
const doAttachWithRetry = () => {
this._logger.debug('RoomLifecycleManager.doAttachWithRetry();');
this._lifecycle.setStatus({ status: RoomStatus.Attaching });
Expand All @@ -374,13 +371,16 @@ export class RoomLifecycleManager {
// If we're in failed, then we should wind down all the channels, eventually - but we're done here
if (result.status === RoomStatus.Failed) {
void this._mtx.runExclusive(
() => this._runDownChannelsOnFailedAttach(),
() =>
this._runDownChannelsOnFailedAttach().finally(() => {
this._endLifecycleOperation();
}),
LifecycleOperationPrecedence.Internal,
);
return;
}

// If we're in suspended, then we should wait for the channel to reattach, but wait for it to do so
// If we're in suspended, then we should wait for the channel to reattach and then try again
if (result.status === RoomStatus.Suspended) {
const failedFeature = result.failedFeature;
if (!failedFeature) {
Expand All @@ -393,7 +393,8 @@ export class RoomLifecycleManager {
return this._doRetry(failedFeature).catch();
}

// We attached, huzzah!
// We attached, huzzah! It's the end of the loop
this._endLifecycleOperation();
});
};

Expand All @@ -404,6 +405,9 @@ export class RoomLifecycleManager {
try {
await this._doChannelWindDown(contributor).catch(() => {
// If in doing the wind down, we've entered failed state, then it's game over anyway
// TODO: Another PR, but in the event that we get a failed channel, we still need to do the wind down
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
// of other channels for atomicity.
// https://github.com/ably/ably-chat-js/issues/416
if (this._lifecycle.status === RoomStatus.Failed) {
throw new Error('room is in a failed state');
}
Expand All @@ -417,27 +421,32 @@ export class RoomLifecycleManager {
});
} catch {
// If an error gets through here, then the room has entered the failed state, we're done.
this._endLifecycleOperation();
return;
}

// If our problem channel has reattached, then we can retry the attach
if (contributor.channel.state === RoomStatus.Attached) {
if (contributor.channel.state === 'attached') {
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
this._logger.debug('RoomLifecycleManager._doRetry(); feature reattached, retrying attach');
return doAttachWithRetry();
}

// Otherwise, wait for our problem channel to re-attach and try again
return new Promise<void>((resolve) => {
const listener = (change: Ably.ChannelStateChange) => {
if (change.current === RoomStatus.Attached) {
if (change.current === 'attached') {
contributor.channel.off(listener);
resolve();
return;
}

if (change.current === RoomStatus.Failed) {
if (change.current === 'failed') {
contributor.channel.off(listener);
this._lifecycle.setStatus({ status: RoomStatus.Failed, error: change.reason });

// It's ok to just set operation in progress = false and return here,
// as every other channel is wound down.
this._endLifecycleOperation();
throw change.reason ?? new Ably.ErrorInfo('unknown error in _doRetry', ErrorCodes.RoomLifecycleError, 500);
}
};
Expand Down Expand Up @@ -487,15 +496,15 @@ export class RoomLifecycleManager {

// At this point, we force the room status to be attaching
this._clearAllTransientDetachTimeouts();
this._operationInProgress = true;
this._startLifecycleOperation();
this._lifecycle.setStatus({ status: RoomStatus.Attaching });

return this._doAttach().then((result: RoomAttachmentResult) => {
// If we're in a failed state, then we should wind down all the channels, eventually
if (result.status === RoomStatus.Failed) {
this._logger.debug('RoomLifecycleManager.attach(); room entered failed, winding down channels', { result });
void this._mtx.runExclusive(
() => this._runDownChannelsOnFailedAttach(),
() => this._runDownChannelsOnFailedAttach().finally(() => (this._operationInProgress = false)),
LifecycleOperationPrecedence.Internal,
);

Expand Down Expand Up @@ -589,7 +598,7 @@ export class RoomLifecycleManager {

// We successfully attached all the channels - set our status to attached, start listening changes in channel status
this._lifecycle.setStatus(attachResult);
this._operationInProgress = false;
this._endLifecycleOperation();

// Iterate the pending discontinuity events and trigger them
for (const [contributor, error] of this._pendingDiscontinuityEvents) {
Expand Down Expand Up @@ -713,7 +722,7 @@ export class RoomLifecycleManager {
}

// We force the room status to be detaching
this._operationInProgress = true;
this._startLifecycleOperation();
this._clearAllTransientDetachTimeouts();
this._lifecycle.setStatus({ status: RoomStatus.Detaching });

Expand Down Expand Up @@ -763,6 +772,9 @@ export class RoomLifecycleManager {
done = true;
}

// The process is finished, so set operationInProgress to false
this._endLifecycleOperation();

// If we aren't in the failed state, then we're detached
if (this._lifecycle.status !== RoomStatus.Failed) {
this._lifecycle.setStatus({ status: RoomStatus.Detached });
Expand Down Expand Up @@ -819,7 +831,7 @@ export class RoomLifecycleManager {

// We force the room status to be releasing
this._clearAllTransientDetachTimeouts();
this._operationInProgress = true;
this._startLifecycleOperation();
this._releaseInProgress = true;
this._lifecycle.setStatus({ status: RoomStatus.Releasing });

Expand Down Expand Up @@ -888,7 +900,24 @@ export class RoomLifecycleManager {
}),
).then(() => {
this._releaseInProgress = false;
this._endLifecycleOperation();
this._lifecycle.setStatus({ status: RoomStatus.Released });
});
}

/**
* Marks the beginning of a room lifecycle operation.
*
* Emits a debug log entry and then sets `_operationInProgress` to `true`. Routing the flag change
* through this helper (rather than assigning the field directly) means each start of an operation
* leaves a trace in the debug logs.
*
* NOTE(review): comments elsewhere in this class indicate that while the flag is set, individual
* channel state changes are not applied to the room status - confirm against the contributor
* listeners. The flag is a plain boolean, not a counter, so calling this while an operation is
* already in progress is a no-op apart from the log line.
*/
private _startLifecycleOperation(): void {
this._logger.debug('RoomLifecycleManager._startLifecycleOperation();');
this._operationInProgress = true;
}

/**
* Marks the end of a room lifecycle operation.
*
* Emits a debug log entry and then sets `_operationInProgress` to `false`. It is the counterpart to
* `_startLifecycleOperation` and should be called on every path that terminates an operation,
* including error paths.
*
* NOTE(review): the flag is a plain boolean, not a counter, so a single call clears it regardless of
* how many times the start helper was invoked. The failed-attach wind-down in `attach()` appears to
* still assign `_operationInProgress = false` directly instead of calling this helper, so that
* transition would not be logged here - confirm and consider aligning it.
*/
private _endLifecycleOperation(): void {
this._logger.debug('RoomLifecycleManager._endLifecycleOperation();');
this._operationInProgress = false;
}
}
107 changes: 34 additions & 73 deletions test/core/room-lifecycle-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -811,8 +811,11 @@ describe('room lifecycle manager', () => {
5,
);

// Start the attach
// Start the attach and wait until the first channel attach is called to ensure the op is in-progress
const attachPromise = monitor.attach();
await vi.waitFor(() => {
expect(context.firstContributor.channel.attach).toHaveBeenCalled();
});

// Simulate a channel state change on the second channel
context.secondContributor.emulateStateChange({
Expand Down Expand Up @@ -1208,8 +1211,11 @@ describe('room lifecycle manager', () => {
5,
);

// Start the detach
// Start the detach and wait for the first channel detach to be called to ensure the op is in-progress
const detachPromise = monitor.detach();
await vi.waitFor(() => {
expect(context.firstContributor.channel.detach).toHaveBeenCalled();
});

// Simulate a channel state change on the second channel
context.secondContributor.emulateStateChange({
Expand Down Expand Up @@ -2273,7 +2279,9 @@ describe('room lifecycle manager', () => {
expect(context.firstContributor.discontinuityDetected).toBeCalledWith(baseError);
});

it<TestContext>('registers a discontinuity after re-attachment if room is detached at the time', async (context) => {
it<TestContext>('should prefer the first discontinuity event if multiple are received', async (context) => {
vi.useFakeTimers();

// Force our status and contributors into attached
const status = new DefaultRoomLifecycle('roomId', makeTestLogger());
context.firstContributor.emulateStateChange({
Expand Down Expand Up @@ -2304,87 +2312,35 @@ describe('room lifecycle manager', () => {
);

// Send the monitor through the attach cycle

mockChannelAttachSuccess(context.firstContributor.channel);
mockChannelAttachSuccess(context.secondContributor.channel);
mockChannelAttachSuccess(context.thirdContributor.channel);

await monitor.attach();

// Send the monitor through the detach cycle
mockChannelDetachSuccess(context.firstContributor.channel);
// Send the monitor through the detach cycle - the first contributor won't detach until a promise resolves
vi.spyOn(context.firstContributor.channel, 'detach').mockImplementation(() => {
return new Promise((resolve) => {
setTimeout(() => {
vi.spyOn(context.firstContributor.channel, 'state', 'get').mockReturnValue(AblyChannelState.Detached);
vi.spyOn(context.firstContributor.channel, 'errorReason', 'get').mockReturnValue(baseError);
resolve();
}, 1000);
});
});
mockChannelDetachSuccess(context.secondContributor.channel);
mockChannelDetachSuccess(context.thirdContributor.channel);

await monitor.detach();
void monitor.detach();

// Emit an update / discontinuity event on the first contributor during the detached state for whatever reason
context.firstContributor.emulateStateChange(
{
current: AblyChannelState.Attached,
previous: AblyChannelState.Attached,
resumed: false,
reason: baseError,
},
true,
);

// We shouldn't have registered a discontinuity event yet
expect(status.status).toEqual(RoomStatus.Detached);
expect(context.firstContributor.discontinuityDetected).not.toHaveBeenCalled();

// Now re-attach the room
await monitor.attach();

// Our first contributor should have registered a discontinuity event now
expect(status.status).toEqual(RoomStatus.Attached);
expect(context.firstContributor.discontinuityDetected).toBeCalledWith(baseError);
});

it<TestContext>('should prefer the first discontinuity event if multiple are received', async (context) => {
// Force our status and contributors into attached
const status = new DefaultRoomLifecycle('roomId', makeTestLogger());
context.firstContributor.emulateStateChange({
current: AblyChannelState.Attached,
previous: 'initialized',
resumed: false,
reason: baseError,
});
context.secondContributor.emulateStateChange({
current: AblyChannelState.Attached,
previous: 'initialized',
resumed: false,
reason: baseError2,
});
context.thirdContributor.emulateStateChange({
current: AblyChannelState.Attached,
previous: 'initialized',
resumed: false,
reason: baseError,
// Wait for the first contributor to have detach called
await vi.waitFor(() => {
expect(context.firstContributor.channel.detach).toHaveBeenCalled();
});
status.setStatus({ status: RoomStatus.Initialized });

const monitor = new RoomLifecycleManager(
status,
[context.firstContributor, context.secondContributor, context.thirdContributor],
makeTestLogger(),
5,
);

// Send the monitor through the attach cycle
mockChannelAttachSuccess(context.firstContributor.channel);
mockChannelAttachSuccess(context.secondContributor.channel);
mockChannelAttachSuccess(context.thirdContributor.channel);

await monitor.attach();

// Send the monitor through the detach cycle
mockChannelDetachSuccess(context.firstContributor.channel);
mockChannelDetachSuccess(context.secondContributor.channel);
mockChannelDetachSuccess(context.thirdContributor.channel);

await monitor.detach();

// Emit an update / discontinuity event on the first contributor during the detached state for whatever reason
// Emit an update / discontinuity event on the first contributor - naturally this wouldn't happen
// during this phase, but it's a good way to emulate it
const error1 = new Ably.ErrorInfo('first', 1, 1);
context.firstContributor.emulateStateChange(
{
Expand All @@ -2407,8 +2363,13 @@ describe('room lifecycle manager', () => {
true,
);

// Advance timers so the channel goes ahead and detaches.
vi.advanceTimersByTime(1000);

// We shouldn't have registered a discontinuity event yet
expect(status.status).toEqual(RoomStatus.Detached);
await vi.waitFor(() => {
expect(status.status).toEqual(RoomStatus.Detached);
});
expect(context.firstContributor.discontinuityDetected).not.toHaveBeenCalled();

// Now re-attach the room
Expand Down
Loading