From 362974544a22fdaab1334a56935a8d01d4f3cc26 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 24 May 2024 10:55:12 -0700 Subject: [PATCH] Add new constructors for MQTT5 packet builders (#745) --- .../java/com/example/mqtt5/Mqtt5Sample.java | 16 +--- .../crt/mqtt5/packets/PublishPacket.java | 13 +++ .../crt/mqtt5/packets/SubscribePacket.java | 19 ++++ .../crt/mqtt5/packets/UnsubscribePacket.java | 16 +++- .../awssdk/crt/test/Mqtt5ClientTest.java | 87 ++++++------------- .../test/Mqtt5to3AdapterConnectionTest.java | 4 +- .../main/java/canary/mqtt5/Mqtt5Canary.java | 9 +- 7 files changed, 78 insertions(+), 86 deletions(-) diff --git a/samples/mqtt5/src/main/java/com/example/mqtt5/Mqtt5Sample.java b/samples/mqtt5/src/main/java/com/example/mqtt5/Mqtt5Sample.java index e548cdc8c..22a256cd3 100644 --- a/samples/mqtt5/src/main/java/com/example/mqtt5/Mqtt5Sample.java +++ b/samples/mqtt5/src/main/java/com/example/mqtt5/Mqtt5Sample.java @@ -171,10 +171,7 @@ public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); connectBuilder.withClientId(clientID); // Add a will - PublishPacketBuilder willBuilder = new PublishPacketBuilder(); - willBuilder.withTopic("test/topic/will"); - willBuilder.withPayload("Goodbye".getBytes()); - willBuilder.withQOS(QOS.AT_MOST_ONCE); + PublishPacketBuilder willBuilder = new PublishPacketBuilder("test/topic/will", QOS.AT_MOST_ONCE, "Goodbye".getBytes()); connectBuilder.withWill(willBuilder.build()); // Add the connection options optionsBuilder.withConnectOptions(connectBuilder.build()); @@ -192,8 +189,7 @@ public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { } // Subscribe - SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); - subscribePacketBuilder.withSubscription("test/topic", QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder("test/topic", QOS.AT_LEAST_ONCE); // Make sure it is successful try { SubAckPacket subAckPacket = client.subscribe(subscribePacketBuilder.build()).get(60, TimeUnit.SECONDS); @@ -209,10 +205,7 @@ public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { } // Publish - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withPayload("Hello World!".getBytes()); - publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE); - publishPacketBuilder.withTopic("test/topic"); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder("test/topic", QOS.AT_LEAST_ONCE, "Hello World!".getBytes()); // Add user properties List publishProperties = new ArrayList(); publishProperties.add(new UserProperty("Red", "Blue")); @@ -231,8 +224,7 @@ public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { } // Unsubscribe - UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder(); - unsubscribePacketBuilder.withSubscription("test/topic"); + UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder("test/topic"); // Make sure it is successful try { client.unsubscribe(unsubscribePacketBuilder.build()).get(60, TimeUnit.SECONDS); diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/PublishPacket.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/PublishPacket.java index c8ff2ae54..66cb38e63 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/PublishPacket.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/PublishPacket.java @@ -449,6 +449,19 @@ public PublishPacketBuilder withUserProperties(List userProperties */ public PublishPacketBuilder() {} + /** + * Creates a new PublishPacketBuilder with common parameters set. + * + * @param topic The topic this message should be published to. + * @param packetQOS The MQTT quality of service level the message should be delivered with. + * @param payload The payload for the publish message. + */ + public PublishPacketBuilder(String topic, QOS packetQOS, byte[] payload) { + this.topic = topic; + this.packetQOS = packetQOS; + this.payload = payload; + } + /** * Creates a new PublishPacket using the settings set in the builder. * diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/SubscribePacket.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/SubscribePacket.java index 7c96f114a..4f754f8d6 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/SubscribePacket.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/SubscribePacket.java @@ -307,6 +307,25 @@ public SubscribePacketBuilder withUserProperties(List userProperti */ public SubscribePacketBuilder() {} + /** + * Creates a new SubscribePacketBuilder with one subscription defined. + * + * @param subscription The subscription to add within the SubscribePacket. + */ + public SubscribePacketBuilder(Subscription subscription) { + withSubscription(subscription); + } + + /** + * Creates a new SubscribePacketBuilder with one subscription defined. + * + * @param topicFilter The topic filter to subscribe to. + * @param qos The maximum QoS on which the subscriber will accept publish messages. + */ + public SubscribePacketBuilder(String topicFilter, QOS qos) { + withSubscription(new Subscription(topicFilter, qos)); + } + /** * Creates a new SUBSCRIBE packet using the settings set in the builder. * @return The SubscribePacket created from the builder diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/UnsubscribePacket.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/UnsubscribePacket.java index 736d52adc..d2683fec5 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/UnsubscribePacket.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/packets/UnsubscribePacket.java @@ -51,14 +51,15 @@ static final public class UnsubscribePacketBuilder { /** * Sets a single topic filter that the client wishes to unsubscribe from. - * @param subscription A single topic filter that the client wishes to unsubscribe from + * + * @param topicFilter A single topic filter that the client wishes to unsubscribe from. * @return The UnsubscribePacketBuilder after setting the subscription. */ - public UnsubscribePacketBuilder withSubscription(String subscription) { + public UnsubscribePacketBuilder withSubscription(String topicFilter) { if (this.subscriptions == null) { this.subscriptions = new ArrayList(); } - this.subscriptions.add(subscription); + this.subscriptions.add(topicFilter); return this; } @@ -80,6 +81,15 @@ public UnsubscribePacketBuilder withUserProperties(List userProper */ public UnsubscribePacketBuilder() {} + /** + * Creates a new UnsubscribePacketBuilder with one subscription defined. + * + * @param topicFilter A single topic filter that the client wishes to unsubscribe from. + */ + public UnsubscribePacketBuilder(String topicFilter) { + withSubscription(topicFilter); + } + /** * Creates a new UnsubscribePacket using the settings set in the builder. * @return The UnsubscribePacket created from the builder diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index 562b49c56..fda1139f5 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -95,8 +95,7 @@ public void New_UC2() { SocketOptions socketOptions = new SocketOptions(); ) { - PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder(); - willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()).withTopic("test/topic"); + PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder("test/topic", QOS.AT_LEAST_ONCE, "Hello World".getBytes()); ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); connectBuilder.withClientId("MQTT5 CRT") @@ -202,8 +201,7 @@ public void New_UC4() { ClientBootstrap bootstrap = new ClientBootstrap(elg, hr); SocketOptions socketOptions = new SocketOptions(); ) { - PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder(); - willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()).withTopic("test/topic"); + PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder("test/topic", QOS.AT_LEAST_ONCE, "Hello World".getBytes()); ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); connectBuilder.withClientId("MQTT5 CRT"); @@ -454,8 +452,7 @@ public void ConnDC_UC6() { ClientBootstrap bootstrap = new ClientBootstrap(elg, hr); SocketOptions socketOptions = new SocketOptions(); ) { - PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder(); - willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()).withTopic("test/topic"); + PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder("test/topic", QOS.AT_LEAST_ONCE, "Hello World".getBytes()); ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); connectBuilder.withClientId("MQTT5 CRT" + UUID.randomUUID().toString()); @@ -718,8 +715,7 @@ public void ConnWS_UC6() { ClientBootstrap bootstrap = new ClientBootstrap(elg, hr); SocketOptions socketOptions = new SocketOptions(); ) { - PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder(); - willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()).withTopic("test/topic"); + PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder("test/topic", QOS.AT_LEAST_ONCE, "Hello World".getBytes()); ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); connectBuilder.withClientId("MQTT5 CRT"+UUID.randomUUID().toString()); @@ -1558,8 +1554,7 @@ public void NewNegative_UC3_ALT() { client.start(); events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - PublishPacketBuilder publishBuilder = new PublishPacketBuilder(); - publishBuilder.withPayload("Hello World".getBytes()).withTopic("test/topic").withQOS(QOS.AT_LEAST_ONCE); + PublishPacketBuilder publishBuilder = new PublishPacketBuilder("test/topic", QOS.AT_LEAST_ONCE, "Hello World".getBytes()); publishBuilder.withMessageExpiryIntervalSeconds(9223372036854775807L); try { CompletableFuture future = client.publish(publishBuilder.build()); @@ -1606,8 +1601,7 @@ public void NewNegative_UC4() { client.start(); events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(); - subscribeBuilder.withSubscription("test/topic", QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder("test/topic", QOS.AT_LEAST_ONCE); subscribeBuilder.withSubscriptionIdentifier(-100L); try { CompletableFuture future = client.subscribe(subscribeBuilder.build()); @@ -1654,8 +1648,7 @@ public void NewNegative_UC4_ALT() { client.start(); events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(); - subscribeBuilder.withSubscription("test/topic", QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder("test/topic", QOS.AT_LEAST_ONCE); subscribeBuilder.withSubscriptionIdentifier(9223372036854775807L); try { CompletableFuture future = client.subscribe(subscribeBuilder.build()); @@ -1884,16 +1877,10 @@ public void Op_UC1() { PublishEvents_Futured publishEvents = new PublishEvents_Futured(); builder.withPublishEvents(publishEvents); - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withTopic(testTopic); - publishPacketBuilder.withPayload("Hello World".getBytes()); - publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE); - - SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); - subscribePacketBuilder.withSubscription(testTopic, QOS.AT_LEAST_ONCE); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, "Hello World".getBytes()); + SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); - UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder(); - unsubscribePacketBuilder.withSubscription(testTopic); + UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder(testTopic); try (Mqtt5Client client = new Mqtt5Client(builder.build())) { client.start(); @@ -1945,10 +1932,7 @@ public void Op_UC2() { builder.withTlsContext(tlsContext); ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); - PublishPacketBuilder willPacket = new PublishPacketBuilder(); - willPacket.withTopic(testTopic); - willPacket.withQOS(QOS.AT_LEAST_ONCE); - willPacket.withPayload("Hello World".getBytes()); + PublishPacketBuilder willPacket = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, "Hello World".getBytes()); connectOptions.withWill(willPacket.build()); connectOptions.withWillDelayIntervalSeconds(0L); builder.withConnectOptions(connectOptions.build()); @@ -1965,8 +1949,7 @@ public void Op_UC2() { tlsOptionsTwo.close(); builderTwo.withTlsContext(tlsContextTwo); - SubscribePacketBuilder subscribeOptions = new SubscribePacketBuilder(); - subscribeOptions.withSubscription(testTopic, QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribeOptions = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); try ( Mqtt5Client clientOne = new Mqtt5Client(builder.build()); @@ -2023,11 +2006,9 @@ public void Op_UC3() { Random random = new Random(); random.nextBytes(randomBytes); - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withTopic(testTopic).withPayload(randomBytes).withQOS(QOS.AT_LEAST_ONCE); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, randomBytes); - SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); - subscribePacketBuilder.withSubscription(testTopic, QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); try (Mqtt5Client client = new Mqtt5Client(builder.build())) { client.start(); @@ -2080,14 +2061,10 @@ public void Op_UC4() { PublishEvents_Futured publishEvents = new PublishEvents_Futured(); builderTwo.withPublishEvents(publishEvents); - SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); - subscribePacketBuilder.withSubscription(testTopic, QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); ConnectPacketBuilder connectPacketBuilder = new ConnectPacketBuilder(); - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withTopic(testTopic); - publishPacketBuilder.withPayload("Hello World".getBytes()); - publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, "Hello World".getBytes()); connectPacketBuilder.withWill(publishPacketBuilder.build()); connectPacketBuilder.withKeepAliveIntervalSeconds(4l); builder.withConnectOptions(connectPacketBuilder.build()); @@ -2166,8 +2143,7 @@ public void Op_SharedSubscription() { publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE); // SubscribePacketBuilder - SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); - subscribePacketBuilder.withSubscription(sharedTopicfilter, QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(sharedTopicfilter, QOS.AT_LEAST_ONCE); try ( Mqtt5Client publisherClient = new Mqtt5Client(publisherBuilder.build()); @@ -2561,14 +2537,10 @@ public void QoS1_UC1() { subscriber.start(); eventsTwo.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); - subscribePacketBuilder.withSubscription(testTopic, QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); subscriber.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withTopic(testTopic); - publishPacketBuilder.withPayload("Hello World".getBytes()); - publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, "Hello World".getBytes()); for (int i = 0; i < messageCount; i++) { publisher.publish(publishPacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); @@ -2636,11 +2608,8 @@ public void Retain_UC1() { // Connect and publish a retained message publisher.start(); publisherEvents.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withTopic(testTopic) - .withPayload("Hello World".getBytes()) - .withQOS(QOS.AT_LEAST_ONCE) - .withRetain(true); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, "Hello World".getBytes()); + publishPacketBuilder.withRetain(true); publisher.publish(publishPacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); // Setup for clearing the retained message @@ -2742,8 +2711,7 @@ public void Interrupt_Sub_UC1() { client.start(); events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); - subscribePacketBuilder.withSubscription(testTopic, QOS.AT_LEAST_ONCE); + SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); try { CompletableFuture subscribeResult = client.subscribe(subscribePacketBuilder.build()); @@ -2791,8 +2759,7 @@ public void Interrupt_Unsub_UC1() { client.start(); events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder(); - unsubscribePacketBuilder.withSubscription(testTopic); + UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder(testTopic); try { CompletableFuture unsubscribeResult = client.unsubscribe(unsubscribePacketBuilder.build()); @@ -2840,8 +2807,7 @@ public void Interrupt_Publish_UC1() { client.start(); events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withTopic(testTopic).withQOS(QOS.AT_LEAST_ONCE).withPayload("null".getBytes()); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, "null".getBytes()); try { CompletableFuture publishResult = client.publish(publishPacketBuilder.build()); @@ -2913,10 +2879,7 @@ public void OperationStatistics_UC1() { fail("Unacked operation size was not zero!"); } - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withTopic(testTopic); - publishPacketBuilder.withPayload("Hello World".getBytes()); - publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE); + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, "Hello World".getBytes()); for (int i = 0; i < messageCount; i++) { publisher.publish(publishPacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5to3AdapterConnectionTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5to3AdapterConnectionTest.java index 3c65a373a..a94f8e63e 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5to3AdapterConnectionTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5to3AdapterConnectionTest.java @@ -169,9 +169,7 @@ public void TestCreationFull() { ClientBootstrap bootstrap = new ClientBootstrap(elg, hr); SocketOptions socketOptions = new SocketOptions();) { - PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder(); - willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()) - .withTopic("test/topic"); + PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder("test/topic", QOS.AT_LEAST_ONCE, "Hello World".getBytes()); ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); connectBuilder.withClientId("MQTT5 CRT") diff --git a/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java b/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java index 683e01f2d..f46977dc9 100644 --- a/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java +++ b/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java @@ -525,8 +525,7 @@ public static void OperationUnsubscribeBad(int clientIdx) { clientsData.get(clientIdx).isWaitingForOperation = true; PrintLog("[OP] About to unsubscribe (bad) client ID " + clientIdx); - UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder(); - unsubscribePacketBuilder.withSubscription("Non_existent_topic_here"); + UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder("Non_existent_topic_here"); try { client.unsubscribe(unsubscribePacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS); } catch (Exception ex) { @@ -554,16 +553,14 @@ public static void OperationPublish(int clientIdx, QOS qos, String topic) { clientsData.get(clientIdx).isWaitingForOperation = true; PrintLog("[OP] About to publish client ID " + clientIdx + " with QoS " + qos + " with topic " + topic); - PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); - publishPacketBuilder.withQOS(qos); int payload_size = random.nextInt(MAX_PAYLOAD_SIZE); byte[] payload_bytes = new byte[payload_size]; for (int i = 0; i < payload_size; i++) { payload_bytes[i] = (byte)random.nextInt(128); } - publishPacketBuilder.withPayload(payload_bytes); - publishPacketBuilder.withTopic(topic); + + PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(topic, qos, payload_bytes); // Add user properties! List propertyList = new ArrayList();