From 597d1a85704c9dd4c20983bec942870976b70f3e Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 22 Feb 2024 18:28:30 +0100 Subject: [PATCH] Better Topic Matching (#128) --- Source/HiveMQtt/Client/HiveMQClientEvents.cs | 2 +- Source/HiveMQtt/Client/HiveMQClientUtil.cs | 67 ++++++++++ .../HiveMQClient/SubscribeBuilderTest.cs | 105 +++++++++++++++ Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs | 122 ++++++++++++++++++ 4 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs index 31d18ad8..02332c47 100644 --- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs +++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs @@ -141,7 +141,7 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) // Per Subscription Event Handler foreach (var subscription in this.Subscriptions) { - if (subscription.TopicFilter.Topic == packet.Message.Topic) + if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic)) { subscription.MessageReceivedHandler?.Invoke(this, eventArgs); } diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs index c543dc21..18993be5 100644 --- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs +++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs @@ -15,6 +15,8 @@ */ namespace HiveMQtt.Client; +using System; +using System.Text.RegularExpressions; using HiveMQtt.MQTT5.Types; /// @@ -64,6 +66,71 @@ internal bool SubscriptionExists(Subscription subscription) return null; } + /// + /// This method is used to determine if a topic filter matches a topic. + /// + /// It implements the MQTT 5.0 specification definitions for single-level + /// and multi-level wildcard characters (and related rules). + /// + /// + /// The topic filter. + /// The topic to match. + /// A boolean indicating whether the topic filter matches the topic. + /// Thrown when the topic filter is invalid. + public static bool MatchTopic(string pattern, string candidate) + { + if (pattern == candidate) + { + return true; + } + + if (pattern == "#") + { + // A subscription to “#” will not receive any messages published to a topic beginning with a $ + if (candidate.StartsWith("$", System.StringComparison.CurrentCulture)) + { + return false; + } + else + { + return true; + } + } + + if (pattern == "+") + { + // A subscription to “+” will not receive any messages published to a topic containing a $ + if (candidate.StartsWith("$", System.StringComparison.CurrentCulture) || + candidate.StartsWith("/", System.StringComparison.CurrentCulture)) + { + return false; + } + else + { + return true; + } + } + + // If pattern contains a multi-level wildcard character, it must be the last character in the pattern + // and it must be preceded by a topic level separator. + var mlwcValidityRegex = new Regex(@"(? /// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0. /// diff --git a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs index 5a2088c4..d8e33057 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs @@ -1,5 +1,6 @@ namespace HiveMQtt.Test.HiveMQClient; +using System.Text.RegularExpressions; using System.Threading.Tasks; using HiveMQtt.Client; using HiveMQtt.Client.Events; @@ -154,4 +155,108 @@ void GlobalMessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) var disconnectResult = await subClient.DisconnectAsync().ConfigureAwait(false); Assert.True(disconnectResult); } + + [Fact] + public async Task PerSubHandlerWithSingleLevelWildcardAsync() + { + // Create a subscribeClient that subscribes to a topic with a single-level wildcard + var subscribeClient = new HiveMQClient(); + var connectResult = await subscribeClient.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + var tcs = new TaskCompletionSource(); + var messageCount = 0; + + var subscribeOptions = new SubscribeOptionsBuilder() + .WithSubscription("tests/PerSubHandlerWithSingleLevelWildcard/+/msg", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) => + { + messageCount++; + var pattern = @"^tests/PerSubHandlerWithSingleLevelWildcard/[0-2]/msg$"; + var regex = new Regex(pattern); + Assert.Matches(regex, args.PublishMessage.Topic); + + Assert.Equal("test", args.PublishMessage.PayloadAsString); + + if (messageCount == 3) + { + tcs.SetResult(true); + } + }) + .Build(); + + var subResult = await subscribeClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false); + + Assert.NotEmpty(subResult.Subscriptions); + Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode); + + var pubClient = new HiveMQClient(); + var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false); + Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success); + + // Publish 3 messages that will match the single-level wildcard + for (var i = 0; i < 3; i++) + { + await pubClient.PublishAsync($"tests/PerSubHandlerWithSingleLevelWildcard/{i}/msg", "test").ConfigureAwait(false); + } + + // Wait for the 3 messages to be received by the per-subscription handler + var handlerResult = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false); + Assert.True(handlerResult); + + var disconnectResult = await subscribeClient.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + } + + [Fact] + public async Task PerSubHandlerWithMultiLevelWildcardAsync() + { + // Create a subscribeClient that subscribes to a topic with a single-level wildcard + var subscribeClient = new HiveMQClient(); + var connectResult = await subscribeClient.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + var tcs = new TaskCompletionSource(); + var messageCount = 0; + + var subscribeOptions = new SubscribeOptionsBuilder() + .WithSubscription( + "tests/PerSubHandlerWithMultiLevelWildcard/#", + MQTT5.Types.QualityOfService.AtLeastOnceDelivery, + messageReceivedHandler: (sender, args) => + { + messageCount++; + var pattern = @"\Atests/PerSubHandlerWithMultiLevelWildcard/(/?|.+)\z"; + var regex = new Regex(pattern); + Assert.Matches(regex, args.PublishMessage.Topic); + + Assert.Equal("test", args.PublishMessage.PayloadAsString); + + if (messageCount == 3) + { + tcs.SetResult(true); + } + }) + .Build(); + + var subResult = await subscribeClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false); + + Assert.NotEmpty(subResult.Subscriptions); + Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode); + + var pubClient = new HiveMQClient(); + var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false); + Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success); + + // Publish 3 messages that will match the multi-level wildcard + await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1", "test").ConfigureAwait(false); + await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1/2", "test").ConfigureAwait(false); + await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1/2/3/4/5", "test").ConfigureAwait(false); + + // Wait for the 3 messages to be received by the per-subscription handler + var handlerResult = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false); + Assert.True(handlerResult); + + var disconnectResult = await subscribeClient.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + } } diff --git a/Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs new file mode 100644 index 00000000..348dc23d --- /dev/null +++ b/Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs @@ -0,0 +1,122 @@ +namespace HiveMQtt.Test.HiveMQClient; + +using Xunit; +using HiveMQtt.Client; + +public class UtilTest +{ + [Fact] + public void SingleLevelWildcardMatch() + { + // The plus sign (‘+’ U+002B) is a wildcard character that matches only one topic level. + // + // The single-level wildcard can be used at any level in the Topic Filter, including first and last levels. Where it is used, it MUST occupy an entire level of the filter [MQTT-4.7.1-2]. It can be used at more than one level in the Topic Filter and can be used in conjunction with the multi-level wildcard. + // + // For example, “sport/tennis/+” matches “sport/tennis/player1” and “sport/tennis/player2”, but not “sport/tennis/player1/ranking”. Also, because the single-level wildcard matches only a single level, “sport/+” does not match “sport” but it does match “sport/”. + // + // · “+” is valid + // · “+/tennis/#” is valid + // · “sport+” is not valid + // · “sport/+/player1” is valid + // · “/finance” matches “+/+” and “/+”, but not “+” + bool result; + + // “sport/tennis/+” matches “sport/tennis/player1” + result = HiveMQClient.MatchTopic("sport/tennis/+", "sport/tennis/player1"); + Assert.True(result); + + // “sport/tennis/+” doesn't match sport/tennis/player1/ranking” + result = HiveMQClient.MatchTopic("sport/tennis/+", "sport/tennis/player1/ranking"); + Assert.False(result); + + // “sport/+” does not match “sport” + result = HiveMQClient.MatchTopic("sport/+", "sport"); + Assert.False(result); + + // “sport/+” does match “sport/” + result = HiveMQClient.MatchTopic("sport/+", "sport/"); + Assert.True(result); + + // "sport/+/player1" matches "sport/tennis/player1" + result = HiveMQClient.MatchTopic("sport/+/player1", "sport/tennis/player1"); + Assert.True(result); + + // "/finance" matches “/+” + result = HiveMQClient.MatchTopic("/+", "/finance"); + Assert.True(result); + + // "/finance" doesn't match “+” + result = HiveMQClient.MatchTopic("+", "/finance"); + Assert.False(result); + + // A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients” + result = HiveMQClient.MatchTopic("+/monitor/Clients", "$SYS/monitor/Clients"); + Assert.False(result); + + // A subscription to “$SYS/monitor/+” will receive messages published to “$SYS/monitor/Clients” + result = HiveMQClient.MatchTopic("$SYS/monitor/+", "$SYS/monitor/Clients"); + Assert.True(result); + + // https://github.com/hivemq/hivemq-mqtt-client-dotnet/issues/126 + result = HiveMQClient.MatchTopic("hivemqtt/sendmessageonloop/+/test", "hivemqtt/sendmessageonloop/2938472/test"); + Assert.True(result); + } + + [Fact] + public void MultiLevelWildcardMatch() + { + // From: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901244 + // # The number sign (‘#’ U+0023) is a wildcard character that matches any number of levels within a topic. The multi-level wildcard represents the parent and any number of child levels. The multi-level wildcard character MUST be specified either on its own or following a topic level separator. In either case it MUST be the last character specified in the Topic Filter [MQTT-4.7.1-1]. + // + // For example, if a Client subscribes to “sport/tennis/player1/#”, it would receive messages published using these Topic Names: + // · “sport/tennis/player1” + // · “sport/tennis/player1/ranking + // · “sport/tennis/player1/score/wimbledon” + // + // · “sport/#” also matches the singular “sport”, since # includes the parent level. + // · “#” is valid and will receive every Application Message + // · “sport/tennis/#” is valid + // · “sport/tennis#” is not valid + // · “sport/tennis/#/ranking” is not valid + bool result; + + // “sport/tennis/#” matches “sport/tennis/player1” + result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1"); + Assert.True(result); + + // “sport/tennis/#” matches “sport/tennis/player1/ranking” + result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1/ranking"); + Assert.True(result); + + // “sport/tennis/+” matches “sport/tennis/player1/ranking” + result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1/score/wimbledon"); + Assert.True(result); + + // “sport/#” also matches the singular “sport”, since # includes the parent level. + result = HiveMQClient.MatchTopic("sport/#", "sport"); + Assert.True(result); + + // “#” is valid and will receive every Application Message + result = HiveMQClient.MatchTopic("#", "any/and/all/topics"); + Assert.True(result); + + // Invalid multi-level wildcards + Assert.Throws(() => HiveMQClient.MatchTopic("invalid/mlwc#", "sport/tennis/player1/ranking")); + + // “sport/tennis/#/ranking” is not valid + Assert.Throws(() => HiveMQClient.MatchTopic("sport/tennis/#/ranking", "sport/tennis/player1/ranking")); + Assert.Throws(() => HiveMQClient.MatchTopic("/#/", "sport/tennis/player1/ranking")); + + // “sport/tennis#” is not valid + Assert.Throws(() => HiveMQClient.MatchTopic("sports/tennis#", "sport/tennis/player1/ranking")); + + // A subscription to “#” will not receive any messages published to a topic beginning with a $ + result = HiveMQClient.MatchTopic("#", "$SYS/broker/clients/total"); + Assert.False(result); + + // A subscription to “$SYS/#” will receive messages published to topics beginning with “$SYS/” + result = HiveMQClient.MatchTopic("$SYS/#", "$SYS/broker/clients/total"); + Assert.True(result); + + } +}