Skip to content

Commit

Permalink
Better Topic Matching (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Feb 22, 2024
1 parent 1d27b33 commit 597d1a8
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Source/HiveMQtt/Client/HiveMQClientEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet)
// Per Subscription Event Handler
foreach (var subscription in this.Subscriptions)
{
if (subscription.TopicFilter.Topic == packet.Message.Topic)
if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic))
{
subscription.MessageReceivedHandler?.Invoke(this, eventArgs);
}
Expand Down
67 changes: 67 additions & 0 deletions Source/HiveMQtt/Client/HiveMQClientUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
namespace HiveMQtt.Client;

using System;
using System.Text.RegularExpressions;
using HiveMQtt.MQTT5.Types;

/// <inheritdoc />
Expand Down Expand Up @@ -64,6 +66,71 @@ internal bool SubscriptionExists(Subscription subscription)
return null;
}

/// <summary>
/// This method is used to determine if a topic filter matches a topic.
///
/// It implements the MQTT 5.0 specification definitions for single-level
/// and multi-level wildcard characters (and related rules).
///
/// </summary>
/// <param name="pattern">The topic filter.</param>
/// <param name="candidate">The topic to match.</param>
/// <returns>A boolean indicating whether the topic filter matches the topic.</returns>
/// <exception cref="ArgumentException">Thrown when the topic filter is invalid.</exception>
public static bool MatchTopic(string pattern, string candidate)
{
if (pattern == candidate)
{
return true;
}

if (pattern == "#")
{
// A subscription to “#” will not receive any messages published to a topic beginning with a $
if (candidate.StartsWith("$", System.StringComparison.CurrentCulture))
{
return false;
}
else
{
return true;
}
}

if (pattern == "+")
{
// A subscription to “+” will not receive any messages published to a topic containing a $
if (candidate.StartsWith("$", System.StringComparison.CurrentCulture) ||
candidate.StartsWith("/", System.StringComparison.CurrentCulture))
{
return false;
}
else
{
return true;
}
}

// If pattern contains a multi-level wildcard character, it must be the last character in the pattern
// and it must be preceded by a topic level separator.
var mlwcValidityRegex = new Regex(@"(?<!/)#");

if (pattern.Contains("/#/") | mlwcValidityRegex.IsMatch(pattern))
{
throw new ArgumentException(
"The multi-level wildcard character must be specified either on its own or following a topic level separator. " +
"In either case it must be the last character specified in the Topic Filter.");
}

// ^sport\/tennis\/player1(\/?|.+)$
var regexp = "\\A" + Regex.Escape(pattern).Replace(@"\+", @"?[/][^/]*") + "\\z";

regexp = regexp.Replace(@"/\#", @"(/?|.+)");
regexp = regexp.EndsWith("\\z", System.StringComparison.CurrentCulture) ? regexp : regexp + "\\z";

return Regex.IsMatch(candidate, regexp);
}

/// <summary>
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.
/// </summary>
Expand Down
105 changes: 105 additions & 0 deletions Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace HiveMQtt.Test.HiveMQClient;

using System.Text.RegularExpressions;
using System.Threading.Tasks;
using HiveMQtt.Client;
using HiveMQtt.Client.Events;
Expand Down Expand Up @@ -154,4 +155,108 @@ void GlobalMessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
var disconnectResult = await subClient.DisconnectAsync().ConfigureAwait(false);
Assert.True(disconnectResult);
}

[Fact]
public async Task PerSubHandlerWithSingleLevelWildcardAsync()
{
// Create a subscribeClient that subscribes to a topic with a single-level wildcard
var subscribeClient = new HiveMQClient();
var connectResult = await subscribeClient.ConnectAsync().ConfigureAwait(false);
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);

var tcs = new TaskCompletionSource<bool>();
var messageCount = 0;

var subscribeOptions = new SubscribeOptionsBuilder()
.WithSubscription("tests/PerSubHandlerWithSingleLevelWildcard/+/msg", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) =>
{
messageCount++;
var pattern = @"^tests/PerSubHandlerWithSingleLevelWildcard/[0-2]/msg$";
var regex = new Regex(pattern);
Assert.Matches(regex, args.PublishMessage.Topic);

Assert.Equal("test", args.PublishMessage.PayloadAsString);

if (messageCount == 3)
{
tcs.SetResult(true);
}
})
.Build();

