Skip to content

Commit

Permalink
Fix read non-standard message (#26)
Browse files Browse the repository at this point in the history
* Reprocess message as it is
* Read message body
* Cleanup
  • Loading branch information
glucaci authored Mar 16, 2023
1 parent 9008226 commit c250b60
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 61 deletions.
95 changes: 35 additions & 60 deletions src/Core/ServiceBus/Mappings/MessageMappings.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Text;
using Azure.Core.Amqp;
using Azure.Messaging.ServiceBus;
using CrossBusExplorer.ServiceBus.Contracts.Types;

namespace CrossBusExplorer.ServiceBus.Mappings;

public static class MessageMappings
Expand All @@ -10,7 +12,7 @@ public static Message MapToMessage(this ServiceBusReceivedMessage receivedMessag
return new Message(
receivedMessage.MessageId,
receivedMessage.Subject,
Encoding.UTF8.GetString(receivedMessage.Body),
ReadBody(receivedMessage),
new MessageSystemProperties(
receivedMessage.ContentType,
receivedMessage.CorrelationId,
Expand Down Expand Up @@ -38,6 +40,22 @@ public static Message MapToMessage(this ServiceBusReceivedMessage receivedMessag
p => p.Value?.ToString()));
}

private static string ReadBody(ServiceBusReceivedMessage receivedMessage)
{
AmqpAnnotatedMessage amqpMessage = receivedMessage.GetRawAmqpMessage();
if (amqpMessage.Body.TryGetValue(out object? value))
{
return ReadMessageValue(value);
}

if (amqpMessage.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>>? data))
{
return ReadMessageData(data);
}

throw new NotSupportedException("Cannot read the message Body");
}

public static ServiceBusMessage ToServiceBusMessage(this SendMessage message)
{
var sbMessage = new ServiceBusMessage(message.Body);
Expand Down Expand Up @@ -105,73 +123,30 @@ public static ServiceBusMessage ToServiceBusMessage(this SendMessage message)
return sbMessage;
}

public static ServiceBusMessage MapToServiceBusMessage(
this ServiceBusReceivedMessage receivedMessage)
private static string ReadMessageValue(object? value)
{
var sbMessage = new ServiceBusMessage(receivedMessage.Body);

if (!string.IsNullOrEmpty(receivedMessage.Subject))
{
sbMessage.Subject = receivedMessage.Subject;
}

if (receivedMessage.ApplicationProperties != null)
{
foreach (var applicationProperty in receivedMessage.ApplicationProperties)
{
sbMessage.ApplicationProperties.Add(
applicationProperty.Key,
applicationProperty.Value);
}
}

if (!string.IsNullOrEmpty(receivedMessage.To))
return value switch
{
sbMessage.To = receivedMessage.To;
}

if (!string.IsNullOrEmpty(receivedMessage.ContentType))
{
sbMessage.ContentType = receivedMessage.ContentType;
}

if (!string.IsNullOrEmpty(receivedMessage.CorrelationId))
{
sbMessage.CorrelationId = receivedMessage.CorrelationId;
}

if (!string.IsNullOrEmpty(receivedMessage.MessageId))
{
sbMessage.MessageId = receivedMessage.MessageId;
}

if (!string.IsNullOrEmpty(receivedMessage.PartitionKey))
{
sbMessage.PartitionKey = receivedMessage.PartitionKey;
}

if (!string.IsNullOrEmpty(receivedMessage.ReplyTo))
{
sbMessage.ReplyTo = receivedMessage.ReplyTo;
}

if (!string.IsNullOrEmpty(receivedMessage.SessionId))
{
sbMessage.SessionId = receivedMessage.SessionId;
}
null => string.Empty,
string stringValue => stringValue,
_ => throw new NotSupportedException($"Unknown message Body type {value.GetType().Name}")
};
}

if (receivedMessage.ScheduledEnqueueTime != null &&
receivedMessage.ScheduledEnqueueTime != DateTimeOffset.MinValue)
private static string ReadMessageData(IEnumerable<ReadOnlyMemory<byte>>? data)
{
if (data is null)
{
sbMessage.ScheduledEnqueueTime = receivedMessage.ScheduledEnqueueTime;
return string.Empty;
}

if (receivedMessage.TimeToLive != null &&
receivedMessage.ScheduledEnqueueTime != DateTimeOffset.MinValue)
using var memoryStream = new MemoryStream();
foreach (var segment in data)
{
sbMessage.TimeToLive = receivedMessage.TimeToLive;
memoryStream.Write(segment.Span);
}

return sbMessage;
memoryStream.Position = 0;
return Encoding.UTF8.GetString(memoryStream.ToArray());
}
}
2 changes: 1 addition & 1 deletion src/Core/ServiceBus/MessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public async IAsyncEnumerable<ResendResult> ResendAsync(

await SendMessagesInternalAsync(
sender,
messages.Select(p => p.MapToServiceBusMessage()).ToList(),
messages.Select(m => new ServiceBusMessage(m)).ToList(),
cancellationToken);

batchResendCount = messages.Count;
Expand Down

0 comments on commit c250b60

Please sign in to comment.