Skip to content

Commit

Permalink
Merge pull request #17 from xconnio/remove-stream-controller
Browse files Browse the repository at this point in the history
Remove stream controller & close stream subscription
  • Loading branch information
Mahad-10 authored May 10, 2024
2 parents 01f8141 + b66734f commit 77043dd
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 21 deletions.
16 changes: 5 additions & 11 deletions lib/src/acceptor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,22 @@ class WAMPSessionAcceptor {

Completer<BaseSession> completer = Completer<BaseSession>();

// ignore: cancel_subscriptions
late StreamSubscription<dynamic> wsStreamSubscription;
final sessionStreamController = StreamController.broadcast();

wsStreamSubscription = ws.listen((message) {
MapEntry<Object, bool> received = acceptor.receive(message);
ws.add(received.key);
if (received.value) {
wsStreamSubscription.onData(null);
wsStreamSubscription
..onData(null)
..onDone(null);

var base = BaseSession(ws, wsStreamSubscription, acceptor.getSessionDetails(), _serializer);
completer.complete(base);
}
});

wsStreamSubscription.onDone(() {
sessionStreamController.stream.isEmpty.then(
(isEmpty) => {
if (!isEmpty) {sessionStreamController.close()},
},
);
wsStreamSubscription.cancel();
});

return completer.future;
}
}
14 changes: 4 additions & 10 deletions lib/src/joiner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,22 @@ class WAMPSessionJoiner {

var welcomeCompleter = Completer<BaseSession>();

// ignore: cancel_subscriptions
late StreamSubscription<dynamic> wsStreamSubscription;
final sessionStreamController = StreamController.broadcast();

wsStreamSubscription = ws.listen((event) {
dynamic toSend = joiner.receive(event);
if (toSend == null) {
wsStreamSubscription.onData(null);
wsStreamSubscription
..onData(null)
..onDone(null);

BaseSession baseSession = BaseSession(ws, wsStreamSubscription, joiner.getSessionDetails(), _serializer);
welcomeCompleter.complete(baseSession);
} else {
ws.add(toSend);
}
});
wsStreamSubscription.onDone(() {
sessionStreamController.stream.isEmpty.then(
(isEmpty) => {
if (!isEmpty) {sessionStreamController.close()},
},
);
wsStreamSubscription.cancel();
});

return welcomeCompleter.future;
}
Expand Down
1 change: 1 addition & 0 deletions lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class BaseSession extends IBaseSession {

@override
Future<void> close() async {
await _wsStreamSubscription.cancel();
await _ws.close();
}
}
Expand Down

0 comments on commit 77043dd

Please sign in to comment.