var subResult = await subscribeClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false);

Assert.NotEmpty(subResult.Subscriptions);
Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode);

var pubClient = new HiveMQClient();
var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false);
Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success);

// Publish 3 messages that will match the single-level wildcard
for (var i = 0; i < 3; i++)
{
await pubClient.PublishAsync($"tests/PerSubHandlerWithSingleLevelWildcard/{i}/msg", "test").ConfigureAwait(false);
}

// Wait for the 3 messages to be received by the per-subscription handler
var handlerResult = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
Assert.True(handlerResult);

var disconnectResult = await subscribeClient.DisconnectAsync().ConfigureAwait(false);
Assert.True(disconnectResult);
}

[Fact]
public async Task PerSubHandlerWithMultiLevelWildcardAsync()
{
// Create a subscribeClient that subscribes to a topic with a single-level wildcard
var subscribeClient = new HiveMQClient();
var connectResult = await subscribeClient.ConnectAsync().ConfigureAwait(false);
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);

var tcs = new TaskCompletionSource<bool>();
var messageCount = 0;

var subscribeOptions = new SubscribeOptionsBuilder()
.WithSubscription(
"tests/PerSubHandlerWithMultiLevelWildcard/#",
MQTT5.Types.QualityOfService.AtLeastOnceDelivery,
messageReceivedHandler: (sender, args) =>
{
messageCount++;
var pattern = @"\Atests/PerSubHandlerWithMultiLevelWildcard/(/?|.+)\z";
var regex = new Regex(pattern);
Assert.Matches(regex, args.PublishMessage.Topic);

Assert.Equal("test", args.PublishMessage.PayloadAsString);

if (messageCount == 3)
{
tcs.SetResult(true);
}
})
.Build();

var subResult = await subscribeClient.SubscribeAsync(subscribeOptions).ConfigureAwait(false);

Assert.NotEmpty(subResult.Subscriptions);
Assert.Equal(SubAckReasonCode.GrantedQoS1, subResult.Subscriptions[0].SubscribeReasonCode);

var pubClient = new HiveMQClient();
var pubConnectResult = await pubClient.ConnectAsync().ConfigureAwait(false);
Assert.True(pubConnectResult.ReasonCode == ConnAckReasonCode.Success);

// Publish 3 messages that will match the multi-level wildcard
await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1", "test").ConfigureAwait(false);
await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1/2", "test").ConfigureAwait(false);
await pubClient.PublishAsync($"tests/PerSubHandlerWithMultiLevelWildcard/1/2/3/4/5", "test").ConfigureAwait(false);

// Wait for the 3 messages to be received by the per-subscription handler
var handlerResult = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
Assert.True(handlerResult);

var disconnectResult = await subscribeClient.DisconnectAsync().ConfigureAwait(false);
Assert.True(disconnectResult);
}
}
122 changes: 122 additions & 0 deletions Tests/HiveMQtt.Test/HiveMQClient/UtilTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
namespace HiveMQtt.Test.HiveMQClient;

using Xunit;
using HiveMQtt.Client;

