Skip to content

Commit

Permalink
Add Last Will and Testament Support (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Sep 7, 2023
1 parent 71e468a commit d8a0d1a
Show file tree
Hide file tree
Showing 28 changed files with 665 additions and 168 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<ItemGroup Label="Package References">
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" PrivateAssets="all" Version="17.4.33" />
<PackageReference Include="MinVer" PrivateAssets="all" Version="4.2.0" />
<PackageReference Include="StyleCop.Analyzers" PrivateAssets="all" Version="1.2.0-beta.435" />
<PackageReference Include="StyleCop.Analyzers" PrivateAssets="all" Version="1.1.118" />
</ItemGroup>

</Project>
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,47 @@ await client.PublishAsync(
).ConfigureAwait(false);
```

### Last Will and Testament

The Last Will and Testament support of MQTT can be used to notify subscribers that your client is offline.

For a more in-depth explanation, see [What is MQTT Last Will and Testament (LWT)? – MQTT Essentials: Part 9](https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/).

```C#
// Specify the Last Will and Testament specifics in HiveMQClientOptions
var options = new HiveMQClientOptions
{
LastWillAndTestament = new LastWillAndTestament("last/will", QualityOfService.AtLeastOnceDelivery, "last will message"),
};

// Optionally set extended properties on the Last Will and Testament message
options.LastWillAndTestament.WillDelayInterval = 1;
options.LastWillAndTestament.PayloadFormatIndicator = 1;
options.LastWillAndTestament.MessageExpiryInterval = 100;
options.LastWillAndTestament.ContentType = "application/text";
options.LastWillAndTestament.ResponseTopic = "response/topic";
options.LastWillAndTestament.CorrelationData = new byte[] { 1, 2, 3, 4, 5 };
options.LastWillAndTestament.UserProperties.Add("userPropertyKey", "userPropertyValue");

// ConnectAsync will transmit the Last Will and Testament configuration.
var client = new HiveMQClient(options);
connectResult = await client.ConnectAsync().ConfigureAwait(false);

// The Last Will and Testament message will be sent to the "last/will" topic if your clients get
// unexpectedly disconnected or alternatively, if your client disconnects with `DisconnectWithWillMessage`
var disconnectOptions = new DisconnectOptions { ReasonCode = DisconnectReasonCode.DisconnectWithWillMessage };
var disconnectResult = await client.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
``````

Because the client above disconnected with `DisconnectReasonCode.DisconnectWithWillMessage`, subscribers to the `last/will` topic will receive the Last Will and Testament message as specified above.

### More

For more examples that you can easily copy/paste, see our [Examples](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Documentation/Examples.md).
There is even an https://github.com/hivemq/hivemq-mqtt-client-dotnet/tree/main/Examples/HiveMQtt-CLI to demonstrate usage of the package.

## Other MQTT Clients

* [Java](https://github.com/hivemq/hivemq-mqtt-client)
Expand Down
8 changes: 6 additions & 2 deletions Source/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
<None Include="..\..\README.md" Pack="true" PackagePath="\" />
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<CodeAnalysisRuleSet>..\..\StyleCop.Analyzers.ruleset</CodeAnalysisRuleSet>
<!-- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> -->
<PropertyGroup>
<CodeAnalysisRuleSet>./../../StyleCop.Analyzers.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<ItemGroup>
<AdditionalFiles Include="$(MSBuildThisFileDirectory)./../stylecop.json" Link="stylecop.json" />
</ItemGroup>

</Project>
3 changes: 1 addition & 2 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/


namespace HiveMQtt.Client;

using System;
Expand Down Expand Up @@ -109,7 +108,7 @@ public async Task<ConnectResult> ConnectAsync()

// Data massage: This class is used for end users. Let's prep the data so it's easily understandable.
// If the Session Expiry Interval is absent the value in the CONNECT Packet used.
connectResult.Properties.SessionExpiryInterval ??= (UInt32)this.Options.SessionExpiryInterval;
connectResult.Properties.SessionExpiryInterval ??= (uint)this.Options.SessionExpiryInterval;

// Fire the corresponding event
this.AfterConnectEventLauncher(connectResult);
Expand Down
2 changes: 1 addition & 1 deletion Source/HiveMQtt/Client/HiveMQClientEvents.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-present HiveMQ and the HiveMQ Community
* Copyright 2023-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
21 changes: 11 additions & 10 deletions Source/HiveMQtt/Client/HiveMQClientSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ internal async Task<bool> ConnectSocketAsync()
ipAddress = address;
break;
}

if (address.AddressFamily == AddressFamily.InterNetwork)
{
ipAddress = address;
Expand All @@ -87,12 +88,9 @@ internal async Task<bool> ConnectSocketAsync()
}
}

if (ipAddress == null)
{
// We have multiple address returned, but none of them match the PreferIPv6 option.
// Use the first one whatever it is.
ipAddress = ipHostInfo.AddressList[0];
}
// We have multiple address returned, but none of them match the PreferIPv6 option.
// Use the first one whatever it is.
ipAddress ??= ipHostInfo.AddressList[0];

IPEndPoint ipEndPoint = new(ipAddress, this.Options.Port);

Expand Down Expand Up @@ -126,11 +124,14 @@ internal async Task<bool> ConnectSocketAsync()
return socketConnected;
}

