Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/room: add channel manager, fix bug where channel options not applied #415

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/core/channel-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Ably from 'ably';

import { Logger } from './logger.js';
import { DEFAULT_CHANNEL_OPTIONS } from './version.js';

export type ChannelOptionsMerger = (options: Ably.ChannelOptions) => Ably.ChannelOptions;

export class ChannelManager {
private readonly _realtime: Ably.Realtime;
private readonly _logger: Logger;
private readonly _registeredOptions = new Map<string, Ably.ChannelOptions>();
private readonly _requestedChannels = new Set<string>();

constructor(realtime: Ably.Realtime, logger: Logger) {
logger.trace('ChannelManager();');
this._realtime = realtime;
this._logger = logger;
}

mergeOptions(channelName: string, merger: ChannelOptionsMerger): void {
this._logger.trace('ChannelManager.registerOptions();', { channelName });
if (this._requestedChannels.has(channelName)) {
this._logger.error('channel options cannot be modified after the channel has been requested', { channelName });
throw new Ably.ErrorInfo('channel options cannot be modified after the channel has been requested', 40000, 400);
}

const currentOpts = this._registeredOptions.get(channelName) ?? DEFAULT_CHANNEL_OPTIONS;
this._registeredOptions.set(channelName, merger(currentOpts));
}

get(channelName: string): Ably.RealtimeChannel {
this._logger.trace('ChannelManager.get();', { channelName });
this._requestedChannels.add(channelName);
return this._realtime.channels.get(
channelName,
this._registeredOptions.get(channelName) ?? DEFAULT_CHANNEL_OPTIONS,
);
}

release(channelName: string): void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to also this._requestedChannels.delete(channelName) here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair shout - done!

this._logger.trace('ChannelManager.release();', { channelName });
this._requestedChannels.delete(channelName);
this._registeredOptions.delete(channelName);
this._realtime.channels.release(channelName);
}
}
16 changes: 0 additions & 16 deletions src/core/channel.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
import * as Ably from 'ably';

import { DEFAULT_CHANNEL_OPTIONS } from './version.js';

export const getChannel = (name: string, realtime: Ably.Realtime, opts?: Ably.ChannelOptions): Ably.RealtimeChannel => {
const resolvedOptions = {
...opts,
params: {
...opts?.params,
...DEFAULT_CHANNEL_OPTIONS.params,
},
};

return realtime.channels.get(name, resolvedOptions);
};

