From 1a6efa6c3361bb65459a2dd4f94983ef0743b6cf Mon Sep 17 00:00:00 2001 From: weareoutman Date: Fri, 15 Nov 2024 17:14:33 +0800 Subject: [PATCH] fix(): emit error callback for message.subscribe when connect failed after retry limit exceeded --- etc/runtime.api.md | 4 +- .../src/websocket/MessageDispatcher.ts | 49 +++++++++++++------ .../src/websocket/MessageService.ts | 10 +++- .../CustomTemplates/setupTemplateProxy.ts | 4 +- packages/runtime/src/internal/Runtime.ts | 4 +- 5 files changed, 48 insertions(+), 23 deletions(-) diff --git a/etc/runtime.api.md b/etc/runtime.api.md index 5921e3c947..1615662a04 100644 --- a/etc/runtime.api.md +++ b/etc/runtime.api.md @@ -474,8 +474,8 @@ export interface RuntimeHooks { messageDispatcher?: { subscribe(...args: unknown[]): Promise; unsubscribe(...args: unknown[]): Promise; - onMessage(channel: string, listener: (data: unknown) => void): void; - onClose(listener: () => void): void; + onMessage(channel: string, listener: (data: unknown) => void): () => void; + onClose(listener: () => void): () => void; reset(): void; }; // (undocumented) diff --git a/packages/easyops-runtime/src/websocket/MessageDispatcher.ts b/packages/easyops-runtime/src/websocket/MessageDispatcher.ts index af065bd269..600c7ad246 100644 --- a/packages/easyops-runtime/src/websocket/MessageDispatcher.ts +++ b/packages/easyops-runtime/src/websocket/MessageDispatcher.ts @@ -4,6 +4,7 @@ import { MessageService, type MessageListener, CloseListener, + RETRY_LIMIT, } from "./MessageService.js"; interface MessageResponse { @@ -98,8 +99,8 @@ export class MessageDispatcher { }); } - onClose(listener: CloseListener): void { - this.#ms.onClose(listener); + onClose(listener: CloseListener) { + return this.#ms.onClose(listener); } reset(): void { @@ -125,21 +126,37 @@ export class MessageDispatcher { }; const promise = new Promise((resolve, reject) => { const identity = getIdentity(payload); - this.#ms.onMessage((response) => { - const isSuccess = - response.event === - (type === "sub" ? "TOPIC.SUB_SUCCESS" : "TOPIC.UNSUB_SUCCESS"); - const isFailed = - response.event === - (type === "sub" ? "TOPIC.SUB_FAILED" : "TOPIC.UNSUB_FAILED"); - if ( - (isSuccess || isFailed) && - // Put this after event type checks, to prevent unnecessary - // JSON stringify. - identity === getIdentity(response.payload) - ) { - (isSuccess ? resolve : reject)(response); + const disposeOnMessage = this.#ms.onMessage( + (response) => { + const isSuccess = + response.event === + (type === "sub" ? "TOPIC.SUB_SUCCESS" : "TOPIC.UNSUB_SUCCESS"); + const isFailed = + response.event === + (type === "sub" ? "TOPIC.SUB_FAILED" : "TOPIC.UNSUB_FAILED"); + if ( + (isSuccess || isFailed) && + // Put this after event type checks, to prevent unnecessary + // JSON stringify. + identity === getIdentity(response.payload) + ) { + (isSuccess ? resolve : reject)(response); + disposeOnMessage(); + // eslint-disable-next-line @typescript-eslint/no-use-before-define + disposeOnClose(); + } } + ); + // istanbul ignore next: currently can't mock this + const disposeOnClose = this.#ms.onClose(() => { + // V2 will emit error callback for sub/unsub each time connect failed, + // while v3 will emit only once after retry limit exceeded. + // Keep the `EVENT.detail.retryCount` for compatibility. + reject({ + retryCount: RETRY_LIMIT, + }); + disposeOnMessage(); + disposeOnClose(); }); this.#ms.send(request); }); diff --git a/packages/easyops-runtime/src/websocket/MessageService.ts b/packages/easyops-runtime/src/websocket/MessageService.ts index c246798ca5..d7ab4480e3 100644 --- a/packages/easyops-runtime/src/websocket/MessageService.ts +++ b/packages/easyops-runtime/src/websocket/MessageService.ts @@ -2,7 +2,7 @@ export type MessageListener = (response: T) => void; export type CloseListener = () => void; const RETRY_TIMEOUT_UNIT = 1000; -const RETRY_LIMIT = 5; +export const RETRY_LIMIT = 5; export class MessageService { #url: string; @@ -103,10 +103,18 @@ export class MessageService { onMessage(listener: MessageListener) { this.#messageListeners.add(listener as MessageListener); + + return () => { + this.#messageListeners.delete(listener as MessageListener); + }; } onClose(listener: CloseListener) { this.#closeListeners.add(listener); + + return () => { + this.#closeListeners.delete(listener); + }; } reset() { diff --git a/packages/runtime/src/internal/CustomTemplates/setupTemplateProxy.ts b/packages/runtime/src/internal/CustomTemplates/setupTemplateProxy.ts index e24798d3de..5ee0d9c96b 100644 --- a/packages/runtime/src/internal/CustomTemplates/setupTemplateProxy.ts +++ b/packages/runtime/src/internal/CustomTemplates/setupTemplateProxy.ts @@ -36,7 +36,7 @@ export function setupTemplateProxy( asyncHostProps: AsyncPropertyEntry[] ): AsyncPropertyEntry[] => { return propertyProxies! - .map(({ from, to }) => { + .map(({ from, to }) => { const filtered = asyncHostProps.filter( (entry) => entry[0] === from ); @@ -48,7 +48,7 @@ export function setupTemplateProxy( ]; } }) - .filter(Boolean) as [string, Promise][]; + .filter(Boolean) as AsyncPropertyEntry[]; }; asyncComputedProps = getComputedProps(asyncHostPropertyEntries); diff --git a/packages/runtime/src/internal/Runtime.ts b/packages/runtime/src/internal/Runtime.ts index 7286ff4f38..06ccf1b892 100644 --- a/packages/runtime/src/internal/Runtime.ts +++ b/packages/runtime/src/internal/Runtime.ts @@ -111,8 +111,8 @@ export interface RuntimeHooks { messageDispatcher?: { subscribe(...args: unknown[]): Promise; unsubscribe(...args: unknown[]): Promise; - onMessage(channel: string, listener: (data: unknown) => void): void; - onClose(listener: () => void): void; + onMessage(channel: string, listener: (data: unknown) => void): () => void; + onClose(listener: () => void): () => void; reset(): void; }; pageView?: {