diff --git a/src/core/channel-manager.ts b/src/core/channel-manager.ts new file mode 100644 index 00000000..32334ef0 --- /dev/null +++ b/src/core/channel-manager.ts @@ -0,0 +1,46 @@ +import * as Ably from 'ably'; + +import { Logger } from './logger.js'; +import { DEFAULT_CHANNEL_OPTIONS } from './version.js'; + +export type ChannelOptionsMerger = (options: Ably.ChannelOptions) => Ably.ChannelOptions; + +export class ChannelManager { + private readonly _realtime: Ably.Realtime; + private readonly _logger: Logger; + private readonly _registeredOptions = new Map(); + private readonly _requestedChannels = new Set(); + + constructor(realtime: Ably.Realtime, logger: Logger) { + logger.trace('ChannelManager();'); + this._realtime = realtime; + this._logger = logger; + } + + mergeOptions(channelName: string, merger: ChannelOptionsMerger): void { + this._logger.trace('ChannelManager.registerOptions();', { channelName }); + if (this._requestedChannels.has(channelName)) { + this._logger.error('channel options cannot be modified after the channel has been requested', { channelName }); + throw new Ably.ErrorInfo('channel options cannot be modified after the channel has been requested', 40000, 400); + } + + const currentOpts = this._registeredOptions.get(channelName) ?? DEFAULT_CHANNEL_OPTIONS; + this._registeredOptions.set(channelName, merger(currentOpts)); + } + + get(channelName: string): Ably.RealtimeChannel { + this._logger.trace('ChannelManager.get();', { channelName }); + this._requestedChannels.add(channelName); + return this._realtime.channels.get( + channelName, + this._registeredOptions.get(channelName) ?? DEFAULT_CHANNEL_OPTIONS, + ); + } + + release(channelName: string): void { + this._logger.trace('ChannelManager.release();', { channelName }); + this._requestedChannels.delete(channelName); + this._registeredOptions.delete(channelName); + this._realtime.channels.release(channelName); + } +} diff --git a/src/core/channel.ts b/src/core/channel.ts index 5b0284b0..d55440c7 100644 --- a/src/core/channel.ts +++ b/src/core/channel.ts @@ -1,19 +1,3 @@ -import * as Ably from 'ably'; - -import { DEFAULT_CHANNEL_OPTIONS } from './version.js'; - -export const getChannel = (name: string, realtime: Ably.Realtime, opts?: Ably.ChannelOptions): Ably.RealtimeChannel => { - const resolvedOptions = { - ...opts, - params: { - ...opts?.params, - ...DEFAULT_CHANNEL_OPTIONS.params, - }, - }; - - return realtime.channels.get(name, resolvedOptions); -}; - /** * Get the channel name for the chat messages channel. * @param roomId The room ID. diff --git a/src/core/messages.ts b/src/core/messages.ts index cde1f3fa..7632560e 100644 --- a/src/core/messages.ts +++ b/src/core/messages.ts @@ -1,6 +1,7 @@ import * as Ably from 'ably'; -import { getChannel, messagesChannelName } from './channel.js'; +import { messagesChannelName } from './channel.js'; +import { ChannelManager } from './channel-manager.js'; import { ChatApi } from './chat-api.js'; import { DiscontinuityEmitter, @@ -300,16 +301,16 @@ export class DefaultMessages /** * Constructs a new `DefaultMessages` instance. * @param roomId The unique identifier of the room. - * @param realtime An instance of the Ably Realtime client. + * @param channelManager An instance of the ChannelManager. * @param chatApi An instance of the ChatApi. * @param clientId The client ID of the user. * @param logger An instance of the Logger. */ - constructor(roomId: string, realtime: Ably.Realtime, chatApi: ChatApi, clientId: string, logger: Logger) { + constructor(roomId: string, channelManager: ChannelManager, chatApi: ChatApi, clientId: string, logger: Logger) { super(); this._roomId = roomId; - this._channel = this._makeChannel(roomId, realtime); + this._channel = this._makeChannel(roomId, channelManager); this._chatApi = chatApi; this._clientId = clientId; @@ -320,8 +321,8 @@ export class DefaultMessages /** * Creates the realtime channel for messages. */ - private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel { - const channel = getChannel(messagesChannelName(roomId), realtime); + private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel { + const channel = channelManager.get(messagesChannelName(roomId)); addListenerToChannelWithoutAttach({ listener: this._processEvent.bind(this), diff --git a/src/core/occupancy.ts b/src/core/occupancy.ts index d533678c..9159d0e0 100644 --- a/src/core/occupancy.ts +++ b/src/core/occupancy.ts @@ -1,6 +1,7 @@ import * as Ably from 'ably'; -import { getChannel, messagesChannelName } from './channel.js'; +import { messagesChannelName } from './channel.js'; +import { ChannelManager, ChannelOptionsMerger } from './channel-manager.js'; import { ChatApi } from './chat-api.js'; import { DiscontinuityEmitter, @@ -106,15 +107,15 @@ export class DefaultOccupancy /** * Constructs a new `DefaultOccupancy` instance. * @param roomId The unique identifier of the room. - * @param realtime An instance of the Ably Realtime client. + * @param channelManager An instance of the ChannelManager. * @param chatApi An instance of the ChatApi. * @param logger An instance of the Logger. */ - constructor(roomId: string, realtime: Ably.Realtime, chatApi: ChatApi, logger: Logger) { + constructor(roomId: string, channelManager: ChannelManager, chatApi: ChatApi, logger: Logger) { super(); this._roomId = roomId; - this._channel = this._makeChannel(roomId, realtime); + this._channel = this._makeChannel(roomId, channelManager); this._chatApi = chatApi; this._logger = logger; } @@ -122,8 +123,8 @@ export class DefaultOccupancy /** * Creates the realtime channel for occupancy. */ - private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel { - const channel = getChannel(messagesChannelName(roomId), realtime, { params: { occupancy: 'metrics' } }); + private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel { + const channel = channelManager.get(DefaultOccupancy.channelName(roomId)); addListenerToChannelWithoutAttach({ listener: this._internalOccupancyListener.bind(this), events: ['[meta]occupancy'], @@ -244,4 +245,24 @@ export class DefaultOccupancy get detachmentErrorCode(): ErrorCodes { return ErrorCodes.OccupancyDetachmentFailed; } + + /** + * Merges the channel options for the room with the ones required for presence. + * + * @param roomOptions The room options to merge for. + * @returns A function that merges the channel options for the room with the ones required for presence. + */ + static channelOptionMerger(): ChannelOptionsMerger { + return (options) => ({ ...options, params: { ...options.params, occupancy: 'metrics' } }); + } + + /** + * Returns the channel name for the presence channel. + * + * @param roomId The unique identifier of the room. + * @returns The channel name for the presence channel. + */ + static channelName(roomId: string): string { + return messagesChannelName(roomId); + } } diff --git a/src/core/presence.ts b/src/core/presence.ts index 512554b3..446f439e 100644 --- a/src/core/presence.ts +++ b/src/core/presence.ts @@ -1,6 +1,7 @@ import * as Ably from 'ably'; -import { getChannel, messagesChannelName } from './channel.js'; +import { messagesChannelName } from './channel.js'; +import { ChannelManager, ChannelOptionsMerger } from './channel-manager.js'; import { DiscontinuityEmitter, DiscontinuityListener, @@ -198,16 +199,15 @@ export class DefaultPresence /** * Constructs a new `DefaultPresence` instance. * @param roomId The unique identifier of the room. - * @param roomOptions The room options for presence. - * @param realtime An instance of the Ably Realtime client. + * @param channelManager The channel manager to use for creating the presence channel. * @param clientId The client ID, attached to presences messages as an identifier of the sender. * A channel can have multiple connections using the same clientId. * @param logger An instance of the Logger. */ - constructor(roomId: string, roomOptions: RoomOptions, realtime: Ably.Realtime, clientId: string, logger: Logger) { + constructor(roomId: string, channelManager: ChannelManager, clientId: string, logger: Logger) { super(); - this._channel = this._makeChannel(roomId, roomOptions, realtime); + this._channel = this._makeChannel(roomId, channelManager); this._clientId = clientId; this._logger = logger; } @@ -215,18 +215,8 @@ export class DefaultPresence /** * Creates the realtime channel for presence. */ - private _makeChannel(roomId: string, roomOptions: RoomOptions, realtime: Ably.Realtime): Ably.RealtimeChannel { - // Set our channel modes based on the room options - const channelModes = ['PUBLISH', 'SUBSCRIBE'] as Ably.ChannelMode[]; - if (roomOptions.presence?.enter === undefined || roomOptions.presence.enter) { - channelModes.push('PRESENCE'); - } - - if (roomOptions.presence?.subscribe === undefined || roomOptions.presence.subscribe) { - channelModes.push('PRESENCE_SUBSCRIBE'); - } - - const channel = getChannel(messagesChannelName(roomId), realtime, { modes: channelModes }); + private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel { + const channel = channelManager.get(DefaultPresence.channelName(roomId)); addListenerToChannelPresenceWithoutAttach({ listener: this.subscribeToEvents.bind(this), @@ -418,4 +408,35 @@ export class DefaultPresence get detachmentErrorCode(): ErrorCodes { return ErrorCodes.PresenceDetachmentFailed; } + + /** + * Merges the channel options for the room with the ones required for presence. + * + * @param roomOptions The room options to merge for. + * @returns A function that merges the channel options for the room with the ones required for presence. + */ + static channelOptionMerger(roomOptions: RoomOptions): ChannelOptionsMerger { + return (options) => { + const channelModes = ['PUBLISH', 'SUBSCRIBE'] as Ably.ChannelMode[]; + if (roomOptions.presence?.enter === undefined || roomOptions.presence.enter) { + channelModes.push('PRESENCE'); + } + + if (roomOptions.presence?.subscribe === undefined || roomOptions.presence.subscribe) { + channelModes.push('PRESENCE_SUBSCRIBE'); + } + + return { ...options, modes: channelModes }; + }; + } + + /** + * Returns the channel name for the presence channel. + * + * @param roomId The unique identifier of the room. + * @returns The channel name for the presence channel. + */ + static channelName(roomId: string): string { + return messagesChannelName(roomId); + } } diff --git a/src/core/room-reactions.ts b/src/core/room-reactions.ts index f8c4932a..61eb295c 100644 --- a/src/core/room-reactions.ts +++ b/src/core/room-reactions.ts @@ -1,6 +1,6 @@ import * as Ably from 'ably'; -import { getChannel } from './channel.js'; +import { ChannelManager } from './channel-manager.js'; import { DiscontinuityEmitter, DiscontinuityListener, @@ -143,14 +143,14 @@ export class DefaultRoomReactions /** * Constructs a new `DefaultRoomReactions` instance. * @param roomId The unique identifier of the room. - * @param realtime An instance of the Ably Realtime client. + * @param channelManager The ChannelManager instance. * @param clientId The client ID of the user. * @param logger An instance of the Logger. */ - constructor(roomId: string, realtime: Ably.Realtime, clientId: string, logger: Logger) { + constructor(roomId: string, channelManager: ChannelManager, clientId: string, logger: Logger) { super(); - this._channel = this._makeChannel(roomId, realtime); + this._channel = this._makeChannel(roomId, channelManager); this._clientId = clientId; this._logger = logger; } @@ -158,8 +158,8 @@ export class DefaultRoomReactions /** * Creates the realtime channel for room reactions. */ - private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel { - const channel = getChannel(`${roomId}::$chat::$reactions`, realtime); + private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel { + const channel = channelManager.get(`${roomId}::$chat::$reactions`); addListenerToChannelWithoutAttach({ listener: this._forwarder.bind(this), events: [RoomReactionEvents.Reaction], diff --git a/src/core/room.ts b/src/core/room.ts index a7c2f536..1de1290a 100644 --- a/src/core/room.ts +++ b/src/core/room.ts @@ -1,6 +1,7 @@ import * as Ably from 'ably'; import cloneDeep from 'lodash.clonedeep'; +import { ChannelManager } from './channel-manager.js'; import { ChatApi } from './chat-api.js'; import { Logger } from './logger.js'; import { DefaultMessages, Messages } from './messages.js'; @@ -167,32 +168,34 @@ export class DefaultRoom implements Room { this._logger = logger; this._lifecycle = new DefaultRoomLifecycle(roomId, logger); + const channelManager = this._getChannelManager(options, realtime, logger); + // Setup features - this._messages = new DefaultMessages(roomId, realtime, this._chatApi, realtime.auth.clientId, logger); + this._messages = new DefaultMessages(roomId, channelManager, this._chatApi, realtime.auth.clientId, logger); const features: ContributesToRoomLifecycle[] = [this._messages]; if (options.presence) { this._logger.debug('enabling presence on room', { roomId }); - this._presence = new DefaultPresence(roomId, options, realtime, realtime.auth.clientId, logger); + this._presence = new DefaultPresence(roomId, channelManager, realtime.auth.clientId, logger); features.push(this._presence); } if (options.typing) { this._logger.debug('enabling typing on room', { roomId }); - this._typing = new DefaultTyping(roomId, options.typing, realtime, realtime.auth.clientId, logger); + this._typing = new DefaultTyping(roomId, options.typing, channelManager, realtime.auth.clientId, logger); features.push(this._typing); } if (options.reactions) { this._logger.debug('enabling reactions on room', { roomId }); - this._reactions = new DefaultRoomReactions(roomId, realtime, realtime.auth.clientId, logger); + this._reactions = new DefaultRoomReactions(roomId, channelManager, realtime.auth.clientId, logger); features.push(this._reactions); } if (options.occupancy) { this._logger.debug('enabling occupancy on room', { roomId }); - this._occupancy = new DefaultOccupancy(roomId, realtime, this._chatApi, logger); + this._occupancy = new DefaultOccupancy(roomId, channelManager, this._chatApi, logger); features.push(this._occupancy); } @@ -210,13 +213,34 @@ export class DefaultRoom implements Room { await this._lifecycleManager.release(); for (const feature of features) { - realtime.channels.release(feature.channel.name); + channelManager.release(feature.channel.name); } finalized = true; }; } + /** + * Gets the channel manager for the room, which handles merging channel options together and creating channels. + * + * @param options The room options. + * @param realtime An instance of the Ably Realtime client. + * @param logger An instance of the Logger. + */ + private _getChannelManager(options: RoomOptions, realtime: Ably.Realtime, logger: Logger): ChannelManager { + const manager = new ChannelManager(realtime, logger); + + if (options.occupancy) { + manager.mergeOptions(DefaultOccupancy.channelName(this._roomId), DefaultOccupancy.channelOptionMerger()); + } + + if (options.presence) { + manager.mergeOptions(DefaultPresence.channelName(this._roomId), DefaultPresence.channelOptionMerger(options)); + } + + return manager; + } + /** * @inheritdoc Room */ diff --git a/src/core/typing.ts b/src/core/typing.ts index b6707cdb..ed563c23 100644 --- a/src/core/typing.ts +++ b/src/core/typing.ts @@ -1,7 +1,7 @@ import * as Ably from 'ably'; import { dequal } from 'dequal'; -import { getChannel } from './channel.js'; +import { ChannelManager } from './channel-manager.js'; import { DiscontinuityEmitter, DiscontinuityListener, @@ -133,14 +133,20 @@ export class DefaultTyping * Constructs a new `DefaultTyping` instance. * @param roomId The unique identifier of the room. * @param options The options for typing in the room. - * @param realtime An instance of the Ably Realtime client. + * @param channelManager The channel manager for the room. * @param clientId The client ID of the user. * @param logger An instance of the Logger. */ - constructor(roomId: string, options: TypingOptions, realtime: Ably.Realtime, clientId: string, logger: Logger) { + constructor( + roomId: string, + options: TypingOptions, + channelManager: ChannelManager, + clientId: string, + logger: Logger, + ) { super(); this._clientId = clientId; - this._channel = this._makeChannel(roomId, realtime); + this._channel = this._makeChannel(roomId, channelManager); // Timeout for typing this._typingTimeoutMs = options.timeoutMs; @@ -150,8 +156,8 @@ export class DefaultTyping /** * Creates the realtime channel for typing indicators. */ - private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel { - const channel = getChannel(`${roomId}::$chat::$typingIndicators`, realtime); + private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel { + const channel = channelManager.get(`${roomId}::$chat::$typingIndicators`); addListenerToChannelPresenceWithoutAttach({ listener: this._internalSubscribeToEvents.bind(this), channel: channel, diff --git a/test/core/channel-manager.test.ts b/test/core/channel-manager.test.ts new file mode 100644 index 00000000..71acba08 --- /dev/null +++ b/test/core/channel-manager.test.ts @@ -0,0 +1,103 @@ +import * as Ably from 'ably'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { ChannelManager, ChannelOptionsMerger } from '../../src/core/channel-manager.ts'; +import { DEFAULT_CHANNEL_OPTIONS } from '../../src/core/version.ts'; +import { randomClientId } from '../helper/identifier.ts'; +import { makeTestLogger } from '../helper/logger.ts'; + +interface TestContext { + mockRealtime: Ably.Realtime; + channelManager: ChannelManager; +} + +vi.mock('ably'); + +describe('ChannelManager', () => { + beforeEach((context) => { + context.mockRealtime = new Ably.Realtime({ clientId: randomClientId() }); + context.channelManager = new ChannelManager(context.mockRealtime, makeTestLogger()); + + vi.spyOn(context.mockRealtime.channels, 'get').mockReturnValue({} as Ably.RealtimeChannel); + vi.spyOn(context.mockRealtime.channels, 'release'); + }); + + it('requests channel with default options', (context) => { + const channelName = 'test-channel'; + context.channelManager.get(channelName); + + expect(context.mockRealtime.channels.get).toHaveBeenCalledWith(channelName, DEFAULT_CHANNEL_OPTIONS); + }); + + it('should merge options correctly', (context) => { + const channelName = 'test-channel'; + const merger: ChannelOptionsMerger = (options) => ({ ...options, mode: 'presence' }); + + context.channelManager.mergeOptions(channelName, merger); + + context.channelManager.get(channelName); + expect(context.mockRealtime.channels.get).toHaveBeenCalledWith(channelName, { + ...DEFAULT_CHANNEL_OPTIONS, + mode: 'presence', + }); + }); + + it('should merge options multiple times over', (context) => { + const channelName = 'test-channel'; + const merger: ChannelOptionsMerger = (options) => ({ ...options, mode: 'presence' }); + const merger2: ChannelOptionsMerger = (options) => ({ ...options, presence: 'enter' }); + + context.channelManager.mergeOptions(channelName, merger); + context.channelManager.mergeOptions(channelName, merger2); + + context.channelManager.get(channelName); + expect(context.mockRealtime.channels.get).toHaveBeenCalledWith(channelName, { + ...DEFAULT_CHANNEL_OPTIONS, + mode: 'presence', + presence: 'enter', + }); + }); + + it('should throw error if trying to merge options for a requested channel', (context) => { + const channelName = 'test-channel'; + const merger: ChannelOptionsMerger = (options) => ({ ...options, mode: 'presence' }); + const merger2: ChannelOptionsMerger = (options) => ({ ...options, presence: 'enter' }); + + context.channelManager.mergeOptions(channelName, merger); + context.channelManager.get(channelName); + + // Should have been called once + expect(context.mockRealtime.channels.get).toHaveBeenCalledTimes(1); + expect(context.mockRealtime.channels.get).toHaveBeenCalledWith(channelName, { + ...DEFAULT_CHANNEL_OPTIONS, + mode: 'presence', + }); + + // Now try to merge again, should error + expect(() => { + context.channelManager.mergeOptions(channelName, merger2); + }).toThrowErrorInfo({ code: 40000, statusCode: 400 }); + + // And we shouldn't have called get again + expect(context.mockRealtime.channels.get).toHaveBeenCalledTimes(1); + }); + + it('should get a channel singleton', (context) => { + const channelName = 'test-channel'; + const merger: ChannelOptionsMerger = (options) => ({ ...options, mode: 'presence' }); + + context.channelManager.mergeOptions(channelName, merger); + const channel1 = context.channelManager.get(channelName); + const channel2 = context.channelManager.get(channelName); + + expect(channel1).toBe(channel2); + }); + + it('should release a channel', (context) => { + const channelName = 'test-channel'; + context.channelManager.get(channelName); + + context.channelManager.release(channelName); + expect(context.mockRealtime.channels.release).toHaveBeenCalledWith(channelName); + }); +}); diff --git a/test/core/room.test.ts b/test/core/room.test.ts index 5760ecd8..69a8de7d 100644 --- a/test/core/room.test.ts +++ b/test/core/room.test.ts @@ -8,6 +8,7 @@ import { RoomLifecycleManager } from '../../src/core/room-lifecycle-manager.ts'; import { RoomOptions, RoomOptionsDefaults } from '../../src/core/room-options.ts'; import { RoomStatus } from '../../src/core/room-status.ts'; import { DefaultTyping } from '../../src/core/typing.ts'; +import { CHANNEL_OPTIONS_AGENT_STRING, DEFAULT_CHANNEL_OPTIONS } from '../../src/core/version.ts'; import { randomRoomId } from '../helper/identifier.ts'; import { makeTestLogger } from '../helper/logger.ts'; import { ablyRealtimeClient } from '../helper/realtime-client.ts'; @@ -76,6 +77,42 @@ describe('Room', () => { }); }); + it('should apply channel options via the channel manager', (context) => { + vi.spyOn(context.realtime.channels, 'get'); + const room = context.getRoom(defaultRoomOptions) as DefaultRoom; + + // Check that the shared channel for messages, occupancy and presence was called with the correct options + const expectedMessagesChannelOptions = { + params: { occupancy: 'metrics', agent: CHANNEL_OPTIONS_AGENT_STRING }, + modes: ['PUBLISH', 'SUBSCRIBE', 'PRESENCE', 'PRESENCE_SUBSCRIBE'], + }; + + expect(context.realtime.channels.get).toHaveBeenCalledTimes(5); + expect(context.realtime.channels.get).toHaveBeenNthCalledWith( + 1, + room.messages.channel.name, + expectedMessagesChannelOptions, + ); + expect(context.realtime.channels.get).toHaveBeenNthCalledWith( + 2, + room.messages.channel.name, + expectedMessagesChannelOptions, + ); + expect(context.realtime.channels.get).toHaveBeenNthCalledWith( + 5, + room.messages.channel.name, + expectedMessagesChannelOptions, + ); + + // Check that the reactions and typing channels were called with the default options + expect(context.realtime.channels.get).toHaveBeenNthCalledWith(3, room.typing.channel.name, DEFAULT_CHANNEL_OPTIONS); + expect(context.realtime.channels.get).toHaveBeenNthCalledWith( + 4, + room.reactions.channel.name, + DEFAULT_CHANNEL_OPTIONS, + ); + }); + describe('room status', () => { it('should have a room status and error', async (context) => { const room = context.getRoom(defaultRoomOptions);