Skip to content

Commit

Permalink
fix(): emit error callback for message.subscribe when connect failed …
Browse files Browse the repository at this point in the history
…after retry limit exceeded
  • Loading branch information
weareoutman committed Nov 15, 2024
1 parent f9fb776 commit 1a6efa6
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 23 deletions.
4 changes: 2 additions & 2 deletions etc/runtime.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ export interface RuntimeHooks {
messageDispatcher?: {
subscribe(...args: unknown[]): Promise<unknown>;
unsubscribe(...args: unknown[]): Promise<unknown>;
onMessage(channel: string, listener: (data: unknown) => void): void;
onClose(listener: () => void): void;
onMessage(channel: string, listener: (data: unknown) => void): () => void;
onClose(listener: () => void): () => void;
reset(): void;
};
// (undocumented)
Expand Down
49 changes: 33 additions & 16 deletions packages/easyops-runtime/src/websocket/MessageDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
MessageService,
type MessageListener,
CloseListener,
RETRY_LIMIT,
} from "./MessageService.js";

interface MessageResponse {
Expand Down Expand Up @@ -98,8 +99,8 @@ export class MessageDispatcher {
});
}

onClose(listener: CloseListener): void {
this.#ms.onClose(listener);
onClose(listener: CloseListener) {
return this.#ms.onClose(listener);
}

reset(): void {
Expand All @@ -125,21 +126,37 @@ export class MessageDispatcher {
};
const promise = new Promise((resolve, reject) => {
const identity = getIdentity(payload);
this.#ms.onMessage<MessageResponse>((response) => {
const isSuccess =
response.event ===
(type === "sub" ? "TOPIC.SUB_SUCCESS" : "TOPIC.UNSUB_SUCCESS");
const isFailed =
response.event ===
(type === "sub" ? "TOPIC.SUB_FAILED" : "TOPIC.UNSUB_FAILED");
if (
(isSuccess || isFailed) &&
// Put this after event type checks, to prevent unnecessary
// JSON stringify.
identity === getIdentity(response.payload)
) {
(isSuccess ? resolve : reject)(response);
const disposeOnMessage = this.#ms.onMessage<MessageResponse>(
(response) => {
const isSuccess =
response.event ===
(type === "sub" ? "TOPIC.SUB_SUCCESS" : "TOPIC.UNSUB_SUCCESS");
const isFailed =
response.event ===
(type === "sub" ? "TOPIC.SUB_FAILED" : "TOPIC.UNSUB_FAILED");
if (
(isSuccess || isFailed) &&
// Put this after event type checks, to prevent unnecessary
// JSON stringify.
identity === getIdentity(response.payload)
) {
(isSuccess ? resolve : reject)(response);
disposeOnMessage();
// eslint-disable-next-line @typescript-eslint/no-use-before-define
disposeOnClose();
}
}
);
// istanbul ignore next: currently can't mock this
const disposeOnClose = this.#ms.onClose(() => {
// V2 will emit error callback for sub/unsub each time connect failed,
// while v3 will emit only once after retry limit exceeded.
// Keep the `EVENT.detail.retryCount` for compatibility.
reject({
retryCount: RETRY_LIMIT,
});
disposeOnMessage();
disposeOnClose();
});
this.#ms.send(request);
});
Expand Down
10 changes: 9 additions & 1 deletion packages/easyops-runtime/src/websocket/MessageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export type MessageListener<T = unknown> = (response: T) => void;
export type CloseListener = () => void;

const RETRY_TIMEOUT_UNIT = 1000;
const RETRY_LIMIT = 5;
export const RETRY_LIMIT = 5;

export class MessageService {
#url: string;
Expand Down Expand Up @@ -103,10 +103,18 @@ export class MessageService {

onMessage<T = unknown>(listener: MessageListener<T>) {
this.#messageListeners.add(listener as MessageListener<unknown>);

return () => {
this.#messageListeners.delete(listener as MessageListener<unknown>);
};
}

onClose(listener: CloseListener) {
this.#closeListeners.add(listener);

return () => {
this.#closeListeners.delete(listener);
};
}

reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export function setupTemplateProxy(
asyncHostProps: AsyncPropertyEntry[]
): AsyncPropertyEntry[] => {
return propertyProxies!
.map(({ from, to }) => {
.map<AsyncPropertyEntry | undefined>(({ from, to }) => {
const filtered = asyncHostProps.filter(
(entry) => entry[0] === from
);
Expand All @@ -48,7 +48,7 @@ export function setupTemplateProxy(
];
}
})
.filter(Boolean) as [string, Promise<unknown>][];
.filter(Boolean) as AsyncPropertyEntry[];
};

asyncComputedProps = getComputedProps(asyncHostPropertyEntries);
Expand Down
4 changes: 2 additions & 2 deletions packages/runtime/src/internal/Runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ export interface RuntimeHooks {
messageDispatcher?: {
subscribe(...args: unknown[]): Promise<unknown>;
unsubscribe(...args: unknown[]): Promise<unknown>;
onMessage(channel: string, listener: (data: unknown) => void): void;
onClose(listener: () => void): void;
onMessage(channel: string, listener: (data: unknown) => void): () => void;
onClose(listener: () => void): () => void;
reset(): void;
};
pageView?: {
Expand Down

0 comments on commit 1a6efa6

Please sign in to comment.