diff --git a/lib/src/session.dart b/lib/src/session.dart index ce8512c..f708f7a 100644 --- a/lib/src/session.dart +++ b/lib/src/session.dart @@ -32,6 +32,7 @@ class Session { final Map> publishRequests = {}; final Map subscribeRequests = {}; final Map subscriptions = {}; + final Map unsubscribeRequests = {}; void processIncomingMessage(Message msg) { if (msg is Result) { @@ -74,6 +75,12 @@ class Session { if (endpoint != null) { endpoint(msg); } + } else if (msg is UnSubscribed) { + var request = unsubscribeRequests.remove(msg.requestID); + if (request != null) { + subscriptions.remove(request.subscriptionId); + request.future.complete(); + } } } @@ -143,4 +150,14 @@ class Session { return completer.future; } + + Future unsubscribe(Subscription sub) { + var unsubscribe = UnSubscribe(nextID, sub.subscriptionId); + + var completer = Completer(); + unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completer, sub.subscriptionId); + baseSession.send(wampSession.sendMessage(unsubscribe)); + + return completer.future; + } } diff --git a/lib/src/types.dart b/lib/src/types.dart index 23b0bfe..cf39c15 100644 --- a/lib/src/types.dart +++ b/lib/src/types.dart @@ -58,3 +58,10 @@ class SubscribeRequest { final Completer future; final void Function(Event) endpoint; } + +class UnsubscribeRequest { + UnsubscribeRequest(this.future, this.subscriptionId); + + final Completer future; + final int subscriptionId; +}