Skip to content

Commit

Permalink
Refactoring, Cleanup & More Code Documentation (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Jun 13, 2023
1 parent 46c1655 commit 2f8c83c
Show file tree
Hide file tree
Showing 20 changed files with 167 additions and 206 deletions.
41 changes: 14 additions & 27 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,9 @@ public async Task<ConnectResult> ConnectAsync()
var socketIsConnected = await this.ConnectSocketAsync().ConfigureAwait(false);

var taskCompletionSource = new TaskCompletionSource<ConnAckPacket>();
void TaskHandler(object? sender, OnConnAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.ConnAckPacket);

EventHandler<OnConnAckReceivedEventArgs> eventHandler = (sender, args) =>
{
taskCompletionSource.SetResult(args.ConnAckPacket);
};
EventHandler<OnConnAckReceivedEventArgs> eventHandler = TaskHandler;
this.OnConnAckReceived += eventHandler;

// Construct the MQTT Connect packet and queue to send
Expand Down Expand Up @@ -138,10 +136,8 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
this.connectState = ConnectState.Disconnecting;

var taskCompletionSource = new TaskCompletionSource<DisconnectPacket>();
EventHandler<OnDisconnectSentEventArgs> eventHandler = (sender, args) =>
{
taskCompletionSource.SetResult(args.DisconnectPacket);
};
void TaskHandler(object? sender, OnDisconnectSentEventArgs args) => taskCompletionSource.SetResult(args.DisconnectPacket);
EventHandler<OnDisconnectSentEventArgs> eventHandler = TaskHandler;
this.OnDisconnectSent += eventHandler;

this.sendQueue.Enqueue(disconnectPacket);
Expand Down Expand Up @@ -188,11 +184,8 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
{
// QoS 1: Acknowledged Delivery
var taskCompletionSource = new TaskCompletionSource<PubAckPacket>();

EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = (sender, args) =>
{
taskCompletionSource.SetResult(args.PubAckPacket);
};
void TaskHandler(object? sender, OnPublishQoS1CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubAckPacket);
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS1Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
Expand All @@ -207,11 +200,8 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
{
// QoS 2: Assured Delivery
var taskCompletionSource = new TaskCompletionSource<PubRecPacket>();

EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = (sender, args) =>
{
taskCompletionSource.SetResult(args.PubRecPacket);
};
void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubRecPacket);
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS2Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
Expand Down Expand Up @@ -274,12 +264,10 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier);

var taskCompletionSource = new TaskCompletionSource<SubAckPacket>();
void TaskHandler(object? sender, OnSubAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.SubAckPacket);

// FIXME: We should only ever have one subscribe in flight at any time (for now)
EventHandler<OnSubAckReceivedEventArgs> eventHandler = (sender, args) =>
{
taskCompletionSource.SetResult(args.SubAckPacket);
};
EventHandler<OnSubAckReceivedEventArgs> eventHandler = TaskHandler;
this.OnSubAckReceived += eventHandler;

// Construct the MQTT Connect packet and queue to send
Expand All @@ -297,7 +285,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
catch (System.TimeoutException ex)
{
// log.Error(string.Format("Connect timeout. No response received in time.", ex);
throw;
throw ex;
}
finally
{
Expand Down Expand Up @@ -352,10 +340,9 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(List<Subscription> subscri
var unsubscribePacket = new UnsubscribePacket(subscriptions, (ushort)packetIdentifier);

var taskCompletionSource = new TaskCompletionSource<UnsubAckPacket>();
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = (sender, args) =>
{
taskCompletionSource.SetResult(args.UnsubAckPacket);
};

void TaskHandler(object? sender, OnUnsubAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.UnsubAckPacket);
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = TaskHandler;
this.OnUnsubAckReceived += eventHandler;

this.sendQueue.Enqueue(unsubscribePacket);
Expand Down
42 changes: 19 additions & 23 deletions Source/HiveMQtt/Client/HiveMQClientSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
private PipeReader? reader;
private PipeWriter? writer;

internal static bool ValidateServerCertificate(
object sender,
X509Certificate certificate,
X509Chain chain,
SslPolicyErrors sslPolicyErrors)
{
if (sslPolicyErrors == SslPolicyErrors.None)
{
return true;
}

Console.WriteLine("Certificate error: {0}", sslPolicyErrors);

// Do not allow this client to communicate with unauthenticated servers.
return false;
}

/// <summary>
/// Make a TCP connection to a remote broker.
/// </summary>
Expand Down Expand Up @@ -102,19 +119,15 @@ internal async Task<bool> ConnectSocketAsync()
this.writer = PipeWriter.Create(this.stream);

// Start the traffic processors
this.trafficOutflowProcessor = this.TrafficOutflowProcessorAsync();
this.trafficInflowProcessor = this.TrafficInflowProcessorAsync();
_ = this.TrafficOutflowProcessorAsync();
_ = this.TrafficInflowProcessorAsync();

// Console.WriteLine($"Socket connected to {this.socket.RemoteEndPoint}");
return socketConnected;
}

internal bool CloseSocket()
{
// Shutdown the traffic processors
this.trafficOutflowProcessor = null;
this.trafficInflowProcessor = null;

// Shutdown the pipeline
this.reader = null;
this.writer = null;
Expand All @@ -125,21 +138,4 @@ internal bool CloseSocket()

return true;
}

internal static bool ValidateServerCertificate(
object sender,
X509Certificate certificate,
X509Chain chain,
SslPolicyErrors sslPolicyErrors)
{
if (sslPolicyErrors == SslPolicyErrors.None)
{
return true;
}

Console.WriteLine("Certificate error: {0}", sslPolicyErrors);

// Do not allow this client to communicate with unauthenticated servers.
return false;
}
}
5 changes: 0 additions & 5 deletions Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
// Transactional packets indexed by packet identifer
private readonly ConcurrentDictionary<int, List<ControlPacket>> transactionQueue = new();

private Task<bool>? trafficOutflowProcessor;

private Task<bool>? trafficInflowProcessor;

/// <summary>
/// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue.
/// </summary>
Expand Down Expand Up @@ -161,7 +157,6 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>

stopWatch.Restart();
}

} // while

