diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs
index 31d18ad8..02332c47 100644
--- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs
+++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs
@@ -141,7 +141,7 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet)
// Per Subscription Event Handler
foreach (var subscription in this.Subscriptions)
{
- if (subscription.TopicFilter.Topic == packet.Message.Topic)
+ if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic))
{
subscription.MessageReceivedHandler?.Invoke(this, eventArgs);
}
diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs
index c543dc21..18993be5 100644
--- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs
+++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs
@@ -15,6 +15,8 @@
*/
namespace HiveMQtt.Client;
+using System;
+using System.Text.RegularExpressions;
using HiveMQtt.MQTT5.Types;
///
@@ -64,6 +66,71 @@ internal bool SubscriptionExists(Subscription subscription)
return null;
}
+ ///
+ /// This method is used to determine if a topic filter matches a topic.
+ ///
+ /// It implements the MQTT 5.0 specification definitions for single-level
+ /// and multi-level wildcard characters (and related rules).
+ ///
+ ///
+ /// The topic filter.
+ /// The topic to match.
+ /// A boolean indicating whether the topic filter matches the topic.
+ /// Thrown when the topic filter is invalid.
+ public static bool MatchTopic(string pattern, string candidate)
+ {
+ if (pattern == candidate)
+ {
+ return true;
+ }
+
+ if (pattern == "#")
+ {
+ // A subscription to “#” will not receive any messages published to a topic beginning with a $
+ if (candidate.StartsWith("$", System.StringComparison.CurrentCulture))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ if (pattern == "+")
+ {
+ // A subscription to “+” will not receive any messages published to a topic containing a $
+ if (candidate.StartsWith("$", System.StringComparison.CurrentCulture) ||
+ candidate.StartsWith("/", System.StringComparison.CurrentCulture))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ // If pattern contains a multi-level wildcard character, it must be the last character in the pattern
+ // and it must be preceded by a topic level separator.
+ var mlwcValidityRegex = new Regex(@"(?
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.
///
diff --git a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs
index 5a2088c4..d8e33057 100644
--- a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs
+++ b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs
@@ -1,5 +1,6 @@
namespace HiveMQtt.Test.HiveMQClient;
+using System.Text.RegularExpressions;
using System.Threading.Tasks;
using HiveMQtt.Client;
using HiveMQtt.Client.Events;
@@ -154,4 +155,108 @@ void GlobalMessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
var disconnectResult = await subClient.DisconnectAsync().ConfigureAwait(false);
Assert.True(disconnectResult);
}
+
+ [Fact]
+ public async Task PerSubHandlerWithSingleLevelWildcardAsync()
+ {
+ // Create a subscribeClient that subscribes to a topic with a single-level wildcard
+ var subscribeClient = new HiveMQClient();
+ var connectResult = await subscribeClient.ConnectAsync().ConfigureAwait(false);
+ Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
+
+ var tcs = new TaskCompletionSource();
+ var messageCount = 0;
+
+ var subscribeOptions = new SubscribeOptionsBuilder()
+ .WithSubscription("tests/PerSubHandlerWithSingleLevelWildcard/+/msg", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) =>
+ {
+ messageCount++;
+ var pattern = @"^tests/PerSubHandlerWithSingleLevelWildcard/[0-2]/msg$";
+ var regex = new Regex(pattern);
+ Assert.Matches(regex, args.PublishMessage.Topic);
+
+ Assert.Equal("test", args.PublishMessage.PayloadAsString);
+
+ if (messageCount == 3)
+ {
+ tcs.SetResult(true);
+ }
+ })
+ .Build();
+
+ var subResult = await subscribeClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false);
+
+ Assert.NotEmpty(subResult.Subscriptions);
+ Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode);
+
+ var pubClient = new HiveMQClient();
+ var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false);
+ Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success);
+
+ // Publish 3 messages that will match the single-level wildcard
+ for (var i = 0; i < 3; i++)
+ {
+ await pubClient.PublishAsync($"tests/PerSubHandlerWithSingleLevelWildcard/{i}/msg", "test").ConfigureAwait(false);
+ }
+
+ // Wait for the 3 messages to be received by the per-subscription handler
+ var handlerResult = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
+ Assert.True(handlerResult);
+
+ var disconnectResult = await subscribeClient.DisconnectAsync().ConfigureAwait(false);
+ Assert.True(disconnectResult);
+ }
+
+ [Fact]
+ public async Task PerSubHandlerWithMultiLevelWildcardAsync()
+ {
+ // Create a subscribeClient that subscribes to a topic with a single-level wildcard
+ var subscribeClient = new HiveMQClient();
+ var connectResult = await subscribeClient.ConnectAsync().ConfigureAwait(false);
+ Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
+
+ var tcs = new TaskCompletionSource();
+ var messageCount = 0;
+
+ var subscribeOptions = new SubscribeOptionsBuilder()
+ .WithSubscription(
+ "tests/PerSubHandlerWithMultiLevelWildcard/#",
+ MQTT5.Types.QualityOfService.AtLeastOnceDelivery,
+ messageReceivedHandler: (sender, args) =>
+ {
+ messageCount++;
+ var pattern = @"\Atests/PerSubHandlerWithMultiLevelWildcard/(/?|.+)\z";
+ var regex = new Regex(pattern);
+ Assert.Matches(regex, args.PublishMessage.Topic);
+
+ Assert.Equal("test", args.PublishMessage.PayloadAsString);
+
+ if (messageCount == 3)
+ {
+ tcs.SetResult(true);
+ }
+ })
+ .Build();
+
+ var subResult = await subscribeClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false);
+
+ Assert.NotEmpty(subResult.Subscriptions);
+ Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode);
+
+ var pubClient = new HiveMQClient();
+ var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false);
+ Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success);
+
+ // Publish 3 messages that will match the multi-level wildcard
+ await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1", "test").ConfigureAwait(false);
+ await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1/2", "test").ConfigureAwait(false);
+ await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1/2/3/4/5", "test").ConfigureAwait(false);
+
+ // Wait for the 3 messages to be received by the per-subscription handler
+ var handlerResult = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
+ Assert.True(handlerResult);
+
+ var disconnectResult = await subscribeClient.DisconnectAsync().ConfigureAwait(false);
+ Assert.True(disconnectResult);
+ }
}
diff --git a/Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs
new file mode 100644
index 00000000..348dc23d
--- /dev/null
+++ b/Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs
@@ -0,0 +1,122 @@
+namespace HiveMQtt.Test.HiveMQClient;
+
+using Xunit;
+using HiveMQtt.Client;
+
+public class UtilTest
+{
+ [Fact]
+ public void SingleLevelWildcardMatch()
+ {
+ // The plus sign (‘+’ U+002B) is a wildcard character that matches only one topic level.
+ //
+ // The single-level wildcard can be used at any level in the Topic Filter, including first and last levels. Where it is used, it MUST occupy an entire level of the filter [MQTT-4.7.1-2]. It can be used at more than one level in the Topic Filter and can be used in conjunction with the multi-level wildcard.
+ //
+ // For example, “sport/tennis/+” matches “sport/tennis/player1” and “sport/tennis/player2”, but not “sport/tennis/player1/ranking”. Also, because the single-level wildcard matches only a single level, “sport/+” does not match “sport” but it does match “sport/”.
+ //
+ // · “+” is valid
+ // · “+/tennis/#” is valid
+ // · “sport+” is not valid
+ // · “sport/+/player1” is valid
+ // · “/finance” matches “+/+” and “/+”, but not “+”
+ bool result;
+
+ // “sport/tennis/+” matches “sport/tennis/player1”
+ result = HiveMQClient.MatchTopic("sport/tennis/+", "sport/tennis/player1");
+ Assert.True(result);
+
+ // “sport/tennis/+” doesn't match sport/tennis/player1/ranking”
+ result = HiveMQClient.MatchTopic("sport/tennis/+", "sport/tennis/player1/ranking");
+ Assert.False(result);
+
+ // “sport/+” does not match “sport”
+ result = HiveMQClient.MatchTopic("sport/+", "sport");
+ Assert.False(result);
+
+ // “sport/+” does match “sport/”
+ result = HiveMQClient.MatchTopic("sport/+", "sport/");
+ Assert.True(result);
+
+ // "sport/+/player1" matches "sport/tennis/player1"
+ result = HiveMQClient.MatchTopic("sport/+/player1", "sport/tennis/player1");
+ Assert.True(result);
+
+ // "/finance" matches “/+”
+ result = HiveMQClient.MatchTopic("/+", "/finance");
+ Assert.True(result);
+
+ // "/finance" doesn't match “+”
+ result = HiveMQClient.MatchTopic("+", "/finance");
+ Assert.False(result);
+
+ // A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients”
+ result = HiveMQClient.MatchTopic("+/monitor/Clients", "$SYS/monitor/Clients");
+ Assert.False(result);
+
+ // A subscription to “$SYS/monitor/+” will receive messages published to “$SYS/monitor/Clients”
+ result = HiveMQClient.MatchTopic("$SYS/monitor/+", "$SYS/monitor/Clients");
+ Assert.True(result);
+
+ // https://github.com/hivemq/hivemq-mqtt-client-dotnet/issues/126
+ result = HiveMQClient.MatchTopic("hivemqtt/sendmessageonloop/+/test", "hivemqtt/sendmessageonloop/2938472/test");
+ Assert.True(result);
+ }
+
+ [Fact]
+ public void MultiLevelWildcardMatch()
+ {
+ // From: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901244
+ // # The number sign (‘#’ U+0023) is a wildcard character that matches any number of levels within a topic. The multi-level wildcard represents the parent and any number of child levels. The multi-level wildcard character MUST be specified either on its own or following a topic level separator. In either case it MUST be the last character specified in the Topic Filter [MQTT-4.7.1-1].
+ //
+ // For example, if a Client subscribes to “sport/tennis/player1/#”, it would receive messages published using these Topic Names:
+ // · “sport/tennis/player1”
+ // · “sport/tennis/player1/ranking
+ // · “sport/tennis/player1/score/wimbledon”
+ //
+ // · “sport/#” also matches the singular “sport”, since # includes the parent level.
+ // · “#” is valid and will receive every Application Message
+ // · “sport/tennis/#” is valid
+ // · “sport/tennis#” is not valid
+ // · “sport/tennis/#/ranking” is not valid
+ bool result;
+
+ // “sport/tennis/#” matches “sport/tennis/player1”
+ result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1");
+ Assert.True(result);
+
+ // “sport/tennis/#” matches “sport/tennis/player1/ranking”
+ result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1/ranking");
+ Assert.True(result);
+
+ // “sport/tennis/+” matches “sport/tennis/player1/ranking”
+ result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1/score/wimbledon");
+ Assert.True(result);
+
+ // “sport/#” also matches the singular “sport”, since # includes the parent level.
+ result = HiveMQClient.MatchTopic("sport/#", "sport");
+ Assert.True(result);
+
+ // “#” is valid and will receive every Application Message
+ result = HiveMQClient.MatchTopic("#", "any/and/all/topics");
+ Assert.True(result);
+
+ // Invalid multi-level wildcards
+ Assert.Throws(() => HiveMQClient.MatchTopic("invalid/mlwc#", "sport/tennis/player1/ranking"));
+
+ // “sport/tennis/#/ranking” is not valid
+ Assert.Throws(() => HiveMQClient.MatchTopic("sport/tennis/#/ranking", "sport/tennis/player1/ranking"));
+ Assert.Throws(() => HiveMQClient.MatchTopic("/#/", "sport/tennis/player1/ranking"));
+
+ // “sport/tennis#” is not valid
+ Assert.Throws(() => HiveMQClient.MatchTopic("sports/tennis#", "sport/tennis/player1/ranking"));
+
+ // A subscription to “#” will not receive any messages published to a topic beginning with a $
+ result = HiveMQClient.MatchTopic("#", "$SYS/broker/clients/total");
+ Assert.False(result);
+
+ // A subscription to “$SYS/#” will receive messages published to topics beginning with “$SYS/”
+ result = HiveMQClient.MatchTopic("$SYS/#", "$SYS/broker/clients/total");
+ Assert.True(result);
+
+ }
+}