internal bool CloseSocket()
internal bool CloseSocket(bool? shutdownPipeline = true)
{
// Shutdown the pipeline
this.reader = null;
this.writer = null;
if (shutdownPipeline == true)
{
// Shutdown the pipeline
this.reader = null;
this.writer = null;
}

// Shutdown the socket
this.socket?.Shutdown(SocketShutdown.Both);
Expand Down
41 changes: 30 additions & 11 deletions Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>
if (elapsed > TimeSpan.FromSeconds(keepAlivePeriod))
{
// Send PingReq
var writeResult = await this.writer.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false);
var writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false);
this.OnPingReqSentEventLauncher(new PingReqPacket());
stopWatch.Restart();
}
Expand All @@ -82,22 +82,22 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>
// FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time.
case ConnectPacket connectPacket:
Trace.WriteLine("--> ConnectPacket");
writeResult = await this.writer.WriteAsync(connectPacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false);
this.OnConnectSentEventLauncher(connectPacket);
break;
case DisconnectPacket disconnectPacket:
Trace.WriteLine("--> DisconnectPacket");
writeResult = await this.writer.WriteAsync(DisconnectPacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false);
this.OnDisconnectSentEventLauncher(disconnectPacket);
break;
case SubscribePacket subscribePacket:
Trace.WriteLine("--> SubscribePacket");
writeResult = await this.writer.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false);
this.OnSubscribeSentEventLauncher(subscribePacket);
break;
case UnsubscribePacket unsubscribePacket:
Trace.WriteLine("--> UnsubscribePacket");
writeResult = await this.writer.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false);
this.OnUnsubscribeSentEventLauncher(unsubscribePacket);
break;
case PublishPacket publishPacket:
Expand All @@ -112,38 +112,39 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>
}
}

writeResult = await this.writer.WriteAsync(publishPacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false);

this.OnPublishSentEventLauncher(publishPacket);
break;
case PubAckPacket pubAckPacket:
// This is in response to a received Publish packet. Communication chain management
// was done in the receiver code. Just send the response.
Trace.WriteLine("--> PubAckPacket");
writeResult = await this.writer.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false);
this.OnPubAckSentEventLauncher(pubAckPacket);
break;
case PubRecPacket pubRecPacket:
// This is in response to a received Publish packet. Communication chain management
// was done in the receiver code. Just send the response.
Trace.WriteLine("--> PubRecPacket");
writeResult = await this.writer.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false);
this.OnPubRecSentEventLauncher(pubRecPacket);
break;
case PubRelPacket pubRelPacket:
// This is in response to a received PubRec packet. Communication chain management
// was done in the receiver code. Just send the response.
Trace.WriteLine("--> PubRelPacket");
writeResult = await this.writer.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false);
this.OnPubRelSentEventLauncher(pubRelPacket);
break;
case PubCompPacket pubCompPacket:
// This is in response to a received PubRel packet. Communication chain management
// was done in the receiver code. Just send the response.
Trace.WriteLine("--> PubCompPacket");
writeResult = await this.writer.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false);
writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false);
this.OnPubCompSentEventLauncher(pubCompPacket);
break;

/* case AuthPacket authPacket:
/* writeResult = await this.writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false);
/* this.OnAuthSentEventLauncher(authPacket);
Expand Down Expand Up @@ -174,7 +175,7 @@ private Task<bool> TrafficInflowProcessorAsync() => Task.Run(async () =>

while (this.connectState is ConnectState.Connecting or ConnectState.Connected)
{
readResult = await this.reader.ReadAsync().ConfigureAwait(false);
readResult = await this.ReadAsync().ConfigureAwait(false);

if (readResult.IsCanceled)
{
Expand Down Expand Up @@ -350,4 +351,22 @@ private Task<bool> TrafficInflowProcessorAsync() => Task.Run(async () =>

return true;
});

internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
if (this.writer is null)
{
throw new HiveMQttClientException("Writer is null");
}
return this.writer.WriteAsync(source, cancellationToken);
}

internal ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
if (this.reader is null)
{
throw new HiveMQttClientException("Reader is null");
}
return this.reader.ReadAsync(cancellationToken);
}
}
28 changes: 14 additions & 14 deletions Source/HiveMQtt/Client/HiveMQClientUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,6 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
private bool disposed = false;
private int lastPacketId = 0;

/// <summary>
/// Generate a packet identifier.
/// </summary>
/// <returns>A valid packet identifier.</returns>
protected int GeneratePacketIdentifier()
{
if (this.lastPacketId == ushort.MaxValue)
{
this.lastPacketId = 0;
}

return Interlocked.Increment(ref this.lastPacketId);
}

/// <summary>
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.
/// </summary>
Expand All @@ -51,6 +37,20 @@ from executing a second time.
GC.SuppressFinalize(this);
}

/// <summary>
/// Generate a packet identifier.
/// </summary>
/// <returns>A valid packet identifier.</returns>
protected int GeneratePacketIdentifier()
{
if (this.lastPacketId == ushort.MaxValue)
{
this.lastPacketId = 0;
}

return Interlocked.Increment(ref this.lastPacketId);
}

/// <summary>
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0
/// Dispose(bool disposing) executes in two distinct scenarios.
Expand Down
Loading

0 comments on commit d8a0d1a

Please sign in to comment.