public class UtilTest
{
[Fact]
public void SingleLevelWildcardMatch()
{
// The plus sign (‘+’ U+002B) is a wildcard character that matches only one topic level.
//
// The single-level wildcard can be used at any level in the Topic Filter, including first and last levels. Where it is used, it MUST occupy an entire level of the filter [MQTT-4.7.1-2]. It can be used at more than one level in the Topic Filter and can be used in conjunction with the multi-level wildcard.
//
// For example, “sport/tennis/+” matches “sport/tennis/player1” and “sport/tennis/player2”, but not “sport/tennis/player1/ranking”. Also, because the single-level wildcard matches only a single level, “sport/+” does not match “sport” but it does match “sport/”.
//
// · “+” is valid
// · “+/tennis/#” is valid
// · “sport+” is not valid
// · “sport/+/player1” is valid
// · “/finance” matches “+/+” and “/+”, but not “+”
bool result;

// “sport/tennis/+” matches “sport/tennis/player1”
result = HiveMQClient.MatchTopic("sport/tennis/+", "sport/tennis/player1");
Assert.True(result);

// “sport/tennis/+” doesn't match sport/tennis/player1/ranking”
result = HiveMQClient.MatchTopic("sport/tennis/+", "sport/tennis/player1/ranking");
Assert.False(result);

// “sport/+” does not match “sport”
result = HiveMQClient.MatchTopic("sport/+", "sport");
Assert.False(result);

// “sport/+” does match “sport/”
result = HiveMQClient.MatchTopic("sport/+", "sport/");
Assert.True(result);

// "sport/+/player1" matches "sport/tennis/player1"
result = HiveMQClient.MatchTopic("sport/+/player1", "sport/tennis/player1");
Assert.True(result);

// "/finance" matches “/+”
result = HiveMQClient.MatchTopic("/+", "/finance");
Assert.True(result);

// "/finance" doesn't match “+”
result = HiveMQClient.MatchTopic("+", "/finance");
Assert.False(result);

// A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients”
result = HiveMQClient.MatchTopic("+/monitor/Clients", "$SYS/monitor/Clients");
Assert.False(result);

// A subscription to “$SYS/monitor/+” will receive messages published to “$SYS/monitor/Clients”
result = HiveMQClient.MatchTopic("$SYS/monitor/+", "$SYS/monitor/Clients");
Assert.True(result);

// https://github.com/hivemq/hivemq-mqtt-client-dotnet/issues/126
result = HiveMQClient.MatchTopic("hivemqtt/sendmessageonloop/+/test", "hivemqtt/sendmessageonloop/2938472/test");
Assert.True(result);
}

[Fact]
public void MultiLevelWildcardMatch()
{
// From: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901244
// # The number sign (‘#’ U+0023) is a wildcard character that matches any number of levels within a topic. The multi-level wildcard represents the parent and any number of child levels. The multi-level wildcard character MUST be specified either on its own or following a topic level separator. In either case it MUST be the last character specified in the Topic Filter [MQTT-4.7.1-1].
//
// For example, if a Client subscribes to “sport/tennis/player1/#”, it would receive messages published using these Topic Names:
// · “sport/tennis/player1”
// · “sport/tennis/player1/ranking
// · “sport/tennis/player1/score/wimbledon”
//
// · “sport/#” also matches the singular “sport”, since # includes the parent level.
// · “#” is valid and will receive every Application Message
// · “sport/tennis/#” is valid
// · “sport/tennis#” is not valid
// · “sport/tennis/#/ranking” is not valid
bool result;

// “sport/tennis/#” matches “sport/tennis/player1”
result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1");
Assert.True(result);

// “sport/tennis/#” matches “sport/tennis/player1/ranking”
result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1/ranking");
Assert.True(result);

// “sport/tennis/+” matches “sport/tennis/player1/ranking”
result = HiveMQClient.MatchTopic("sport/tennis/player1/#", "sport/tennis/player1/score/wimbledon");
Assert.True(result);

// “sport/#” also matches the singular “sport”, since # includes the parent level.
result = HiveMQClient.MatchTopic("sport/#", "sport");
Assert.True(result);

// “#” is valid and will receive every Application Message
result = HiveMQClient.MatchTopic("#", "any/and/all/topics");
Assert.True(result);

// Invalid multi-level wildcards
Assert.Throws<ArgumentException>(() => HiveMQClient.MatchTopic("invalid/mlwc#", "sport/tennis/player1/ranking"));

// “sport/tennis/#/ranking” is not valid
Assert.Throws<ArgumentException>(() => HiveMQClient.MatchTopic("sport/tennis/#/ranking", "sport/tennis/player1/ranking"));
Assert.Throws<ArgumentException>(() => HiveMQClient.MatchTopic("/#/", "sport/tennis/player1/ranking"));

// “sport/tennis#” is not valid
Assert.Throws<ArgumentException>(() => HiveMQClient.MatchTopic("sports/tennis#", "sport/tennis/player1/ranking"));

// A subscription to “#” will not receive any messages published to a topic beginning with a $
result = HiveMQClient.MatchTopic("#", "$SYS/broker/clients/total");
Assert.False(result);

// A subscription to “$SYS/#” will receive messages published to topics beginning with “$SYS/”
result = HiveMQClient.MatchTopic("$SYS/#", "$SYS/broker/clients/total");
Assert.True(result);

}
}

0 comments on commit 597d1a8

Please sign in to comment.