From d32fc96949b660a5ccb60730ea136bdbd1344734 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 23 Jan 2024 12:17:16 +0100 Subject: [PATCH] Per Subscription Event Handlers (#120) --- Documentation/docs/connecting.md | 80 ++++++ Documentation/docs/events/Reference.md | 14 +- Documentation/docs/events/_category_.json | 2 +- Documentation/docs/help.md | 2 +- Documentation/docs/how-to/_category_.json | 2 +- .../docs/how-to/allow-invalid-certs.md | 16 +- Documentation/docs/how-to/publish.md | 78 ------ Documentation/docs/how-to/subscribe-multi.md | 21 -- Documentation/docs/how-to/subscribe.md | 70 ----- Documentation/docs/publishing.md | 111 ++++++++ Documentation/docs/quickstart.md | 241 ----------------- Documentation/docs/subscribing.md | 247 ++++++++++++++++++ .../Events/OnMessageReceivedEventArgs.cs | 2 +- Source/HiveMQtt/Client/HiveMQClient.cs | 22 +- Source/HiveMQtt/Client/HiveMQClientEvents.cs | 11 + .../HiveMQtt/Client/LastWillAndTestament.cs | 4 +- .../Client/Options/SubscribeOptions.cs | 11 + .../Client/SubscribeOptionsBuilder.cs | 47 +++- Source/HiveMQtt/MQTT5/MalformedPacket.cs | 2 + Source/HiveMQtt/MQTT5/Types/Subscription.cs | 6 + .../HiveMQClient/ClientOptionsBuilderTest.cs | 3 +- .../HiveMQtt.Test/HiveMQClient/ConnectTest.cs | 69 +---- Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs | 4 +- .../LastWillAndTestamentBuilderTest.cs | 1 - .../HiveMQClient/SubscribeBuilderTest.cs | 100 ++++++- Tests/HiveMQtt.Test/HiveMQClient/TLSTest.cs | 36 --- 26 files changed, 664 insertions(+), 538 deletions(-) create mode 100644 Documentation/docs/connecting.md delete mode 100644 Documentation/docs/how-to/publish.md delete mode 100644 Documentation/docs/how-to/subscribe-multi.md delete mode 100644 Documentation/docs/how-to/subscribe.md create mode 100644 Documentation/docs/publishing.md create mode 100644 Documentation/docs/subscribing.md diff --git a/Documentation/docs/connecting.md b/Documentation/docs/connecting.md new file mode 100644 index 00000000..bfbe3aab --- /dev/null +++ b/Documentation/docs/connecting.md @@ -0,0 +1,80 @@ +--- +sidebar_position: 4 +--- + +# Connecting to an MQTT Broker + +## with Defaults + +Without any options given, the `HiveMQClient` will search on `localhost` port 1883 for an unsecured broker. + +If you don't have a broker at this location, see the next sections. + +```csharp +using HiveMQtt.Client; + +// Connect +var client = new HiveMQClient(); +var connectResult = await client.ConnectAsync().ConfigureAwait(false); +``` + +## With Options + +The `HiveMQClientOptions` class provides a set of options that can be used to configure various aspects of the `HiveMQClient`. + +The easiest way to construct this class is to use `HiveMQClientOptionsBuilder`. + +```csharp +var options = new HiveMQClientOptionsBuilder(). + WithBroker('candy.x39.eu.hivemq.cloud'). + WithPort(8883). + WithUseTLS(true). + Build(); + +var client = new HiveMQClient(options); +var connectResult = await client.ConnectAsync().ConfigureAwait(false); +``` + +## `HiveMQClientOptionsBuilder` Reference + +To illustrate _each and every possible call_ with `HiveMQClientOptionsBuilder`, see the following example: + +```csharp +using HiveMQtt.MQTT5.Types; // For QualityOfService enum + +var options = new HiveMQClientOptionsBuilder() + .WithBroker("broker.hivemq.com") + .WithPort(1883) + .WithClientId("myClientId") + .WithAllowInvalidBrokerCertificates(true) + .WithUseTls(true) + .WithCleanStart(true) + .WithKeepAlive(60) + .WithAuthenticationMethod("UsernamePassword") + .WithAuthenticationData(Encoding.UTF8.GetBytes("authenticationData")) + .WithUserProperty("property1", "value1") + .WithUserProperties(new Dictionary { { "property1", "value1" }, { "property2", "value2" } }) + .WithLastWill(new LastWillAndTestament { + Topic = "lwt/topic", + PayloadAsString = "LWT message", + QoS = QualityOfService.AtLeastOnceDelivery, + Retain = true }) + .WithMaximumPacketSize(1024) + .WithReceiveMaximum(100) + .WithSessionExpiryInterval(3600) + .WithUserName("myUserName") + .WithPassword("myPassword") + .WithPreferIPv6(true) + .WithTopicAliasMaximum(10) + .WithRequestProblemInformation(true) + .WithRequestResponseInformation(true) + .Build(); +``` + +## See Also + +* [How to Set a Last Will & Testament](/docs/how-to/set-lwt) +* [Connect with TLS but allow Invalid TLS Certificates](/docs/how-to/allow-invalid-certs) +* [Securely Connect to a Broker with Basic Authentication Credentials](/docs/how-to/connect-with-auth) +* [Custom Client Certificates](/docs/how-to/client-certificates) +* [HiveMQClientOptionsBuilder.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs) diff --git a/Documentation/docs/events/Reference.md b/Documentation/docs/events/Reference.md index e78bd46d..ccb76514 100644 --- a/Documentation/docs/events/Reference.md +++ b/Documentation/docs/events/Reference.md @@ -1,11 +1,14 @@ --- sidebar_position: 1 --- -# Reference +# Event Reference -## List of Supported Events +This document provides a comprehensive list of events supported by the HiveMQtt client library. These events are categorized into two sections: General and Packet Level. + +## General Events + +General events are triggered by high-level operations such as connecting, subscribing, unsubscribing, and receiving messages. -### General | Event | EventArgs Class | Event Arguments | | ------------- | ------------------------ | -------------------- | @@ -17,9 +20,10 @@ sidebar_position: 1 | AfterUnsubscribe | `AfterUnsubscribeEventArgs` | `UnsubscribeResult` | | OnMessageReceived | `OnMessageReceivedEventArgs` | `MQTT5PublishMessage` | -### Packet Level +## Packet Level Events + +Packet level events are triggered by the underlying MQTT packet activity. These events provide a more granular level of control and can be useful for debugging or advanced use cases. -These events happen based on MQTT packet activity. | Event | EventArgs Class | Event Arguments | | ------------- | ------------------------ | -------------------- | diff --git a/Documentation/docs/events/_category_.json b/Documentation/docs/events/_category_.json index 90363069..5e3b88b0 100644 --- a/Documentation/docs/events/_category_.json +++ b/Documentation/docs/events/_category_.json @@ -1,4 +1,4 @@ { "label": "Lifecycle Events", - "position": 4 + "position": 9 } diff --git a/Documentation/docs/help.md b/Documentation/docs/help.md index e4cb8786..2724ef34 100644 --- a/Documentation/docs/help.md +++ b/Documentation/docs/help.md @@ -1,5 +1,5 @@ --- -sidebar_position: 6 +sidebar_position: 100 --- # Getting Help diff --git a/Documentation/docs/how-to/_category_.json b/Documentation/docs/how-to/_category_.json index 2147ae62..6f957d4e 100644 --- a/Documentation/docs/how-to/_category_.json +++ b/Documentation/docs/how-to/_category_.json @@ -1,6 +1,6 @@ { "label": "How To Guides", - "position": 4, + "position": 10, "link": { "type": "generated-index", "description": "Guides on how to do almost anything with HiveMQtt." diff --git a/Documentation/docs/how-to/allow-invalid-certs.md b/Documentation/docs/how-to/allow-invalid-certs.md index acf80564..0f9a8ce7 100644 --- a/Documentation/docs/how-to/allow-invalid-certs.md +++ b/Documentation/docs/how-to/allow-invalid-certs.md @@ -1,6 +1,10 @@ -# Connect with TLS but allow Invalid TLS Certificates +# Connecting with TLS and Allowing Invalid TLS Certificates -Use the `AllowInvalidBrokerCertificates` option in `HiveMQClientOptions` to disable the TLS certificate check upon connect. +In certain development or testing scenarios, you might need to connect to an MQTT broker that uses TLS but has an invalid or self-signed certificate. The HiveMQtt client library provides an option to disable the TLS certificate check upon connection, which can be useful in these situations. + +The `AllowInvalidBrokerCertificates` option in the `HiveMQClientOptions` class allows you to disable the TLS certificate check. + +Here's an example of how to use this option: ```csharp var options = new HiveMQClientOptionsBuilder() @@ -14,6 +18,14 @@ var client = new HiveMQClient(options); var connectResult = await client.ConnectAsync().ConfigureAwait(false); ``` +In this example, we first create an instance of HiveMQClientOptionsBuilder. We then set the broker address, port, and enable TLS. The WithAllowInvalidBrokerCertificates(true) method call disables the TLS certificate check. Finally, we build the options and use them to create a new HiveMQClient. + +:::note + +Disabling the TLS certificate check can expose your application to security risks, such as man-in-the-middle attacks. Therefore, this option should only be used in controlled environments for development or testing purposes. + +::: + ## See Also * [HiveMQClientOptionsBuilder.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs) diff --git a/Documentation/docs/how-to/publish.md b/Documentation/docs/how-to/publish.md deleted file mode 100644 index 8d133c2d..00000000 --- a/Documentation/docs/how-to/publish.md +++ /dev/null @@ -1,78 +0,0 @@ -# Publish - -## Simple - -The simple way to publish a message is to use the following API: - -```csharp -await client.PublishAsync( - "core/dynamic_graph/entity/227489", // Topic to publish to - "{'2023': '👍'}", // Message to publish - QualityOfService.AtMostOnceDelivery - ).ConfigureAwait(false); - -``` - -The 3 arguments are: - -1. The topic to publish to (`string`) -2. The message to publish (`string` or `byte[]`) -3. Quality of Service level (defaults to `AtMostOnceDelivery`) - -For the Quality of Service, see the [QualityOfService enum](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/QualityOfService.cs). - -But if you want more control and extended options for a publish, see the next section. - -## MQTT5PublishMessage - -The [MQTT5PublishMessage](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/MQTT5PublishMessage.cs) class represents the entirety of a publish message in MQTT. If you construct this class directly, you can access all of the MQTT publish options such as `Retain`, `PayloadFormatIndicator`, `UserProperties` and so forth. - -```csharp -var message = new MQTT5PublishMessage -{ - Topic = topic, - Payload = Encoding.ASCII.GetBytes(payload), - QoS = qos, -}; - -message.Retain = True -message.UserProperties.Add("Client-Geo", "-33.8688, 151.2093"); - -var result = await client.PublishAsync(message); -``` - -For the full details, see the source code on [MQTT5PublishMessage](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/MQTT5PublishMessage.cs). - -## PublishMessageBuilder - -The `PublishMessageBuilder` class provides a convenient way to construct MQTT publish messages with various options and properties. It allows you to customize the topic, payload, quality of service (QoS) level, retain flag, and other attributes of the message. - -```csharp -var publishMessage = new PublishMessageBuilder(). - WithTopic("topic1/example"). - WithPayload("{'HiveMQ': '👍'}"). - WithContentType("application/json") - WithResponseTopic("response/topic") - Build(); - -await client.PublishAsync(publishMessage).ConfigureAwait(false); -``` - -By using `PublishMessageBuilder`, you can easily construct MQTT publish messages with the desired properties and options. It provides a fluent and intuitive way to customize the topic, payload, QoS level, retain flag, and other attributes of the message. - -## Publish Return Value: `PublishResult` - -The `PublishAsync` method returns a `PublishResult` object. - -For `QualityOfService.AtMostOnceDelivery`, since it's a "fire-and-forget" method, it doesn't contain any useful information. - -For `QualityOfService.AtLeastOnceDelivery` (QoS level 1) and `QualityOfService.ExactlyOnceDelivery` (QoS level 2), the `PublishResult` contains `PublishResult.QoS1ReasonCode` and `PublishResult.QoS2ReasonCode` respectfully. - -For ease of use, you can call `PublishResult.ReasonCode()` to retrieve the appropriate result code automatically. - -## See Also - -* [PublishMessageBuilder.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/PublishMessageBuilder.cs) -* [MQTT5PublishMessage.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/MQTT5PublishMessage.cs) -* [QualityOfService.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/QualityOfService.cs) -* [PublishResult.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Results/PublishResult.cs) diff --git a/Documentation/docs/how-to/subscribe-multi.md b/Documentation/docs/how-to/subscribe-multi.md deleted file mode 100644 index 675b098c..00000000 --- a/Documentation/docs/how-to/subscribe-multi.md +++ /dev/null @@ -1,21 +0,0 @@ -# Subscribe to Multiple Topics At Once With Varying QoS Levels - -```csharp -using HiveMQtt.Client.Options; -using HiveMQtt.Client.Results; - -var options = new SubscribeOptions(); -options.TopicFilters.Add(new TopicFilter { Topic = "foo/boston", QoS = QualityOfService.AtLeastOnceDelivery }); -options.TopicFilters.Add(new TopicFilter { Topic = "bar/landshut", QoS = QualityOfService.AtMostOnceDelivery }); - -var result = await client.SubscribeAsync(options); -``` - -* `result.Subscriptions` contains the list of subscriptions made with this call -* `client.Subscriptions` is updated with complete list of subscriptions made up to this point -* each `Subscription` object has a resulting `ReasonCode` that represents the Subscribe result in `result.Subscriptions[0].ReasonCode` - -## See Also - -* [TopicFilter.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/TopicFilter.cs) -* [SubscribeOptionsBuilder.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/SubscribeOptionsBuilder.cs) diff --git a/Documentation/docs/how-to/subscribe.md b/Documentation/docs/how-to/subscribe.md deleted file mode 100644 index 61dcdfc7..00000000 --- a/Documentation/docs/how-to/subscribe.md +++ /dev/null @@ -1,70 +0,0 @@ -# Subscribe - -## Set Your Message Handlers First - -You can subscribe to one or many topics in MQTT. But to do so, you must first set a message handler. - -_Tip: Set your message handler before connecting to the MQTT broker as it may send messages before your handler is setup!_ - -```csharp -// Message Handler -client.OnMessageReceived += (sender, args) => -{ - Console.WriteLine("Message Received: {}", args.PublishMessage.PayloadAsString) -}; -``` - -or alternatively: - -```csharp -private static void MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) -{ - Console.WriteLine("Message Received: {}", eventArgs.PublishMessage.PayloadAsString) -} - -client.OnMessageReceived += MessageHandler; -``` - -* See: [OnMessageReceivedEventArgs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Events/OnMessageReceivedEventArgs.cs) - -## Basic - -To subscribe to a single topic with a Quality of Service level, use `SubscribeAsync` as follows. - -```csharp -// Subscribe -var subscribeResult = await client.SubscribeAsync("instrument/x9284/boston", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); - -assert subscribeResult.Subscriptions.Length() == 1 -assert subscribeResult.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS1 -``` - -## Using `SubscribeOptions` - -To utilize the complete set of options for `SubscribeAsync`, create a `SubscribeOptions` object. - -```csharp -var topic1 = "instrument/x9284/boston" -var topic2 = "instrument/x9284/austin" -var qos = QualityOfService.AtLeastOnceDelivery; - -var subscribeOptions = new SubscribeOptions(); - -var tf1 = new TopicFilter(topic1, qos); -var tf2 = new TopicFilter(topic2, qos); - -subscribeOptions.TopicFilters.Add(tf1); -subscribeOptions.TopicFilters.Add(tf2); - -subscribeOptions.UserProperties.Add("Client-Geo", "-33.8688, 151.2093"); - -var result = await client.SubscribeAsync(subscribeOptions).ConfigureAwait(false); -``` - -## See Also - -* [TopicFilter](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/TopicFilter.cs) -* [SubscribeOptions](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Options/SubscribeOptions.cs) -* [SubscribeResult](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Results/SubscribeResult.cs) -* [MQTT Topics, Wildcards, & Best Practices](https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/) -* [MQTT Topic Tree & Topic Matching](https://www.hivemq.com/article/mqtt-topic-tree-matching-challenges-best-practices-explained/) diff --git a/Documentation/docs/publishing.md b/Documentation/docs/publishing.md new file mode 100644 index 00000000..37cce853 --- /dev/null +++ b/Documentation/docs/publishing.md @@ -0,0 +1,111 @@ +--- +sidebar_position: 6 +--- + +# Publishing Messages + +In MQTT, "publish" is an operation that allows an MQTT client to send a message to an MQTT broker, which then distributes the message to all subscribed clients interested in the topic of the message. + +## Simple + +Use the PublishAsync method to publish a payload to the desired topic by providing the topic string and payload as parameters. + +```csharp +var publishResult = await client.PublishAsync("topic1/example", "Hello Payload") +``` + +Optionally, you can specify the desired quality of service (QoS) level for the publish. By default, the QoS level is set to `QualityOfService.AtMostOnceDelivery`. + +```csharp +using HiveMQtt.MQTT5.Types; // For the QualityOfService enum + +var publishResult = await client.PublishAsync("topic1/example", "Hello Payload", QualityOfService.ExactlyOnceDelivery) +``` + +## With Options + +The `PublishMessageBuilder` class provides a convenient way to construct MQTT publish messages with various options and properties. It allows you to customize the topic, payload, quality of service (QoS) level, retain flag, and other attributes of the message. + +```csharp +var publishMessage = new PublishMessageBuilder(). + WithTopic("topic1/example"). + WithPayload("{'HiveMQ': '👍'}"). + WithContentType("application/json") + WithResponseTopic("response/topic") + Build(); + +await client.PublishAsync(publishMessage).ConfigureAwait(false); +``` + +By using `PublishMessageBuilder`, you can easily construct MQTT publish messages with the desired properties and options. It provides a fluent and intuitive way to customize the topic, payload, QoS level, retain flag, and other attributes of the message. + +### `PublishMessagebuilder` Reference + +To illustrate _each and every possible call_ with `PublishMessageBuilder`, see the following example: + +```csharp +var publishMessage = new PublishMessageBuilder() + .WithTopic("topic1/example") + .WithPayload("Hello, HiveMQtt!") + .WithQualityOfServiceLevel(QualityOfService.AtLeastOnceDelivery) + .WithRetainFlag(true) + .WithPayloadFormatIndicator(MQTT5PayloadFormatIndicator.UTF8Encoded) + .WithContentType("application/json") + .WithResponseTopic("response/topic") + .WithCorrelationData(Encoding.UTF8.GetBytes("correlation-data")) + .WithUserProperty("property1", "value1") + .WithUserProperties(new Dictionary { { "property1", "value1" }, { "property2", "value2" } }); + .WithMessageExpiryInterval(3600) + .WithSubscriptionIdentifier(123) + .WithSubscriptionIdentifiers(1, 2, 3) + .WithTopicAlias(456) + .WithContentTypeAlias(789) + .WithResponseTopicAlias(987) + .Build() +``` + +## MQTT5PublishMessage + +The [MQTT5PublishMessage](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/MQTT5PublishMessage.cs) class represents the entirety of a publish message in MQTT. If you construct this class directly, you can access all of the MQTT publish options such as `Retain`, `PayloadFormatIndicator`, `UserProperties` and so forth. + +```csharp +var message = new MQTT5PublishMessage +{ + Topic = topic, + Payload = Encoding.ASCII.GetBytes(payload), + QoS = qos, +}; + +message.Retain = True +message.UserProperties.Add("Client-Geo", "-33.8688, 151.2093"); + +var result = await client.PublishAsync(message); +``` + +For the full details, see the source code on [MQTT5PublishMessage](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/MQTT5PublishMessage.cs). + + +## Return Value of Publish: `PublishResult` + +The `PublishAsync` method in the HiveMQtt client library returns an instance of the `PublishResult` class. This object provides detailed information about the outcome of the publish operation. + +## Quality of Service (QoS) and `PublishResult` + +The information contained in the `PublishResult` object varies depending on the Quality of Service (QoS) level used for the publish operation. + +* QoS Level 0 (`QualityOfService.AtMostOnceDelivery`): This level is often referred to as "fire-and-forget". It does not provide any acknowledgement of delivery, and as such, the `PublishResult` object does not contain any meaningful information. + +* QoS Level 1 (`QualityOfService.AtLeastOnceDelivery`): This level ensures that the message is delivered at least once. The `PublishResult` object for this QoS level contains a `QoS1ReasonCode` property, which provides information about the outcome of the publish operation. + +* QoS Level 2 (`QualityOfService.ExactlyOnceDelivery`): This level ensures that the message is delivered exactly once. The `PublishResult` object for this QoS level contains a `QoS2ReasonCode` property, which provides information about the outcome of the publish operation. + +## Retrieving the Reason Code + +For convenience, the `PublishResult` class provides a `ReasonCode` method. This method automatically retrieves the appropriate reason code (`QoS1ReasonCode` or `QoS2ReasonCode`) based on the QoS level of the publish operation. This allows you to easily access the outcome of the publish operation without having to check the QoS level manually. + +## See Also + +* [PublishMessageBuilder.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/PublishMessageBuilder.cs) +* [MQTT5PublishMessage.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/MQTT5PublishMessage.cs) +* [QualityOfService.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/QualityOfService.cs) +* [PublishResult.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Results/PublishResult.cs) diff --git a/Documentation/docs/quickstart.md b/Documentation/docs/quickstart.md index eaaddbdd..0d42eaeb 100644 --- a/Documentation/docs/quickstart.md +++ b/Documentation/docs/quickstart.md @@ -66,245 +66,4 @@ var subscribeResult = await client.SubscribeAsync(subscribeOptions); var publishResult = await client.PublishAsync("topic1/example", "Hello Payload") ``` -## Connecting -### with Defaults - -Without any options given, the `HiveMQClient` will search on `localhost` port 1883 for an unsecured broker. - -If you don't have a broker at this location, see the next sections. - -```csharp -using HiveMQtt.Client; - -// Connect -var client = new HiveMQClient(); -var connectResult = await client.ConnectAsync().ConfigureAwait(false); -``` - -### With Options - -The `HiveMQClientOptions` class provides a set of options that can be used to configure various aspects of the `HiveMQClient`. - -The easiest way to construct this class is to use `HiveMQClientOptionsBuilder`. - -```csharp -var options = new HiveMQClientOptionsBuilder(). - WithBroker('candy.x39.eu.hivemq.cloud'). - WithPort(8883). - WithUseTLS(true). - Build(); - -var client = new HiveMQClient(options); -var connectResult = await client.ConnectAsync().ConfigureAwait(false); -``` - -### Reference - -To illustrate _each and every possible call_ with `HiveMQClientOptionsBuilder`, see the following example: - -```csharp -using HiveMQtt.MQTT5.Types; // For QualityOfService enum - -var options = new HiveMQClientOptionsBuilder() - .WithBroker("broker.hivemq.com") - .WithPort(1883) - .WithClientId("myClientId") - .WithAllowInvalidBrokerCertificates(true) - .WithUseTls(true) - .WithCleanStart(true) - .WithKeepAlive(60) - .WithAuthenticationMethod("UsernamePassword") - .WithAuthenticationData(Encoding.UTF8.GetBytes("authenticationData")) - .WithUserProperty("property1", "value1") - .WithUserProperties(new Dictionary { { "property1", "value1" }, { "property2", "value2" } }) - .WithLastWill(new LastWillAndTestament { - Topic = "lwt/topic", - PayloadAsString = "LWT message", - QoS = QualityOfService.AtLeastOnceDelivery, - Retain = true }) - .WithMaximumPacketSize(1024) - .WithReceiveMaximum(100) - .WithSessionExpiryInterval(3600) - .WithUserName("myUserName") - .WithPassword("myPassword") - .WithPreferIPv6(true) - .WithTopicAliasMaximum(10) - .WithRequestProblemInformation(true) - .WithRequestResponseInformation(true) - .Build(); -``` - -## Subscribing - -In MQTT, "subscribe" is an operation that allows an MQTT client to request to receive messages published to specific topics from an MQTT broker. - -### Simple - -Use the SubscribeAsync method to subscribe to the desired topic by providing the topic string as a parameter. - -```csharp -await client.SubscribeAsync("instrument/x9284/boston").ConfigureAwait(false); -``` - -Optionally, you can specify the desired quality of service (QoS) level for the subscription. By default, the QoS level is set to `QualityOfServiceLevel.AtMostOnce`. - -```csharp -using HiveMQtt.MQTT5.Types; // For QualityOfService enum - -string topic = "my/topic"; -QualityOfService qos = QualityOfService.AtLeastOnceDelivery; - -await client.SubscribeAsync(topic, qosLevel); -``` - -### With Options - -The `SubscribeOptionsBuilder` class provides a convenient way to construct subscription options for MQTT subscriptions. It allows you to configure various aspects of the subscription(s), including the topic filter, quality of service (QoS) level, user properties, and more. - -To use the SubscribeOptionsBuilder: - -Create an instance of the SubscribeOptionsBuilder class. - -```csharp -var builder = new SubscribeOptionsBuilder(); -``` - -Use the `WithSubscription` method to specify the topic filter and QoS level for the subscription. This method can be called multiple times to create multiple subscriptions at once. - -```csharp -builder.WithSubscription("topic1", QualityOfService.AtLeastOnceDelivery) - .WithSubscription("topic2", QualityOfService.ExactlyOnceDelivery); -``` - -Optionally, you can use the `WithUserProperties` method to add custom user properties to the subscription. User properties are key-value pairs that provide additional metadata or application-specific information. - -```csharp -var userProperties = new Dictionary -{ - { "property1", "value1" }, - { "property2", "value2" } -}; - -builder.WithUserProperties(userProperties); -``` - -There also exists a singular `WithUserProperty` if you just need to send one key-value pair: - -```csharp -builder.WithUserProperty("property1", "value1") -``` - -Call the Build method to create the SubscribeOptions object. - -```csharp -var options = builder.Build(); -``` - -Use the created `SubscribeOptions` object when subscribing to MQTT topics using the MQTT client library. - -```csharp -await client.SubscribeAsync(options); -``` - -By using the `SubscribeOptionsBuilder`, you can easily configure multiple subscriptions with different topic filters and QoS levels. Additionally, you have the flexibility to include custom user properties to provide additional information or metadata for the subscriptions. - -### Reference - -To illustrate _each and every possible call_ with `SubscribeOptionsBuilder`, see the following example: - -```csharp -using HiveMQtt.MQTT5.Types; - -var options = new SubscribeOptionsBuilder(). - .WithSubscription( - "topic1", // Topic - QualityOfService.ExactlyOnceDelivery, // Quality of Service Level - true, // NoLocal - true, // RetainAsPublished - RetainHandling.SendAtSubscribe // RetainHandling - ). - WithUserProperty("property1", "value1"). - WithUserProperties( - new Dictionary { - { "property1", "value1" }, { "property2", "value2" } }). - Build(); - -``` - -In `WithSubscription`, the first two arguments are required. The additional optional parameters are defined as: - -* NoLocal: The NoLocal option, when set to true, indicates that the subscriber does not want to receive messages published by itself. This option is useful in scenarios where a client is publishing and subscribing to the same topic. By setting NoLocal to true, the client can avoid receiving its own published messages. - -* RetainAsPublished: The RetainAsPublished option, when set to false, indicates that the broker should not send retained messages to the subscriber when it first subscribes to a topic. Retained messages are those that are stored by the broker and sent to new subscribers upon subscription. By setting RetainAsPublished to false, the subscriber will not receive any retained messages for that topic. - -* Retain handling: Retain handling refers to the behavior of the broker when it receives a subscription request for a topic that has retained messages. In MQTT 5, there are three options for retain handling: - - * `RetainHandling.SendAtSubscribe`: The broker sends any retained messages for the topic to the subscriber immediately upon subscription. - * `RetainHandling.SendAtSubscribeIfNewSubscription`: The broker sends retained messages to new subscribers only if there are no existing subscriptions for that topic. - * `RetainHandling.DoNotSendAtSubscribe`: The broker does not send any retained messages to the subscriber upon subscription. - -These options provide flexibility and control over the behavior of the subscription process in MQTT 5, allowing subscribers to customize their experience based on their specific requirements. - - -## Publishing - -In MQTT, "publish" is an operation that allows an MQTT client to send a message to an MQTT broker, which then distributes the message to all subscribed clients interested in the topic of the message. - -### Simple - -Use the PublishAsync method to publish a payload to the desired topic by providing the topic string and payload as parameters. - -```csharp -var publishResult = await client.PublishAsync("topic1/example", "Hello Payload") -``` - -Optionally, you can specify the desired quality of service (QoS) level for the publish. By default, the QoS level is set to `QualityOfService.AtMostOnceDelivery`. - -```csharp -using HiveMQtt.MQTT5.Types; // For the QualityOfService enum - -var publishResult = await client.PublishAsync("topic1/example", "Hello Payload", QualityOfService.ExactlyOnceDelivery) -``` - -### With Options - -The `PublishMessageBuilder` class provides a convenient way to construct MQTT publish messages with various options and properties. It allows you to customize the topic, payload, quality of service (QoS) level, retain flag, and other attributes of the message. - -```csharp -var publishMessage = new PublishMessageBuilder(). - WithTopic("topic1/example"). - WithPayload("{'HiveMQ': '👍'}"). - WithContentType("application/json") - WithResponseTopic("response/topic") - Build(); - -await client.PublishAsync(publishMessage).ConfigureAwait(false); -``` - -By using `PublishMessageBuilder`, you can easily construct MQTT publish messages with the desired properties and options. It provides a fluent and intuitive way to customize the topic, payload, QoS level, retain flag, and other attributes of the message. - -### Reference - -To illustrate _each and every possible call_ with `PublishMessageBuilder`, see the following example: - -```csharp -var publishMessage = new PublishMessageBuilder() - .WithTopic("topic1/example") - .WithPayload("Hello, HiveMQtt!") - .WithQualityOfServiceLevel(QualityOfService.AtLeastOnceDelivery) - .WithRetainFlag(true) - .WithPayloadFormatIndicator(MQTT5PayloadFormatIndicator.UTF8Encoded) - .WithContentType("application/json") - .WithResponseTopic("response/topic") - .WithCorrelationData(Encoding.UTF8.GetBytes("correlation-data")) - .WithUserProperty("property1", "value1") - .WithUserProperties(new Dictionary { { "property1", "value1" }, { "property2", "value2" } }); - .WithMessageExpiryInterval(3600) - .WithSubscriptionIdentifier(123) - .WithSubscriptionIdentifiers(1, 2, 3) - .WithTopicAlias(456) - .WithContentTypeAlias(789) - .WithResponseTopicAlias(987) - .Build() -``` diff --git a/Documentation/docs/subscribing.md b/Documentation/docs/subscribing.md new file mode 100644 index 00000000..32d871df --- /dev/null +++ b/Documentation/docs/subscribing.md @@ -0,0 +1,247 @@ +--- +sidebar_position: 5 +--- + +# Subscribing to Topics + +In MQTT, "subscribe" is an operation that allows an MQTT client to request to receive messages published to specific topics from an MQTT broker. + + +## Simple + +Use the SubscribeAsync method to subscribe to the desired topic by providing the topic string as a parameter. + +```csharp +await client.SubscribeAsync("instrument/x9284/boston").ConfigureAwait(false); +``` + +Optionally, you can specify the desired quality of service (QoS) level for the subscription. By default, the QoS level is set to `QualityOfServiceLevel.AtMostOnce`. + +```csharp +using HiveMQtt.MQTT5.Types; // For QualityOfService enum + +string topic = "my/topic"; +QualityOfService qos = QualityOfService.AtLeastOnceDelivery; + +await client.SubscribeAsync(topic, qosLevel); +``` + +:::tip + +Make sure to set your message handlers **before subscribing**. See [this section](#important-tip-prioritize-setting-your-message-handlers) below for more details. + +::: + +## With Options + +The `SubscribeOptionsBuilder` class provides a convenient way to construct subscription options for MQTT subscriptions. It allows you to configure various aspects of the subscription(s), including the topic filter, quality of service (QoS) level, user properties, and more. + +To use the SubscribeOptionsBuilder: + +Create an instance of the SubscribeOptionsBuilder class. + +```csharp +var builder = new SubscribeOptionsBuilder(); +``` + +Use the `WithSubscription` method to specify the topic filter and QoS level for the subscription. This method can be called multiple times to create multiple subscriptions at once. + +```csharp +builder.WithSubscription("topic1", QualityOfService.AtLeastOnceDelivery) + .WithSubscription("topic2", QualityOfService.ExactlyOnceDelivery); +``` + +Optionally, you can use the `WithUserProperties` method to add custom user properties to the subscription. User properties are key-value pairs that provide additional metadata or application-specific information. + +```csharp +var userProperties = new Dictionary +{ + { "property1", "value1" }, + { "property2", "value2" } +}; + +builder.WithUserProperties(userProperties); +``` + +There also exists a singular `WithUserProperty` if you just need to send one key-value pair: + +```csharp +builder.WithUserProperty("property1", "value1") +``` + +Call the Build method to create the SubscribeOptions object. + +```csharp +var options = builder.Build(); +``` + +Use the created `SubscribeOptions` object when subscribing to MQTT topics using the MQTT client library. + +```csharp +await client.SubscribeAsync(options); +``` + +By using the `SubscribeOptionsBuilder`, you can easily configure multiple subscriptions with different topic filters and QoS levels. Additionally, you have the flexibility to include custom user properties to provide additional information or metadata for the subscriptions. + +### `SubscribeOptionsBuilder` Reference + +To illustrate _each and every possible call_ with `SubscribeOptionsBuilder`, see the following example: + +```csharp +using HiveMQtt.MQTT5.Types; + +var options = new SubscribeOptionsBuilder(). + .WithSubscription( + "topic1", // Topic + QualityOfService.ExactlyOnceDelivery, // Quality of Service Level + true, // NoLocal + true, // RetainAsPublished + RetainHandling.SendAtSubscribe // RetainHandling + ). + WithUserProperty("property1", "value1"). + WithUserProperties( + new Dictionary { + { "property1", "value1" }, { "property2", "value2" } }). + Build(); + +``` + +In `WithSubscription`, the first two arguments are required. The additional optional parameters are defined as: + +* `NoLoca`l: The NoLocal option, when set to true, indicates that the subscriber does not want to receive messages published by itself. This option is useful in scenarios where a client is publishing and subscribing to the same topic. By setting NoLocal to true, the client can avoid receiving its own published messages. + +* `RetainAsPublished`: The RetainAsPublished option, when set to false, indicates that the broker should not send retained messages to the subscriber when it first subscribes to a topic. Retained messages are those that are stored by the broker and sent to new subscribers upon subscription. By setting RetainAsPublished to false, the subscriber will not receive any retained messages for that topic. + +* `Retain handling`: Retain handling refers to the behavior of the broker when it receives a subscription request for a topic that has retained messages. In MQTT 5, there are three options for retain handling: + + * `RetainHandling.SendAtSubscribe`: The broker sends any retained messages for the topic to the subscriber immediately upon subscription. + * `RetainHandling.SendAtSubscribeIfNewSubscription`: The broker sends retained messages to new subscribers only if there are no existing subscriptions for that topic. + * `RetainHandling.DoNotSendAtSubscribe`: The broker does not send any retained messages to the subscriber upon subscription. + +These options provide flexibility and control over the behavior of the subscription process in MQTT 5, allowing subscribers to customize their experience based on their specific requirements. + +## Important Tip: Prioritize Setting Your Message Handlers + +In MQTT communication, the message handler is responsible for processing incoming messages from the broker. It's crucial to set up your message handler before establishing a connection to the MQTT broker. + +Why is this order important? Once a connection is established, the broker may start sending messages immediately, especially if there are retained messages for the topics you're subscribing to. If the message handler is not set up in advance, these incoming messages might not be processed, leading to potential data loss or unexpected behavior. + + + +```csharp +// Message Handler +client.OnMessageReceived += (sender, args) => +{ + Console.WriteLine("Message Received: {}", args.PublishMessage.PayloadAsString) +}; + +await client.ConnectAsync(); +``` + +or alternatively: + +```csharp +private static void MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) +{ + Console.WriteLine("Message Received: {}", eventArgs.PublishMessage.PayloadAsString) +} + +client.OnMessageReceived += MessageHandler; +await client.ConnectAsync(); +``` + +In this example, the message handler is defined as a lambda function that writes the received message to the console. Only after the message handler is set up do we connect to the broker using the ConnectAsync method. + +Remember, prioritizing the setup of your message handler ensures that your application is ready to process incoming messages as soon as the connection to the broker is established. + +* See Also: [OnMessageReceivedEventArgs.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Events/OnMessageReceivedEventArgs.cs) + +## Subscribe: Multiple Topics At Once + +```csharp +using HiveMQtt.Client.Options; +using HiveMQtt.Client.Results; + +var options = new SubscribeOptions(); +options.TopicFilters.Add(new TopicFilter { Topic = "foo/boston", QoS = QualityOfService.AtLeastOnceDelivery }); +options.TopicFilters.Add(new TopicFilter { Topic = "bar/landshut", QoS = QualityOfService.AtMostOnceDelivery }); + +var result = await client.SubscribeAsync(options); +``` + +* `result.Subscriptions` contains the list of subscriptions made with this call +* `client.Subscriptions` is updated with complete list of subscriptions made up to this point +* each `Subscription` object has a resulting `ReasonCode` that represents the Subscribe result in `result.Subscriptions[0].ReasonCode` + +### Using `SubscribeOptionsBuilder` + +```csharp +var subscribeOptions = new SubscribeOptionsBuilder() + .WithSubscription("my/topic1", MQTT5.Types.QualityOfService.AtLeastOnceDelivery) + .WithSubscription("my/topic/2", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, true, true, MQTT5.Types.RetainHandling.SendAtSubscribe) + .WithUserProperty("Client-Geo", "38.115662, 13.361470") + .Build(); + +var subResult = await subClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false); +``` + +## Per Subscription Callbacks + +### Introduction + +The `SubscribeOptionsBuilder` class in the HiveMQtt client library provides a convenient way to configure subscription options for MQTT subscriptions. One of the key features of the `SubscribeOptionsBuilder` is the ability to specify per subscription callbacks using the `WithSubscription` method. This allows you to define custom event handlers that will be invoked when messages are received for specific topics. + +### The Problem + +In MQTT communication, it is common to have different subscriptions for different topics, each requiring specific handling or processing of the received messages. The challenge is to associate a specific callback or event handler with each subscription, so that the appropriate logic can be executed when messages are received for those topics. + +### Per Subscription Callbacks with `WithSubscription`` + +The `WithSubscription` method of the `SubscribeOptionsBuilder` class provides a solution to this problem. It allows you to specify a topic filter and an event handler that will be associated with that topic filter. The event handler will be invoked whenever a message is received for the subscribed topic. + +The signature of the `WithSubscription` method is as follows: + +```csharp +public SubscribeOptionsBuilder WithSubscription(TopicFilter topicFilter, EventHandler? handler = null) +``` + +Here's an example of how you might use the `WithSubscription` method to set up a per subscription callback: + +```csharp +var builder = new SubscribeOptionsBuilder(); +var options = builder.WithSubscription( + new TopicFilter("test/topic", QualityOfService.AtLeastOnceDelivery), + (sender, e) => + { + Console.WriteLine($"Message received on topic {e.Topic}: {e.Message}"); + }) + .Build(); +``` + +In this example, we first create an instance of `SubscribeOptionsBuilder`. Then we call the `WithSubscription` method to add a subscription with a topic filter and an event handler. The event handler is a lambda function that writes a message to the console whenever a message is received on the subscribed topic. Finally, we call the Build method to create the SubscribeOptions. + +Alternatively the message handler can be independently defined: + +```csharp +private static void MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) +{ + Console.WriteLine("Message Received: {}", eventArgs.PublishMessage.PayloadAsString) +} + +var builder = new SubscribeOptionsBuilder(); +var options = builder.WithSubscription( + new TopicFilter("test/topic", QualityOfService.AtLeastOnceDelivery), + MessageHandler) + .Build(); + +``` + +## See Also + +* [MQTT Topics, Wildcards, & Best Practices](https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/) +* [MQTT Topic Tree & Topic Matching](https://www.hivemq.com/article/mqtt-topic-tree-matching-challenges-best-practices-explained/) +* [TopicFilter.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/MQTT5/Types/TopicFilter.cs) +* [SubscribeOptions.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Options/SubscribeOptions.cs) +* [SubscribeResult.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/Results/SubscribeResult.cs) +* [SubscribeOptionsBuilder.cs](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Source/HiveMQtt/Client/SubscribeOptionsBuilder.cs) + diff --git a/Source/HiveMQtt/Client/Events/OnMessageReceivedEventArgs.cs b/Source/HiveMQtt/Client/Events/OnMessageReceivedEventArgs.cs index d6591fe7..8299fd14 100644 --- a/Source/HiveMQtt/Client/Events/OnMessageReceivedEventArgs.cs +++ b/Source/HiveMQtt/Client/Events/OnMessageReceivedEventArgs.cs @@ -20,7 +20,7 @@ namespace HiveMQtt.Client.Events; /// /// Event arguments for the event. /// This event is called when a message is received from the broker. -/// contains the received message. +/// contains the received message. /// public class OnMessageReceivedEventArgs : EventArgs { diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index 0d737189..7e5e1334 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -284,20 +284,20 @@ public async Task SubscribeAsync(SubscribeOptions options) // Fire the corresponding event this.BeforeSubscribeEventLauncher(options); + // FIXME: We should only ever have one subscribe in flight at any time (for now) + // Construct the MQTT Connect packet var packetIdentifier = this.GeneratePacketIdentifier(); var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier); + // Setup the task completion source to wait for the SUBACK var taskCompletionSource = new TaskCompletionSource(); void TaskHandler(object? sender, OnSubAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.SubAckPacket); - - // FIXME: We should only ever have one subscribe in flight at any time (for now) EventHandler eventHandler = TaskHandler; this.OnSubAckReceived += eventHandler; - // Construct the MQTT Connect packet and queue to send + // Queue the constructed packet to be sent on the wire this.sendQueue.Enqueue(subscribePacket); - // FIXME: Cancellation token and better timeout value SubAckPacket subAck; SubscribeResult subscribeResult; try @@ -318,7 +318,19 @@ public async Task SubscribeAsync(SubscribeOptions options) subscribeResult = new SubscribeResult(options, subAck); // Add the subscriptions to the client - this.Subscriptions.AddRange(subscribeResult.Subscriptions); + foreach (var subscription in subscribeResult.Subscriptions) + { + // If the user has registered a handler for this topic, add it to the subscription + foreach (var handler in options.Handlers) + { + if (handler.Key == subscription.TopicFilter.Topic) + { + subscription.MessageReceivedHandler = handler.Value; + } + } + + this.Subscriptions.Add(subscription); + } // Fire the corresponding event this.AfterSubscribeEventLauncher(subscribeResult); diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs index 5a8b7847..31d18ad8 100644 --- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs +++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs @@ -134,7 +134,18 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) { var eventArgs = new OnMessageReceivedEventArgs(packet.Message); Logger.Trace("OnMessageReceivedEventLauncher"); + + // Global Event Handler this.OnMessageReceived?.Invoke(this, eventArgs); + + // Per Subscription Event Handler + foreach (var subscription in this.Subscriptions) + { + if (subscription.TopicFilter.Topic == packet.Message.Topic) + { + subscription.MessageReceivedHandler?.Invoke(this, eventArgs); + } + } } /* ========================================================================================= */ diff --git a/Source/HiveMQtt/Client/LastWillAndTestament.cs b/Source/HiveMQtt/Client/LastWillAndTestament.cs index ad9d06ab..3ca10007 100644 --- a/Source/HiveMQtt/Client/LastWillAndTestament.cs +++ b/Source/HiveMQtt/Client/LastWillAndTestament.cs @@ -35,7 +35,7 @@ public class LastWillAndTestament /// The Quality of Service level for the Last Will and Testament. /// The UTF-8 encoded payload of the Last Will and Testament. /// A value indicating whether the Last Will and Testament should be retained by the MQTT broker when published. - [Obsolete("Use the LastWillAndTestament constructor that uses QualityOfService with a default value instead.")] + [Obsolete("Use the LastWillAndTestament constructor that uses QualityOfService with a default value instead. Argument order: topic, payload, qos?, retain?.")] public LastWillAndTestament(string topic, QualityOfService? qos, string payload, bool retain = false) { this.Topic = topic; @@ -64,7 +64,7 @@ public LastWillAndTestament(string topic, QualityOfService? qos, string payload, /// The Quality of Service level for the Last Will and Testament. /// The byte payload of the Last Will and Testament. /// A value indicating whether the Last Will and Testament should be retained by the MQTT broker when published. - [Obsolete("Use the LastWillAndTestament constructor that uses QualityOfService with a default value instead.")] + [Obsolete("Use the LastWillAndTestament constructor that uses QualityOfService with a default value instead. Argument order: topic, payload, qos?, retain?.")] public LastWillAndTestament(string topic, QualityOfService? qos, byte[] payload, bool retain = false) { this.Topic = topic; diff --git a/Source/HiveMQtt/Client/Options/SubscribeOptions.cs b/Source/HiveMQtt/Client/Options/SubscribeOptions.cs index 6da8c8e7..2943e010 100644 --- a/Source/HiveMQtt/Client/Options/SubscribeOptions.cs +++ b/Source/HiveMQtt/Client/Options/SubscribeOptions.cs @@ -15,6 +15,7 @@ */ namespace HiveMQtt.Client.Options; +using HiveMQtt.Client.Events; using HiveMQtt.Client.Exceptions; using HiveMQtt.MQTT5.Types; @@ -41,6 +42,16 @@ public SubscribeOptions() /// public List TopicFilters { get; set; } + /// + /// Gets or sets the handlers for this subscribe. + /// + /// Per subscription callbacks can be registered here. The key is the topic filter + /// and the value is the handler. If the topic string isn't found in one of the + /// List<TopicFilters>, the handler will not be registered for that subscription. + /// + /// + public Dictionary> Handlers { get; set; } = new Dictionary>(); + /// /// Validate that the options in this instance are valid. /// diff --git a/Source/HiveMQtt/Client/SubscribeOptionsBuilder.cs b/Source/HiveMQtt/Client/SubscribeOptionsBuilder.cs index fd98c3ab..b921cfcd 100644 --- a/Source/HiveMQtt/Client/SubscribeOptionsBuilder.cs +++ b/Source/HiveMQtt/Client/SubscribeOptionsBuilder.cs @@ -15,6 +15,7 @@ */ namespace HiveMQtt.Client; +using HiveMQtt.Client.Events; using HiveMQtt.Client.Options; using HiveMQtt.MQTT5.Types; @@ -35,10 +36,49 @@ public SubscribeOptionsBuilder() /// A boolean indicating whether this client will receive the messages published by this client. /// A boolean indicating whether Application Messages forwarded using this subscription keep the RETAIN flag they were published with. /// A RetainHandling value indicating whether retained messages are sent when the subscription is established. + /// A message handler for the subscription. /// SubscribeOptionsBuilder to continue the build process. - public SubscribeOptionsBuilder WithSubscription(string topic, QualityOfService qos, bool? noLocal = null, bool? retainAsPublished = null, RetainHandling? retainHandling = RetainHandling.SendAtSubscribe) + public SubscribeOptionsBuilder WithSubscription(string topic, QualityOfService qos, bool? noLocal = null, bool? retainAsPublished = null, RetainHandling? retainHandling = RetainHandling.SendAtSubscribe, EventHandler? messageReceivedHandler = null) { this.options.TopicFilters.Add(new TopicFilter(topic, qos, noLocal, retainAsPublished, retainHandling)); + if (messageReceivedHandler != null) + { + this.options.Handlers.Add(topic, messageReceivedHandler); + } + + return this; + } + + /// + /// Adds a subscription to the list of subscriptions to be sent in the subscribe call. This variation allows + /// the caller to specify a message handler for the subscription (aka per subscription callback). + /// + /// The TopicFilter for the subscription. + /// The message handler for the subscription. + /// SubscribeOptionsBuilder to continue the build process. + public SubscribeOptionsBuilder WithSubscription(TopicFilter topicFilter, EventHandler? handler = null) + { + this.options.TopicFilters.Add(topicFilter); + if (handler != null) + { + this.options.Handlers.Add(topicFilter.Topic, handler); + } + + return this; + } + + /// + /// Adds one or many subscriptions at once given the provided list of TopicFilters. + /// + /// The list of TopicFilters to be added to the subscriptions. + /// SubscribeOptionsBuilder to continue the build process. + public SubscribeOptionsBuilder WithSubscriptions(IEnumerable topicFilters) + { + foreach (var topicFilter in topicFilters) + { + this.options.TopicFilters.Add(topicFilter); + } + return this; } @@ -87,6 +127,11 @@ public SubscribeOptionsBuilder WithUserProperties(Dictionary use return this; } + /// + /// Builds the SubscribeOptions based on the previous calls. This + /// step will also run validation on the final SubscribeOptions. + /// + /// The constructed SubscribeOptions instance. public SubscribeOptions Build() { this.options.Validate(); diff --git a/Source/HiveMQtt/MQTT5/MalformedPacket.cs b/Source/HiveMQtt/MQTT5/MalformedPacket.cs index 557e1cb1..165d92fc 100644 --- a/Source/HiveMQtt/MQTT5/MalformedPacket.cs +++ b/Source/HiveMQtt/MQTT5/MalformedPacket.cs @@ -22,7 +22,9 @@ namespace HiveMQtt.MQTT5; /// internal class MalformedPacket : ControlPacket { +#pragma warning disable IDE0052 private readonly ReadOnlySequence packetData; +#pragma warning restore IDE0052 public MalformedPacket(ReadOnlySequence buffer) => this.packetData = buffer; diff --git a/Source/HiveMQtt/MQTT5/Types/Subscription.cs b/Source/HiveMQtt/MQTT5/Types/Subscription.cs index cfc5d469..8f6ff736 100644 --- a/Source/HiveMQtt/MQTT5/Types/Subscription.cs +++ b/Source/HiveMQtt/MQTT5/Types/Subscription.cs @@ -15,6 +15,7 @@ */ namespace HiveMQtt.MQTT5.Types; +using HiveMQtt.Client.Events; using HiveMQtt.MQTT5.ReasonCodes; public class Subscription @@ -26,6 +27,11 @@ public class Subscription /// public TopicFilter TopicFilter { get; } + /// + /// Gets or sets the message handler for the subscription. + /// + public EventHandler? MessageReceivedHandler { get; set; } + /// /// Gets or sets the reason code (result) for the subscribe call. /// diff --git a/Tests/HiveMQtt.Test/HiveMQClient/ClientOptionsBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/ClientOptionsBuilderTest.cs index 27001e96..faa35042 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/ClientOptionsBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/ClientOptionsBuilderTest.cs @@ -4,7 +4,6 @@ namespace HiveMQtt.Test.HiveMQClient; using Xunit; using HiveMQtt.Client; -using HiveMQtt.MQTT5.Types; public class ClientOptionsBuilderTest { @@ -42,7 +41,7 @@ public void Build_WithValidParameters_ReturnsValidOptions( .WithAuthenticationMethod(authMethod) .WithAuthenticationData(Encoding.UTF8.GetBytes(authData)) .WithUserProperties(new Dictionary { { "property1", "value1" }, { "property2", "value2" } }) - .WithLastWillAndTestament(new LastWillAndTestament("lwt/topic", QualityOfService.AtLeastOnceDelivery, "LWT message", true)) + .WithLastWillAndTestament(new LastWillAndTestament("lwt/topic", "LWT message")) .WithMaximumPacketSize(1024) .WithReceiveMaximum(100) .WithSessionExpiryInterval(3600) diff --git a/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs index c6772b13..7c8c7c5c 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs @@ -10,7 +10,6 @@ namespace HiveMQtt.Test.HiveMQClient; public class ConnectTest { - /// TODO: Add out of order tests: connect when connected, disconnect when not connected, etc. [Fact] public async Task Basic_Connect_And_Disconnect_Async() { @@ -30,64 +29,6 @@ public async Task Basic_Connect_And_Disconnect_Async() Assert.False(client.IsConnected()); } - [Fact(Skip = "Inconsistent test, sometimes fails. TODO: Fix this test.")] - public async Task DoubleConnectAsync() - { - var options = new HiveMQClientOptions - { - ClientId = "DoubleConnectTest", - }; - - var client = new HiveMQClient(options); - Assert.NotNull(client); - - var connectResult = await client.ConnectAsync().ConfigureAwait(false); - - Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); - Assert.True(client.IsConnected()); - - var taskCompletionSource = new TaskCompletionSource(); - client.OnDisconnectReceived += (sender, args) => - { - Assert.True(args.DisconnectPacket.DisconnectReasonCode == DisconnectReasonCode.SessionTakenOver); - taskCompletionSource.SetResult(true); - }; - - // Connect again with the same client - connectResult = await client.ConnectAsync().ConfigureAwait(false); - - Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); - Assert.True(client.IsConnected()); - - try - { - await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); - } - catch (Exception) - { - Assert.True(false, "OnDisconnectReceived was not called"); - } - } - - [Fact(Skip = "Inconsistent test, sometimes fails. TODO: Fix this test.")] - public async Task DoubleDisconnectAsync() - { - var client = new HiveMQClient(); - Assert.NotNull(client); - - var connectResult = await client.ConnectAsync().ConfigureAwait(false); - - Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); - Assert.True(client.IsConnected()); - - var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false); - Assert.False(client.IsConnected()); - - disconnectResult = await client.DisconnectAsync().ConfigureAwait(false); - Assert.False(disconnectResult); - Assert.False(client.IsConnected()); - } - [Fact] public async Task Test_Connect_Events_Async() { @@ -105,10 +46,7 @@ public async Task Test_Connect_Events_Async() // Set up TaskCompletionSource to wait for event handlers to finish var taskCompletionSource = new TaskCompletionSource(); - client.OnDisconnectSent += (sender, args) => - { - taskCompletionSource.SetResult(true); - }; + client.OnDisconnectSent += (sender, args) => taskCompletionSource.SetResult(true); // Connect and Disconnect var result = await client.ConnectAsync().ConfigureAwait(false); @@ -147,10 +85,7 @@ public async Task Test_AfterDisconnectEvent_Async() // Set up TaskCompletionSource to wait for event handlers to finish var taskCompletionSource = new TaskCompletionSource(); - client.OnDisconnectSent += (sender, args) => - { - taskCompletionSource.SetResult(true); - }; + client.OnDisconnectSent += (sender, args) => taskCompletionSource.SetResult(true); // Connect and Disconnect var result = await client.ConnectAsync().ConfigureAwait(false); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs index a913fa28..fd78c13e 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs @@ -14,7 +14,7 @@ public async Task Basic_Last_Will_Async() { var options = new HiveMQClientOptions { - LastWillAndTestament = new LastWillAndTestament("last/will", QualityOfService.AtLeastOnceDelivery, "last will message"), + LastWillAndTestament = new LastWillAndTestament("last/will", "last will message"), }; var client = new HiveMQClient(options); @@ -66,7 +66,7 @@ public async Task Last_Will_With_Properties_Async() // Setup & Connect the client with LWT var options = new HiveMQClientOptions { - LastWillAndTestament = new LastWillAndTestament("last/will", QualityOfService.AtLeastOnceDelivery, "last will message"), + LastWillAndTestament = new LastWillAndTestament("last/will", "last will message"), }; options.LastWillAndTestament.WillDelayInterval = 1; diff --git a/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs index da7cdfe6..c7f9254f 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs @@ -12,7 +12,6 @@ public class LastWillAndTestamentBuilderTest [Fact] public async Task Basic_Last_Will_Async() { - var lwt = new LastWillAndTestamentBuilder() .WithTopic("last/will") .WithPayload("last will message") diff --git a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs index b360c030..5a2088c4 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs @@ -2,7 +2,7 @@ namespace HiveMQtt.Test.HiveMQClient; using System.Threading.Tasks; using HiveMQtt.Client; -using HiveMQtt.Client.Options; +using HiveMQtt.Client.Events; using HiveMQtt.MQTT5.ReasonCodes; using Xunit; @@ -56,4 +56,102 @@ public async Task MultiSubscribeExtendedOptionsAsync() var disconnectResult = await subClient.DisconnectAsync().ConfigureAwait(false); Assert.True(disconnectResult); } + + [Fact] + public async Task PerSubscriptionHandlerAsync() + { + var subClient = new HiveMQClient(); + var connectResult = await subClient.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + var tcs = new TaskCompletionSource(); + + var subscribeOptions = new SubscribeOptionsBuilder() + .WithSubscription("tests/PerSubscriptionHandler", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) => + { + Assert.Equal("tests/PerSubscriptionHandler", args.PublishMessage.Topic); + Assert.Equal("test", args.PublishMessage.PayloadAsString); + tcs.SetResult(true); + }) + .Build(); + + var subResult = await subClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false); + + Assert.NotEmpty(subResult.Subscriptions); + Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode); + + var pubClient = new HiveMQClient(); + var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false); + Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success); + + var pubResult = await pubClient.PublishAsync("tests/PerSubscriptionHandler", "test").ConfigureAwait(false); + + var handlerResult = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false); + Assert.True(handlerResult); + + var disconnectResult = await subClient.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + } + + [Fact] + public async Task PerSubscriptionHandlerAndGlobalHandlerAsync() + { + var subClient = new HiveMQClient(); + var connectResult = await subClient.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + var tcs = new TaskCompletionSource(); + var gtcs = new TaskCompletionSource(); + + // Setup a per-subscription handler + var subscribeOptions = new SubscribeOptionsBuilder() + .WithSubscription("tests/PerSubscriptionHandler", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) => + { + Assert.Equal("tests/PerSubscriptionHandler", args.PublishMessage.Topic); + Assert.Equal("test", args.PublishMessage.PayloadAsString); + tcs.SetResult(true); + }) + .Build(); + + // Setup a global message handler + void GlobalMessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) + { + Assert.Equal("tests/PerSubscriptionHandler", eventArgs.PublishMessage.Topic); + Assert.Equal("test", eventArgs.PublishMessage.PayloadAsString); + gtcs.SetResult(true); + } + + subClient.OnMessageReceived += GlobalMessageHandler; + + // Both handlers should be called + var subResult = await subClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false); + + Assert.NotEmpty(subResult.Subscriptions); + Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode); + + var pubClient = new HiveMQClient(); + var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false); + Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success); + + var pubResult = await pubClient.PublishAsync("tests/PerSubscriptionHandler", "test").ConfigureAwait(false); + + // Wait on both TaskCompletionSource objects - both handlers should get called + var timeout = TimeSpan.FromSeconds(10); + var delayTask = Task.Delay(timeout); + var completedTask = await Task.WhenAny(Task.WhenAll(tcs.Task, gtcs.Task), delayTask).ConfigureAwait(false); + + if (completedTask == delayTask) + { + throw new TimeoutException("The operation has timed out."); + } + + // If we reach here, it means both tasks have completed before the timeout + var handlerResult = await tcs.Task.ConfigureAwait(false); + var globalHandlerResult = await gtcs.Task.ConfigureAwait(false); + Assert.True(handlerResult); + Assert.True(globalHandlerResult); + + var disconnectResult = await subClient.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + } } diff --git a/Tests/HiveMQtt.Test/HiveMQClient/TLSTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/TLSTest.cs index 2cd88fcc..1e1f8a56 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/TLSTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/TLSTest.cs @@ -2,48 +2,12 @@ namespace HiveMQtt.Test.HiveMQClient; using System.Threading.Tasks; using HiveMQtt.Client; -using HiveMQtt.Client.Exceptions; using HiveMQtt.Client.Options; using HiveMQtt.MQTT5.ReasonCodes; using Xunit; public class TLSTest { - [Fact(Skip = "Manual test for now. Requires a broker with self-signed certificate.")] - public async Task Invalid_TLS_Cert_Throws_Async() - { - var options = new HiveMQClientOptions - { - Host = "localhost", - Port = 8883, - AllowInvalidBrokerCertificates = false, - }; - - var client = new HiveMQClient(options); - - var exception = await Assert.ThrowsAsync(client.ConnectAsync).ConfigureAwait(false); - Assert.NotNull(exception); - } - - [Fact(Skip = "Manual test for now. Requires a broker with self-signed certificate.")] - public async Task Allow_Invalid_TLS_Cert_Async() - { - var options = new HiveMQClientOptions - { - Host = "localhost", - Port = 8883, - AllowInvalidBrokerCertificates = true, - }; - - var client = new HiveMQClient(options); - var connectResult = await client.ConnectAsync().ConfigureAwait(false); - - Assert.Equal(ConnAckReasonCode.Success, connectResult.ReasonCode); - - var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false); - Assert.True(disconnectResult); - } - [Fact] public async Task Public_Broker_TLS_Async() {