/**
* Get the channel name for the chat messages channel.
* @param roomId The room ID.
Expand Down
13 changes: 7 additions & 6 deletions src/core/messages.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';

import { getChannel, messagesChannelName } from './channel.js';
import { messagesChannelName } from './channel.js';
import { ChannelManager } from './channel-manager.js';
import { ChatApi } from './chat-api.js';
import {
DiscontinuityEmitter,
Expand Down Expand Up @@ -300,16 +301,16 @@ export class DefaultMessages
/**
* Constructs a new `DefaultMessages` instance.
* @param roomId The unique identifier of the room.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager An instance of the ChannelManager.
* @param chatApi An instance of the ChatApi.
* @param clientId The client ID of the user.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, realtime: Ably.Realtime, chatApi: ChatApi, clientId: string, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, chatApi: ChatApi, clientId: string, logger: Logger) {
super();
this._roomId = roomId;

this._channel = this._makeChannel(roomId, realtime);
this._channel = this._makeChannel(roomId, channelManager);

this._chatApi = chatApi;
this._clientId = clientId;
Expand All @@ -320,8 +321,8 @@ export class DefaultMessages
/**
* Creates the realtime channel for messages.
*/
private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel {
const channel = getChannel(messagesChannelName(roomId), realtime);
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(messagesChannelName(roomId));

addListenerToChannelWithoutAttach({
listener: this._processEvent.bind(this),
Expand Down
33 changes: 27 additions & 6 deletions src/core/occupancy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';

import { getChannel, messagesChannelName } from './channel.js';
import { messagesChannelName } from './channel.js';
import { ChannelManager, ChannelOptionsMerger } from './channel-manager.js';
import { ChatApi } from './chat-api.js';
import {
DiscontinuityEmitter,
Expand Down Expand Up @@ -106,24 +107,24 @@ export class DefaultOccupancy
/**
* Constructs a new `DefaultOccupancy` instance.
* @param roomId The unique identifier of the room.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager An instance of the ChannelManager.
* @param chatApi An instance of the ChatApi.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, realtime: Ably.Realtime, chatApi: ChatApi, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, chatApi: ChatApi, logger: Logger) {
super();

this._roomId = roomId;
this._channel = this._makeChannel(roomId, realtime);
this._channel = this._makeChannel(roomId, channelManager);
this._chatApi = chatApi;
this._logger = logger;
}

/**
* Creates the realtime channel for occupancy.
*/
private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel {
const channel = getChannel(messagesChannelName(roomId), realtime, { params: { occupancy: 'metrics' } });
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(DefaultOccupancy.channelName(roomId));
addListenerToChannelWithoutAttach({
listener: this._internalOccupancyListener.bind(this),
events: ['[meta]occupancy'],
Expand Down Expand Up @@ -244,4 +245,24 @@ export class DefaultOccupancy
get detachmentErrorCode(): ErrorCodes {
return ErrorCodes.OccupancyDetachmentFailed;
}

/**
* Merges the channel options for the room with the ones required for presence.
*
* @param roomOptions The room options to merge for.
* @returns A function that merges the channel options for the room with the ones required for presence.
*/
static channelOptionMerger(): ChannelOptionsMerger {
return (options) => ({ ...options, params: { ...options.params, occupancy: 'metrics' } });
}

/**
* Returns the channel name for the presence channel.
*
* @param roomId The unique identifier of the room.
* @returns The channel name for the presence channel.
*/
static channelName(roomId: string): string {
return messagesChannelName(roomId);
}
}
55 changes: 38 additions & 17 deletions src/core/presence.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';

import { getChannel, messagesChannelName } from './channel.js';
import { messagesChannelName } from './channel.js';
import { ChannelManager, ChannelOptionsMerger } from './channel-manager.js';
import {
DiscontinuityEmitter,
DiscontinuityListener,
Expand Down Expand Up @@ -198,35 +199,24 @@ export class DefaultPresence
/**
* Constructs a new `DefaultPresence` instance.
* @param roomId The unique identifier of the room.
* @param roomOptions The room options for presence.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager The channel manager to use for creating the presence channel.
* @param clientId The client ID, attached to presences messages as an identifier of the sender.
* A channel can have multiple connections using the same clientId.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, roomOptions: RoomOptions, realtime: Ably.Realtime, clientId: string, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, clientId: string, logger: Logger) {
super();

this._channel = this._makeChannel(roomId, roomOptions, realtime);
this._channel = this._makeChannel(roomId, channelManager);
this._clientId = clientId;
this._logger = logger;
}

/**
* Creates the realtime channel for presence.
*/
private _makeChannel(roomId: string, roomOptions: RoomOptions, realtime: Ably.Realtime): Ably.RealtimeChannel {
// Set our channel modes based on the room options
const channelModes = ['PUBLISH', 'SUBSCRIBE'] as Ably.ChannelMode[];
if (roomOptions.presence?.enter === undefined || roomOptions.presence.enter) {
channelModes.push('PRESENCE');
}

if (roomOptions.presence?.subscribe === undefined || roomOptions.presence.subscribe) {
channelModes.push('PRESENCE_SUBSCRIBE');
}

const channel = getChannel(messagesChannelName(roomId), realtime, { modes: channelModes });
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(DefaultPresence.channelName(roomId));

addListenerToChannelPresenceWithoutAttach({
listener: this.subscribeToEvents.bind(this),
Expand Down Expand Up @@ -418,4 +408,35 @@ export class DefaultPresence
get detachmentErrorCode(): ErrorCodes {
return ErrorCodes.PresenceDetachmentFailed;
}

/**
* Merges the channel options for the room with the ones required for presence.
*
* @param roomOptions The room options to merge for.
* @returns A function that merges the channel options for the room with the ones required for presence.
*/
static channelOptionMerger(roomOptions: RoomOptions): ChannelOptionsMerger {
return (options) => {
const channelModes = ['PUBLISH', 'SUBSCRIBE'] as Ably.ChannelMode[];
if (roomOptions.presence?.enter === undefined || roomOptions.presence.enter) {
channelModes.push('PRESENCE');
}

if (roomOptions.presence?.subscribe === undefined || roomOptions.presence.subscribe) {
channelModes.push('PRESENCE_SUBSCRIBE');
}

return { ...options, modes: channelModes };
};
}

/**
* Returns the channel name for the presence channel.
*
* @param roomId The unique identifier of the room.
* @returns The channel name for the presence channel.
*/
static channelName(roomId: string): string {
return messagesChannelName(roomId);
}
}
12 changes: 6 additions & 6 deletions src/core/room-reactions.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Ably from 'ably';

import { getChannel } from './channel.js';
import { ChannelManager } from './channel-manager.js';
import {
DiscontinuityEmitter,
DiscontinuityListener,
Expand Down Expand Up @@ -143,23 +143,23 @@ export class DefaultRoomReactions
/**
* Constructs a new `DefaultRoomReactions` instance.
* @param roomId The unique identifier of the room.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager The ChannelManager instance.
* @param clientId The client ID of the user.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, realtime: Ably.Realtime, clientId: string, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, clientId: string, logger: Logger) {
super();

this._channel = this._makeChannel(roomId, realtime);
this._channel = this._makeChannel(roomId, channelManager);
this._clientId = clientId;
this._logger = logger;
}

/**
* Creates the realtime channel for room reactions.
*/
private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel {
const channel = getChannel(`${roomId}::$chat::$reactions`, realtime);
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(`${roomId}::$chat::$reactions`);
addListenerToChannelWithoutAttach({
listener: this._forwarder.bind(this),
events: [RoomReactionEvents.Reaction],
Expand Down
36 changes: 30 additions & 6 deletions src/core/room.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';
import cloneDeep from 'lodash.clonedeep';

import { ChannelManager } from './channel-manager.js';
import { ChatApi } from './chat-api.js';
import { Logger } from './logger.js';
import { DefaultMessages, Messages } from './messages.js';
Expand Down Expand Up @@ -167,32 +168,34 @@ export class DefaultRoom implements Room {
this._logger = logger;
this._lifecycle = new DefaultRoomLifecycle(roomId, logger);

const channelManager = this._getChannelManager(options, realtime, logger);

// Setup features
this._messages = new DefaultMessages(roomId, realtime, this._chatApi, realtime.auth.clientId, logger);
this._messages = new DefaultMessages(roomId, channelManager, this._chatApi, realtime.auth.clientId, logger);

const features: ContributesToRoomLifecycle[] = [this._messages];

if (options.presence) {
this._logger.debug('enabling presence on room', { roomId });
this._presence = new DefaultPresence(roomId, options, realtime, realtime.auth.clientId, logger);
this._presence = new DefaultPresence(roomId, channelManager, realtime.auth.clientId, logger);
features.push(this._presence);
}

if (options.typing) {
this._logger.debug('enabling typing on room', { roomId });
this._typing = new DefaultTyping(roomId, options.typing, realtime, realtime.auth.clientId, logger);
this._typing = new DefaultTyping(roomId, options.typing, channelManager, realtime.auth.clientId, logger);
features.push(this._typing);
}

if (options.reactions) {
this._logger.debug('enabling reactions on room', { roomId });
this._reactions = new DefaultRoomReactions(roomId, realtime, realtime.auth.clientId, logger);
this._reactions = new DefaultRoomReactions(roomId, channelManager, realtime.auth.clientId, logger);
features.push(this._reactions);
}

if (options.occupancy) {
this._logger.debug('enabling occupancy on room', { roomId });
this._occupancy = new DefaultOccupancy(roomId, realtime, this._chatApi, logger);
this._occupancy = new DefaultOccupancy(roomId, channelManager, this._chatApi, logger);
features.push(this._occupancy);
}

Expand All @@ -210,13 +213,34 @@ export class DefaultRoom implements Room {
await this._lifecycleManager.release();

for (const feature of features) {
realtime.channels.release(feature.channel.name);
channelManager.release(feature.channel.name);
}

finalized = true;
};
}

/**
* Gets the channel manager for the room, which handles merging channel options together and creating channels.
*
* @param options The room options.
* @param realtime An instance of the Ably Realtime client.
* @param logger An instance of the Logger.
*/
private _getChannelManager(options: RoomOptions, realtime: Ably.Realtime, logger: Logger): ChannelManager {
const manager = new ChannelManager(realtime, logger);

if (options.occupancy) {
manager.mergeOptions(DefaultOccupancy.channelName(this._roomId), DefaultOccupancy.channelOptionMerger());
}

if (options.presence) {
manager.mergeOptions(DefaultPresence.channelName(this._roomId), DefaultPresence.channelOptionMerger(options));
}

return manager;
}

/**
* @inheritdoc Room
*/
Expand Down
Loading
Loading