Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client to server streaming fixes #99

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions lib/hub_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,20 @@ class HubConnection {
/// Returns an object that yields results from the server as they are received.
///
Stream<Object?> stream(String methodName, List<Object> args) {
return streamControllable(methodName, args).stream;
}

/// Invokes a streaming hub method on the server using the specified name and arguments.
///
/// T: The type of the items returned by the server.
/// methodName: The name of the server method to invoke.
/// args: The arguments used to invoke the server method.
/// Returns a StreamControler object that yields results from the server as they are received.
///
StreamController<Object?> streamControllable(String methodName, List<Object> args) {
final t = _replaceStreamingParams(args);
final invocationDescriptor =
_createStreamInvocation(methodName, args, t.item2);
_createStreamInvocation(methodName, args, t.keys.toList());

late Future<void> promiseQueue;
final StreamController streamController = StreamController<Object?>(
Expand Down Expand Up @@ -369,9 +380,9 @@ class HubConnection {
_callbacks.remove(invocationDescriptor.invocationId);
});

_launchStreams(t.item1, promiseQueue);
_launchStreams(t, promiseQueue);

return streamController.stream;
return streamController;
}

Future<void> _sendMessage(Object? message) {
Expand Down Expand Up @@ -399,9 +410,9 @@ class HubConnection {
args = args ?? [];
final t = _replaceStreamingParams(args);
final sendPromise =
_sendWithProtocol(_createInvocation(methodName, args, true, t.item2));
_sendWithProtocol(_createInvocation(methodName, args, true, t.keys.toList()));

_launchStreams(t.item1, sendPromise);
_launchStreams(t, sendPromise);
return sendPromise;
}

Expand All @@ -419,7 +430,7 @@ class HubConnection {
args = args ?? [];
final t = _replaceStreamingParams(args);
final invocationDescriptor =
_createInvocation(methodName, args, false, t.item2);
_createInvocation(methodName, args, false, t.keys.toList());

final completer = Completer<Object?>();

Expand Down Expand Up @@ -455,7 +466,7 @@ class HubConnection {
_callbacks.remove(invocationDescriptor.invocationId);
});

_launchStreams(t.item1, promiseQueue);
_launchStreams(t, promiseQueue);

return completer.future;
}
Expand Down Expand Up @@ -902,7 +913,7 @@ class HubConnection {
}
}

_launchStreams(List<Stream<Object>> streams, Future<void>? promiseQueue) {
_launchStreams(Map<String, Stream<Object>> streams, Future<void>? promiseQueue) {
if (streams.length == 0) {
return;
}
Expand All @@ -913,13 +924,13 @@ class HubConnection {
}

// We want to iterate over the keys, since the keys are the stream ids
for (var i = 0; i < streams.length; i++) {
streams[i].listen((item) {
streams.forEach((id, stream) {
stream.listen((item) {
promiseQueue = promiseQueue?.then((_) =>
_sendWithProtocol(_createStreamItemMessage(i.toString(), item)));
_sendWithProtocol(_createStreamItemMessage(id, item)));
}, onDone: () {
promiseQueue = promiseQueue?.then(
(_) => _sendWithProtocol(_createCompletionMessage(i.toString())));
(_) => _sendWithProtocol(_createCompletionMessage(id)));
}, onError: (err) {
String message;
if (err is Exception) {
Expand All @@ -929,31 +940,29 @@ class HubConnection {
}

promiseQueue = promiseQueue?.then((_) => _sendWithProtocol(
_createCompletionMessage(i.toString(), error: message)));
_createCompletionMessage(id, error: message)));
});
}
});
}

Tuple2<List<Stream<Object>>, List<String>> _replaceStreamingParams(
Map<String, Stream<Object>> _replaceStreamingParams(
List<Object> args) {
final List<Stream<Object>> streams = [];
final List<String> streamIds = [];
final Map<String, Stream<Object>> streams = new Map<String, Stream<Object>>();

for (var i = 0; i < args.length; i++) {
final argument = args[i];
if (argument is Stream) {
final streamId = _invocationId!;
_invocationId = _invocationId! + 1;
// Store the stream for later use
streams[streamId] = argument as Stream<Object>;
streamIds.add(streamId.toString());
streams[streamId.toString()] = argument as Stream<Object>;

// remove stream from args
args.removeAt(i);
}
}

return Tuple2<List<Stream<Object>>, List<String>>(streams, streamIds);
return streams;
}

/// isObservable
Expand Down
10 changes: 7 additions & 3 deletions lib/json_hub_protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,17 @@ class JsonHubProtocol implements IHubProtocol {
}

if (message is CompletionMessage) {
return {
if (message.error != null && message.result != null) {
throw ("Completion message must contain either 'error' or 'result'");
}
var r = {
"type": messageType,
"invocationId": message.invocationId,
"headers": message.headers.asMap,
"error": message.error,
"result": message.result
};
if (message.error != null) r["error"] = message.error;
if (message.result != null) r["result"] = message.result;
return r;
}

if (message is PingMessage) {
Expand Down