Trace.WriteLine($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Exiting...{this.connectState}");
Expand Down
6 changes: 3 additions & 3 deletions Source/HiveMQtt/Client/HiveMQClientUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ from executing a second time.
/// runtime from inside the finalizer and you should not reference
/// other objects. Only unmanaged resources can be disposed.
/// </summary>
/// <param name="disposing">fixme.</param>
/// <param name="disposing">True if called from user code.</param>
protected virtual void Dispose(bool disposing)
{
// Check to see if Dispose has already been called.
Expand All @@ -72,14 +72,14 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
// Dispose managed resources.
{ }
// { }
}

// Call the appropriate methods to clean up
// unmanaged resources here.
// If disposing is false,
// only the following code is executed.
{ }
// { }

// Note disposing has been done.
this.disposed = true;
Expand Down
2 changes: 1 addition & 1 deletion Source/HiveMQtt/Client/Options/SubscribeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
namespace HiveMQtt.Client.Options;

using HiveMQtt.MQTT5.Types;
using HiveMQtt.Client.Exceptions;
using HiveMQtt.MQTT5.Types;

public class SubscribeOptions
{
Expand Down
2 changes: 0 additions & 2 deletions Source/HiveMQtt/Client/Options/UnsubscribeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
namespace HiveMQtt.Client.Options;

using System.Collections;

public class UnsubscribeOptions
{
public UnsubscribeOptions() => this.UserProperties = new Dictionary<string, string>();
Expand Down
10 changes: 5 additions & 5 deletions Source/HiveMQtt/Client/Results/SubscribeResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ internal SubscribeResult(SubscribeOptions options, SubAckPacket subAckPacket)
/// <para>
/// The topic filter must be the same as one used in the Subscribe operation.
/// </para>
/// <returns>The Subscription for the given topic filter or null if not found.</returns>
/// </summary>
/// <param name="topicFilter">The topic filter to get the Subscription for.</param>
/// <returns>The Subscription for the given topic filter or null if not found.</returns>
public Subscription? GetSubscription(string topicFilter)
{
foreach (var subscription in this.Subscriptions)
Expand All @@ -78,14 +79,13 @@ internal SubscribeResult(SubscribeOptions options, SubAckPacket subAckPacket)
return subscription;
}
}

return null;
}

/// <summary>
/// Gets the first Subscription in the list of Subscriptions or null if the list is empty.
/// </summary>
public Subscription? GetFirstSubscription()
{
return this.Subscriptions.FirstOrDefault();
}
/// <returns>The first Subscription in the list of Subscriptions or null if the list is empty.</returns>
public Subscription? GetFirstSubscription() => this.Subscriptions.FirstOrDefault();
}
28 changes: 22 additions & 6 deletions Source/HiveMQtt/MQTT5/ControlPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ public static int DecodeVariableByteInteger(ref SequenceReader<byte> reader, out
/// </summary>
internal ushort PacketIdentifier { get; set; }

/// <summary>
/// Decodes a MQTT packet identifier as a two byte integer from the given <c>MemoryStream</c>. It then
/// does basic validation and range checking on the decoded value.
/// </summary>
/// <param name="reader">SequenceReader containing the packet data to be decoded.</param>
/// <returns>The packet identifier as a two byte integer.</returns>
protected static int DecodePacketIdentifier(ref SequenceReader<byte> reader)
{
var packetIdentifier = DecodeTwoByteInteger(ref reader);
if (packetIdentifier == null || packetIdentifier.Value < 0 || packetIdentifier.Value > ushort.MaxValue)
{
throw new MQTTProtocolException("Invalid packet identifier");
}

return packetIdentifier.Value;
}

/// <summary>
/// Encode a UTF-8 string into a <c>MemoryStream</c>.
///
Expand Down Expand Up @@ -128,7 +145,6 @@ protected static int EncodeUTF8String(MemoryStream stream, string s)
/// <returns>A string containing the UTF-8 string.</returns>
protected static string? DecodeUTF8String(ref SequenceReader<byte> reader)
{

if (reader.TryReadBigEndian(out Int16 stringLength))
{
var array = new byte[stringLength];
Expand Down Expand Up @@ -269,7 +285,6 @@ protected static int EncodeBinaryData(MemoryStream writer, byte[] binaryData)
/// <returns>A byte[] containing the binary data.</returns>
protected static byte[]? DecodeBinaryData(ref SequenceReader<byte> reader)
{

if (reader.TryReadBigEndian(out Int16 stringLength))
{
var array = new byte[stringLength];
Expand Down Expand Up @@ -393,7 +408,7 @@ protected void EncodeProperties(MemoryStream writer)

if (writer.Length > int.MaxValue)
{
throw new ArgumentOutOfRangeException("writer", "The writer stream is too large to encode.");
throw new ArgumentOutOfRangeException(nameof(writer), "The writer stream is too large to encode.");
}

var propertyStream = new MemoryStream((int)writer.Length);
Expand Down Expand Up @@ -514,8 +529,8 @@ protected void EncodeProperties(MemoryStream writer)
foreach (var property in this.Properties.UserProperties)
{
propertiesLength += EncodeVariableByteInteger(propertyStream, (int)MQTT5PropertyType.UserProperty);
propertiesLength += EncodeUTF8String(propertyStream, (string)property.Key);
propertiesLength += EncodeUTF8String(propertyStream, (string)property.Value);
propertiesLength += EncodeUTF8String(propertyStream, property.Key);
propertiesLength += EncodeUTF8String(propertyStream, property.Value);
}
}

Expand Down Expand Up @@ -567,7 +582,7 @@ protected bool DecodeProperties(ref SequenceReader<byte> reader, int length)
this.Properties.CorrelationData = DecodeBinaryData(ref reader);
break;
case MQTT5PropertyType.SubscriptionIdentifier:
this.Properties.SubscriptionIdentifier = (Int32)DecodeVariableByteInteger(ref reader);
this.Properties.SubscriptionIdentifier = DecodeVariableByteInteger(ref reader);
break;
case MQTT5PropertyType.SessionExpiryInterval:
this.Properties.SessionExpiryInterval = DecodeFourByteInteger(ref reader);
Expand Down Expand Up @@ -648,4 +663,5 @@ protected bool DecodeProperties(ref SequenceReader<byte> reader, int length)

return true;
}

}
2 changes: 1 addition & 1 deletion Source/HiveMQtt/MQTT5/PacketDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static ControlPacket Decode(ReadOnlySequence<byte> buffer, out SequencePo

// Byte 1: Control Packet Type
srBuffer.TryRead(out var cpByte);
var controlPacketType = (int)cpByte >> 4;
var controlPacketType = cpByte >> 4;

// Byte 2-5: Remaining Length of the Variable Header
// Size of VBI in vbiLengthInBytes
Expand Down
Loading

0 comments on commit 2f8c83c

Please sign in to comment.