Skip to content

Commit

Permalink
core/lifecycle: properly set operation in progress flag
Browse files Browse the repository at this point in the history
There are a lot of places where we don't correctly set the operation-in-progress flag - e.g. after detach.

This change fixes this.
  • Loading branch information
AndyTWF committed Nov 25, 2024
1 parent a7ef4a9 commit 5ae54bd
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 92 deletions.
65 changes: 46 additions & 19 deletions src/core/room-lifecycle-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,6 @@ export class RoomLifecycleManager {
this._transientDetachTimeouts = new Map();
this._lifecycle = lifecycle;

// This shouldn't be the case except in testing, but if we're already attached, then we should consider
// ourselves not in the middle of an operation and thus consider channel events.
if (this._lifecycle.status !== RoomStatus.Attached) {
this._operationInProgress = true;
}

this._setupContributorListeners(transientDetachTimeout);
}

Expand Down Expand Up @@ -229,7 +223,7 @@ export class RoomLifecycleManager {
channel: contributor.channel.name,
});
this._clearAllTransientDetachTimeouts();
this._operationInProgress = true;
this._startLifecycleOperation();
this._lifecycle.setStatus({
status: RoomStatus.Failed,
error: change.reason,
Expand Down Expand Up @@ -321,7 +315,7 @@ export class RoomLifecycleManager {
// We freeze our state, so that individual channel state changes do not affect the room status
// We also set our room state to the state of the contributor
// We clear all the transient detach timeouts, because we're closing all the channels
this._operationInProgress = true;
this._startLifecycleOperation();
this._clearAllTransientDetachTimeouts();

// We enter the protected block with priority Internal, so take precedence over user-driven actions
Expand Down Expand Up @@ -361,6 +355,7 @@ export class RoomLifecycleManager {
*/
private async _doRetry(contributor: ContributesToRoomLifecycle): Promise<void> {
// A helper that allows us to retry the attach operation
// eslint-disable-next-line unicorn/consistent-function-scoping
const doAttachWithRetry = () => {
this._logger.debug('RoomLifecycleManager.doAttachWithRetry();');
this._lifecycle.setStatus({ status: RoomStatus.Attaching });
Expand All @@ -374,13 +369,16 @@ export class RoomLifecycleManager {
// If we're in failed, then we should wind down all the channels, eventually - but we're done here
if (result.status === RoomStatus.Failed) {
void this._mtx.runExclusive(
() => this._runDownChannelsOnFailedAttach(),
() =>
this._runDownChannelsOnFailedAttach().finally(() => {
this._endLifecycleOperation();
}),
LifecycleOperationPrecedence.Internal,
);
return;
}

// If we're in suspended, then we should wait for the channel to reattach, but wait for it to do so
// If we're in suspended, then we should wait for the channel to reattach and then try again
if (result.status === RoomStatus.Suspended) {
const failedFeature = result.failedFeature;
if (!failedFeature) {
Expand All @@ -393,7 +391,8 @@ export class RoomLifecycleManager {
return this._doRetry(failedFeature).catch();
}

// We attached, huzzah!
// We attached, huzzah! It's the end of the loop
this._endLifecycleOperation();
});
};

Expand All @@ -404,6 +403,9 @@ export class RoomLifecycleManager {
try {
await this._doChannelWindDown(contributor).catch(() => {
// If in doing the wind down, we've entered failed state, then it's game over anyway
// TODO: Another PR, but in the event that we get a failed channel, we still need to do the wind down
// of other channels for atomicity.
// https://github.com/ably/ably-chat-js/issues/416
if (this._lifecycle.status === RoomStatus.Failed) {
throw new Error('room is in a failed state');
}
Expand All @@ -417,27 +419,32 @@ export class RoomLifecycleManager {
});
} catch {
// If an error gets through here, then the room has entered the failed state, we're done.
this._endLifecycleOperation();
return;
}

// If our problem channel has reattached, then we can retry the attach
if (contributor.channel.state === RoomStatus.Attached) {
if (contributor.channel.state === 'attached') {
this._logger.debug('RoomLifecycleManager._doRetry(); feature reattached, retrying attach');
return doAttachWithRetry();
}

// Otherwise, wait for our problem channel to re-attach and try again
return new Promise<void>((resolve) => {
const listener = (change: Ably.ChannelStateChange) => {
if (change.current === RoomStatus.Attached) {
if (change.current === 'attached') {
contributor.channel.off(listener);
resolve();
return;
}

if (change.current === RoomStatus.Failed) {
if (change.current === 'failed') {
contributor.channel.off(listener);
this._lifecycle.setStatus({ status: RoomStatus.Failed, error: change.reason });

// It's ok to just set operation in progress = false and return here,
// as every other channel is wound down.
this._endLifecycleOperation();
throw change.reason ?? new Ably.ErrorInfo('unknown error in _doRetry', ErrorCodes.RoomLifecycleError, 500);
}
};
Expand Down Expand Up @@ -487,15 +494,15 @@ export class RoomLifecycleManager {

// At this point, we force the room status to be attaching
this._clearAllTransientDetachTimeouts();
this._operationInProgress = true;
this._startLifecycleOperation();
this._lifecycle.setStatus({ status: RoomStatus.Attaching });

return this._doAttach().then((result: RoomAttachmentResult) => {
// If we're in a failed state, then we should wind down all the channels, eventually
if (result.status === RoomStatus.Failed) {
this._logger.debug('RoomLifecycleManager.attach(); room entered failed, winding down channels', { result });
void this._mtx.runExclusive(
() => this._runDownChannelsOnFailedAttach(),
() => this._runDownChannelsOnFailedAttach().finally(() => (this._operationInProgress = false)),
LifecycleOperationPrecedence.Internal,
);

Expand Down Expand Up @@ -589,7 +596,7 @@ export class RoomLifecycleManager {

// We successfully attached all the channels - set our status to attached, start listening changes in channel status
this._lifecycle.setStatus(attachResult);
this._operationInProgress = false;
this._endLifecycleOperation();

// Iterate the pending discontinuity events and trigger them
for (const [contributor, error] of this._pendingDiscontinuityEvents) {
Expand Down Expand Up @@ -713,7 +720,7 @@ export class RoomLifecycleManager {
}

// We force the room status to be detaching
this._operationInProgress = true;
this._startLifecycleOperation();
this._clearAllTransientDetachTimeouts();
this._lifecycle.setStatus({ status: RoomStatus.Detaching });

Expand Down Expand Up @@ -763,6 +770,9 @@ export class RoomLifecycleManager {
done = true;
}

// The process is finished, so set operationInProgress to false
this._endLifecycleOperation();

// If we aren't in the failed state, then we're detached
if (this._lifecycle.status !== RoomStatus.Failed) {
this._lifecycle.setStatus({ status: RoomStatus.Detached });
Expand Down Expand Up @@ -819,7 +829,7 @@ export class RoomLifecycleManager {

// We force the room status to be releasing
this._clearAllTransientDetachTimeouts();
this._operationInProgress = true;
this._startLifecycleOperation();
this._releaseInProgress = true;
this._lifecycle.setStatus({ status: RoomStatus.Releasing });

Expand Down Expand Up @@ -888,7 +898,24 @@ export class RoomLifecycleManager {
}),
).then(() => {
this._releaseInProgress = false;
this._endLifecycleOperation();
this._lifecycle.setStatus({ status: RoomStatus.Released });
});
}

/**
 * Marks the beginning of a room lifecycle operation.
 *
 * Switches the operation-in-progress flag on and records a debug log entry so
 * the start of the operation can be traced.
 */
private _startLifecycleOperation(): void {
  // Raise the flag, then note the transition in the debug log.
  this._operationInProgress = true;
  this._logger.debug('RoomLifecycleManager._startLifecycleOperation();');
}

/**
 * Marks the completion of a room lifecycle operation.
 *
 * Switches the operation-in-progress flag off and records a debug log entry so
 * the end of the operation can be traced.
 */
private _endLifecycleOperation(): void {
  // Lower the flag, then note the transition in the debug log.
  this._operationInProgress = false;
  this._logger.debug('RoomLifecycleManager._endLifecycleOperation();');
}
}
107 changes: 34 additions & 73 deletions test/core/room-lifecycle-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -808,8 +808,11 @@ describe('room lifecycle manager', () => {
5,
);

// Start the attach
// Start the attach and wait until the first channel attach is called to ensure the op is in-progress
const attachPromise = monitor.attach();
await vi.waitFor(() => {
expect(context.firstContributor.channel.attach).toHaveBeenCalled();
});

// Simulate a channel state change on the second channel
context.secondContributor.emulateStateChange({
Expand Down Expand Up @@ -1205,8 +1208,11 @@ describe('room lifecycle manager', () => {
5,
);

// Start the detach
// Start the detach and wait for the first channel detach to be called to ensure the op is in-progress
const detachPromise = monitor.detach();
await vi.waitFor(() => {
expect(context.firstContributor.channel.detach).toHaveBeenCalled();
});

// Simulate a channel state change on the second channel
context.secondContributor.emulateStateChange({
Expand Down Expand Up @@ -2162,7 +2168,9 @@ describe('room lifecycle manager', () => {
expect(context.firstContributor.discontinuityDetected).toBeCalledWith(baseError);
});

it<TestContext>('registers a discontinuity after re-attachment if room is detached at the time', async (context) => {
it<TestContext>('should prefer the first discontinuity event if multiple are received', async (context) => {
vi.useFakeTimers();

// Force our status and contributors into attached
const status = new DefaultRoomLifecycle('roomId', makeTestLogger());
context.firstContributor.emulateStateChange({
Expand Down Expand Up @@ -2193,87 +2201,35 @@ describe('room lifecycle manager', () => {
);

// Send the monitor through the attach cycle

mockChannelAttachSuccess(context.firstContributor.channel);
mockChannelAttachSuccess(context.secondContributor.channel);
mockChannelAttachSuccess(context.thirdContributor.channel);

await monitor.attach();

// Send the monitor through the detach cycle
mockChannelDetachSuccess(context.firstContributor.channel);
// Send the monitor through the detach cycle - the first contributor won't detach until a promise resolves
vi.spyOn(context.firstContributor.channel, 'detach').mockImplementation(() => {
return new Promise((resolve) => {
setTimeout(() => {
vi.spyOn(context.firstContributor.channel, 'state', 'get').mockReturnValue(AblyChannelState.Detached);
vi.spyOn(context.firstContributor.channel, 'errorReason', 'get').mockReturnValue(baseError);
resolve();
}, 1000);
});
});
mockChannelDetachSuccess(context.secondContributor.channel);
mockChannelDetachSuccess(context.thirdContributor.channel);

await monitor.detach();
void monitor.detach();

// Emit an update / discontinuity event on the first contributor during the detached state for whatever reason
context.firstContributor.emulateStateChange(
{
current: AblyChannelState.Attached,
previous: AblyChannelState.Attached,
resumed: false,
reason: baseError,
},
true,
);

// We shouldn't have registered a discontinuity event yet
expect(status.status).toEqual(RoomStatus.Detached);
expect(context.firstContributor.discontinuityDetected).not.toHaveBeenCalled();

// Now re-attach the room
await monitor.attach();

// Our first contributor should have registered a discontinuity event now
expect(status.status).toEqual(RoomStatus.Attached);
expect(context.firstContributor.discontinuityDetected).toBeCalledWith(baseError);
});

it<TestContext>('should prefer the first discontinuity event if multiple are received', async (context) => {
// Force our status and contributors into attached
const status = new DefaultRoomLifecycle('roomId', makeTestLogger());
context.firstContributor.emulateStateChange({
current: AblyChannelState.Attached,
previous: 'initialized',
resumed: false,
reason: baseError,
});
context.secondContributor.emulateStateChange({
current: AblyChannelState.Attached,
previous: 'initialized',
resumed: false,
reason: baseError2,
});
context.thirdContributor.emulateStateChange({
current: AblyChannelState.Attached,
previous: 'initialized',
resumed: false,
reason: baseError,
// Wait for the first contributor to have detach called
await vi.waitFor(() => {
expect(context.firstContributor.channel.detach).toHaveBeenCalled();
});
status.setStatus({ status: RoomStatus.Initialized });

const monitor = new RoomLifecycleManager(
status,
[context.firstContributor, context.secondContributor, context.thirdContributor],
makeTestLogger(),
5,
);

// Send the monitor through the attach cycle
mockChannelAttachSuccess(context.firstContributor.channel);
mockChannelAttachSuccess(context.secondContributor.channel);
mockChannelAttachSuccess(context.thirdContributor.channel);

await monitor.attach();

// Send the monitor through the detach cycle
mockChannelDetachSuccess(context.firstContributor.channel);
mockChannelDetachSuccess(context.secondContributor.channel);
mockChannelDetachSuccess(context.thirdContributor.channel);

await monitor.detach();

// Emit an update / discontinuity event on the first contributor during the detached state for whatever reason
// Emit an update / discontinuity event on the first contributor - naturally this wouldn't happen
// during this phase, but it's a good way to emulate it
const error1 = new Ably.ErrorInfo('first', 1, 1);
context.firstContributor.emulateStateChange(
{
Expand All @@ -2296,8 +2252,13 @@ describe('room lifecycle manager', () => {
true,
);

// Advance timers so the channel goes ahead and detaches.
vi.advanceTimersByTime(1000);

// We shouldn't have registered a discontinuity event yet
expect(status.status).toEqual(RoomStatus.Detached);
await vi.waitFor(() => {
expect(status.status).toEqual(RoomStatus.Detached);
});
expect(context.firstContributor.discontinuityDetected).not.toHaveBeenCalled();

// Now re-attach the room
Expand Down

0 comments on commit 5ae54bd

Please sign in to comment.