From d2ab68d52fb7a1898f72e4e4c40ea3af0feefcf7 Mon Sep 17 00:00:00 2001 From: Tobias Sachs Date: Fri, 18 Oct 2024 12:15:19 +0200 Subject: [PATCH 1/4] Add method `streamControllable(..)` This method essentially does the same as `stream` previously did, but instead of returning the stream it does return the SteamControler. This makes it possible for the abort the streaming on the client side by calling `close()` on the controller. The `stream()` uses `streamControllable()` to return the stream, the method is still keept to not break existing usage of the method. --- lib/hub_connection.dart | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/hub_connection.dart b/lib/hub_connection.dart index 7a96544..079bfaf 100644 --- a/lib/hub_connection.dart +++ b/lib/hub_connection.dart @@ -330,6 +330,17 @@ class HubConnection { /// Returns an object that yields results from the server as they are received. /// Stream stream(String methodName, List args) { + return streamControllable(methodName, args).stream; + } + + /// Invokes a streaming hub method on the server using the specified name and arguments. + /// + /// T: The type of the items returned by the server. + /// methodName: The name of the server method to invoke. + /// args: The arguments used to invoke the server method. + /// Returns a StreamControler object that yields results from the server as they are received. + /// + StreamController streamControllable(String methodName, List args) { final t = _replaceStreamingParams(args); final invocationDescriptor = _createStreamInvocation(methodName, args, t.item2); @@ -371,7 +382,7 @@ class HubConnection { _launchStreams(t.item1, promiseQueue); - return streamController.stream; + return streamController; } Future _sendMessage(Object? message) { @@ -379,6 +390,7 @@ class HubConnection { return _connection.send(message); } + /// Sends a js object to the server. /// message: The object to serialize and send. /// From fa261703ce82dd9b0fbdb680190b1b44036bc487 Mon Sep 17 00:00:00 2001 From: Tobias Sachs Date: Thu, 31 Oct 2024 15:17:02 +0100 Subject: [PATCH 2/4] Fix creating client to server streams prevents an index out of range exceptions and uses the correct invocation ids for sent items --- lib/hub_connection.dart | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/lib/hub_connection.dart b/lib/hub_connection.dart index 079bfaf..1e508af 100644 --- a/lib/hub_connection.dart +++ b/lib/hub_connection.dart @@ -380,7 +380,7 @@ class HubConnection { _callbacks.remove(invocationDescriptor.invocationId); }); - _launchStreams(t.item1, promiseQueue); + _launchStreams(t, promiseQueue); return streamController; } @@ -390,7 +390,6 @@ class HubConnection { return _connection.send(message); } - /// Sends a js object to the server. /// message: The object to serialize and send. /// @@ -411,9 +410,9 @@ class HubConnection { args = args ?? []; final t = _replaceStreamingParams(args); final sendPromise = - _sendWithProtocol(_createInvocation(methodName, args, true, t.item2)); + _sendWithProtocol(_createInvocation(methodName, args, true, t.keys.toList())); - _launchStreams(t.item1, sendPromise); + _launchStreams(t, sendPromise); return sendPromise; } @@ -431,7 +430,7 @@ class HubConnection { args = args ?? []; final t = _replaceStreamingParams(args); final invocationDescriptor = - _createInvocation(methodName, args, false, t.item2); + _createInvocation(methodName, args, false, t.keys.toList()); final completer = Completer(); @@ -914,7 +913,7 @@ class HubConnection { } } - _launchStreams(List> streams, Future? promiseQueue) { + _launchStreams(Map> streams, Future? promiseQueue) { if (streams.length == 0) { return; } @@ -925,13 +924,13 @@ class HubConnection { } // We want to iterate over the keys, since the keys are the stream ids - for (var i = 0; i < streams.length; i++) { - streams[i].listen((item) { + streams.forEach((id, stream) { + stream.listen((item) { promiseQueue = promiseQueue?.then((_) => - _sendWithProtocol(_createStreamItemMessage(i.toString(), item))); + _sendWithProtocol(_createStreamItemMessage(id, item))); }, onDone: () { promiseQueue = promiseQueue?.then( - (_) => _sendWithProtocol(_createCompletionMessage(i.toString()))); + (_) => _sendWithProtocol(_createCompletionMessage(id))); }, onError: (err) { String message; if (err is Exception) { @@ -941,15 +940,14 @@ class HubConnection { } promiseQueue = promiseQueue?.then((_) => _sendWithProtocol( - _createCompletionMessage(i.toString(), error: message))); + _createCompletionMessage(id, error: message))); }); - } + }); } - Tuple2>, List> _replaceStreamingParams( + Map> _replaceStreamingParams( List args) { - final List> streams = []; - final List streamIds = []; + final Map> streams = new Map>(); for (var i = 0; i < args.length; i++) { final argument = args[i]; @@ -957,15 +955,14 @@ class HubConnection { final streamId = _invocationId!; _invocationId = _invocationId! + 1; // Store the stream for later use - streams[streamId] = argument as Stream; - streamIds.add(streamId.toString()); + streams[streamId.toString()] = argument as Stream; // remove stream from args args.removeAt(i); } } - return Tuple2>, List>(streams, streamIds); + return streams; } /// isObservable From e806a38901179ab800c1597116173a5629aaa9ed Mon Sep 17 00:00:00 2001 From: Tobias Sachs Date: Thu, 31 Oct 2024 15:19:04 +0100 Subject: [PATCH 3/4] Fix json completion message for client to server streams This sets either the 'error' or the 'result' field, or neither as it is specified here: https://github.com/dotnet/aspnetcore/blob/main/src/SignalR/docs/specs/HubProtocol.md This prevents exceptions on the server when the client closes the stream. --- lib/json_hub_protocol.dart | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/json_hub_protocol.dart b/lib/json_hub_protocol.dart index 57a98b6..a34f442 100644 --- a/lib/json_hub_protocol.dart +++ b/lib/json_hub_protocol.dart @@ -218,13 +218,17 @@ class JsonHubProtocol implements IHubProtocol { } if (message is CompletionMessage) { - return { + if (message.error != null && message.result != null) { + throw ("Completion message must contain either 'error' or 'result'"); + } + var r = { "type": messageType, "invocationId": message.invocationId, "headers": message.headers.asMap, - "error": message.error, - "result": message.result }; + if (message.error != null) r["error"] = message.error; + if (message.result != null) r["result"] = message.result; + return r; } if (message is PingMessage) { From 67446439ca8b82f270b036f1435c0105868f2b6e Mon Sep 17 00:00:00 2001 From: Tobias Sachs Date: Thu, 31 Oct 2024 16:04:58 +0100 Subject: [PATCH 4/4] fix build issue not all changes where committed... --- lib/hub_connection.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/hub_connection.dart b/lib/hub_connection.dart index 1e508af..21e88a2 100644 --- a/lib/hub_connection.dart +++ b/lib/hub_connection.dart @@ -343,7 +343,7 @@ class HubConnection { StreamController streamControllable(String methodName, List args) { final t = _replaceStreamingParams(args); final invocationDescriptor = - _createStreamInvocation(methodName, args, t.item2); + _createStreamInvocation(methodName, args, t.keys.toList()); late Future promiseQueue; final StreamController streamController = StreamController( @@ -466,7 +466,7 @@ class HubConnection { _callbacks.remove(invocationDescriptor.invocationId); }); - _launchStreams(t.item1, promiseQueue); + _launchStreams(t, promiseQueue); return completer.future; }