diff --git a/lib/src/types.dart b/lib/src/types.dart index be30ff9..4eaf043 100644 --- a/lib/src/types.dart +++ b/lib/src/types.dart @@ -229,7 +229,7 @@ class ClientSideLocalBaseSession implements IBaseSession { String authrole() => _authrole; @override - serializer() => _serializer; + Serializer serializer() => _serializer; @override Future send(Object data) async { @@ -238,8 +238,11 @@ class ClientSideLocalBaseSession implements IBaseSession { @override Future receive() async { - await _completer.future; - _completer = Completer(); + if (_incomingMessages.isEmpty) { + await _completer.future; + _completer = Completer(); + } + return _incomingMessages.removeFirst(); } @@ -258,7 +261,9 @@ class ClientSideLocalBaseSession implements IBaseSession { Future feed(Object data) async { _incomingMessages.add(data); - _completer.complete(); + if (!_completer.isCompleted) { + _completer.complete(); + } } } diff --git a/test/router_test.dart b/test/router_test.dart new file mode 100644 index 0000000..10677ad --- /dev/null +++ b/test/router_test.dart @@ -0,0 +1,91 @@ +import "package:test/test.dart"; +import "package:wampproto/messages.dart" as msg; +import "package:wampproto/serializers.dart"; + +import "package:xconn/exports.dart"; +import "package:xconn/src/types.dart"; + +void main() { + const testProcedure = "io.xconn.test_procedure"; + const testTopic = "io.xconn.test_topic"; + + var router = Router()..addRealm("realm1"); + + var serializer = JSONSerializer(); + var clientSideBase = ClientSideLocalBaseSession(1, "realm1", "local", "local", serializer, router); + var serverSideBase = ServerSideLocalBaseSession(1, "realm1", "local", "local", serializer, other: clientSideBase); + + router.attachClient(serverSideBase); + + late int registrationID; + + test("register", () async { + var registerMsg = msg.Register(3, testProcedure); + await router.receiveMessage(clientSideBase, registerMsg); + + var registered = await clientSideBase.receiveMessage(); + expect(registered, isA()); + + registrationID = (registered as msg.Registered).registrationID; + }); + + test("call", () async { + var callMsg = msg.Call(4, testProcedure); + await router.receiveMessage(clientSideBase, callMsg); + + var invocation = await clientSideBase.receiveMessage(); + expect(invocation, isA()); + + var requestID = (invocation as msg.Invocation).requestID; + var yieldMsg = msg.Yield(requestID); + await router.receiveMessage(clientSideBase, yieldMsg); + + var result = await clientSideBase.receiveMessage(); + expect(result, isA()); + }); + + test("unregister", () async { + var unregisterMsg = msg.UnRegister(5, registrationID); + await router.receiveMessage(clientSideBase, unregisterMsg); + + var unregistered = await clientSideBase.receiveMessage(); + expect(unregistered, isA()); + }); + + late int subscriptionID; + + test("subscribe", () async { + var subscribeMsg = msg.Subscribe(6, testTopic); + await router.receiveMessage(clientSideBase, subscribeMsg); + + var subscribed = await clientSideBase.receiveMessage(); + expect(subscribed, isA()); + + subscriptionID = (subscribed as msg.Subscribed).subscriptionID; + }); + + test("publish", () async { + var publish = msg.Publish(7, testTopic); + await router.receiveMessage(clientSideBase, publish); + + var event = await clientSideBase.receiveMessage(); + expect(event, isA()); + + var publishAck = msg.Publish(8, testTopic, options: {"acknowledge": true}); + await router.receiveMessage(clientSideBase, publishAck); + + var eventAck = await clientSideBase.receiveMessage(); + expect(eventAck, isA()); + + var published = await clientSideBase.receiveMessage(); + expect(published, isA()); + }); + + test("unsubscribe", () async { + var unsubscribeMsg = msg.UnSubscribe(9, subscriptionID); + await router.receiveMessage(clientSideBase, unsubscribeMsg); + + var unsubscribed = await clientSideBase.receiveMessage(); + expect(unsubscribed, isA()); + }); +}