diff --git a/lib/hub_connection.dart b/lib/hub_connection.dart index 7a96544..21e88a2 100644 --- a/lib/hub_connection.dart +++ b/lib/hub_connection.dart @@ -330,9 +330,20 @@ class HubConnection { /// Returns an object that yields results from the server as they are received. /// Stream stream(String methodName, List args) { + return streamControllable(methodName, args).stream; + } + + /// Invokes a streaming hub method on the server using the specified name and arguments. + /// + /// T: The type of the items returned by the server. + /// methodName: The name of the server method to invoke. + /// args: The arguments used to invoke the server method. + /// Returns a StreamControler object that yields results from the server as they are received. + /// + StreamController streamControllable(String methodName, List args) { final t = _replaceStreamingParams(args); final invocationDescriptor = - _createStreamInvocation(methodName, args, t.item2); + _createStreamInvocation(methodName, args, t.keys.toList()); late Future promiseQueue; final StreamController streamController = StreamController( @@ -369,9 +380,9 @@ class HubConnection { _callbacks.remove(invocationDescriptor.invocationId); }); - _launchStreams(t.item1, promiseQueue); + _launchStreams(t, promiseQueue); - return streamController.stream; + return streamController; } Future _sendMessage(Object? message) { @@ -399,9 +410,9 @@ class HubConnection { args = args ?? []; final t = _replaceStreamingParams(args); final sendPromise = - _sendWithProtocol(_createInvocation(methodName, args, true, t.item2)); + _sendWithProtocol(_createInvocation(methodName, args, true, t.keys.toList())); - _launchStreams(t.item1, sendPromise); + _launchStreams(t, sendPromise); return sendPromise; } @@ -419,7 +430,7 @@ class HubConnection { args = args ?? []; final t = _replaceStreamingParams(args); final invocationDescriptor = - _createInvocation(methodName, args, false, t.item2); + _createInvocation(methodName, args, false, t.keys.toList()); final completer = Completer(); @@ -455,7 +466,7 @@ class HubConnection { _callbacks.remove(invocationDescriptor.invocationId); }); - _launchStreams(t.item1, promiseQueue); + _launchStreams(t, promiseQueue); return completer.future; } @@ -902,7 +913,7 @@ class HubConnection { } } - _launchStreams(List> streams, Future? promiseQueue) { + _launchStreams(Map> streams, Future? promiseQueue) { if (streams.length == 0) { return; } @@ -913,13 +924,13 @@ class HubConnection { } // We want to iterate over the keys, since the keys are the stream ids - for (var i = 0; i < streams.length; i++) { - streams[i].listen((item) { + streams.forEach((id, stream) { + stream.listen((item) { promiseQueue = promiseQueue?.then((_) => - _sendWithProtocol(_createStreamItemMessage(i.toString(), item))); + _sendWithProtocol(_createStreamItemMessage(id, item))); }, onDone: () { promiseQueue = promiseQueue?.then( - (_) => _sendWithProtocol(_createCompletionMessage(i.toString()))); + (_) => _sendWithProtocol(_createCompletionMessage(id))); }, onError: (err) { String message; if (err is Exception) { @@ -929,15 +940,14 @@ class HubConnection { } promiseQueue = promiseQueue?.then((_) => _sendWithProtocol( - _createCompletionMessage(i.toString(), error: message))); + _createCompletionMessage(id, error: message))); }); - } + }); } - Tuple2>, List> _replaceStreamingParams( + Map> _replaceStreamingParams( List args) { - final List> streams = []; - final List streamIds = []; + final Map> streams = new Map>(); for (var i = 0; i < args.length; i++) { final argument = args[i]; @@ -945,15 +955,14 @@ class HubConnection { final streamId = _invocationId!; _invocationId = _invocationId! + 1; // Store the stream for later use - streams[streamId] = argument as Stream; - streamIds.add(streamId.toString()); + streams[streamId.toString()] = argument as Stream; // remove stream from args args.removeAt(i); } } - return Tuple2>, List>(streams, streamIds); + return streams; } /// isObservable diff --git a/lib/json_hub_protocol.dart b/lib/json_hub_protocol.dart index 57a98b6..a34f442 100644 --- a/lib/json_hub_protocol.dart +++ b/lib/json_hub_protocol.dart @@ -218,13 +218,17 @@ class JsonHubProtocol implements IHubProtocol { } if (message is CompletionMessage) { - return { + if (message.error != null && message.result != null) { + throw ("Completion message must contain either 'error' or 'result'"); + } + var r = { "type": messageType, "invocationId": message.invocationId, "headers": message.headers.asMap, - "error": message.error, - "result": message.result }; + if (message.error != null) r["error"] = message.error; + if (message.result != null) r["result"] = message.result; + return r; } if (message is PingMessage) {