diff --git a/src/Core/ServiceBus/Mappings/MessageMappings.cs b/src/Core/ServiceBus/Mappings/MessageMappings.cs index 13a78b6..694e5e4 100644 --- a/src/Core/ServiceBus/Mappings/MessageMappings.cs +++ b/src/Core/ServiceBus/Mappings/MessageMappings.cs @@ -1,6 +1,8 @@ using System.Text; +using Azure.Core.Amqp; using Azure.Messaging.ServiceBus; using CrossBusExplorer.ServiceBus.Contracts.Types; + namespace CrossBusExplorer.ServiceBus.Mappings; public static class MessageMappings @@ -10,7 +12,7 @@ public static Message MapToMessage(this ServiceBusReceivedMessage receivedMessag return new Message( receivedMessage.MessageId, receivedMessage.Subject, - Encoding.UTF8.GetString(receivedMessage.Body), + ReadBody(receivedMessage), new MessageSystemProperties( receivedMessage.ContentType, receivedMessage.CorrelationId, @@ -38,6 +40,22 @@ public static Message MapToMessage(this ServiceBusReceivedMessage receivedMessag p => p.Value?.ToString())); } + private static string ReadBody(ServiceBusReceivedMessage receivedMessage) + { + AmqpAnnotatedMessage amqpMessage = receivedMessage.GetRawAmqpMessage(); + if (amqpMessage.Body.TryGetValue(out object? value)) + { + return ReadMessageValue(value); + } + + if (amqpMessage.Body.TryGetData(out IEnumerable>? data)) + { + return ReadMessageData(data); + } + + throw new NotSupportedException("Cannot read the message Body"); + } + public static ServiceBusMessage ToServiceBusMessage(this SendMessage message) { var sbMessage = new ServiceBusMessage(message.Body); @@ -105,73 +123,30 @@ public static ServiceBusMessage ToServiceBusMessage(this SendMessage message) return sbMessage; } - public static ServiceBusMessage MapToServiceBusMessage( - this ServiceBusReceivedMessage receivedMessage) + private static string ReadMessageValue(object? value) { - var sbMessage = new ServiceBusMessage(receivedMessage.Body); - - if (!string.IsNullOrEmpty(receivedMessage.Subject)) - { - sbMessage.Subject = receivedMessage.Subject; - } - - if (receivedMessage.ApplicationProperties != null) - { - foreach (var applicationProperty in receivedMessage.ApplicationProperties) - { - sbMessage.ApplicationProperties.Add( - applicationProperty.Key, - applicationProperty.Value); - } - } - - if (!string.IsNullOrEmpty(receivedMessage.To)) + return value switch { - sbMessage.To = receivedMessage.To; - } - - if (!string.IsNullOrEmpty(receivedMessage.ContentType)) - { - sbMessage.ContentType = receivedMessage.ContentType; - } - - if (!string.IsNullOrEmpty(receivedMessage.CorrelationId)) - { - sbMessage.CorrelationId = receivedMessage.CorrelationId; - } - - if (!string.IsNullOrEmpty(receivedMessage.MessageId)) - { - sbMessage.MessageId = receivedMessage.MessageId; - } - - if (!string.IsNullOrEmpty(receivedMessage.PartitionKey)) - { - sbMessage.PartitionKey = receivedMessage.PartitionKey; - } - - if (!string.IsNullOrEmpty(receivedMessage.ReplyTo)) - { - sbMessage.ReplyTo = receivedMessage.ReplyTo; - } - - if (!string.IsNullOrEmpty(receivedMessage.SessionId)) - { - sbMessage.SessionId = receivedMessage.SessionId; - } + null => string.Empty, + string stringValue => stringValue, + _ => throw new NotSupportedException($"Unknown message Body type {value.GetType().Name}") + }; + } - if (receivedMessage.ScheduledEnqueueTime != null && - receivedMessage.ScheduledEnqueueTime != DateTimeOffset.MinValue) + private static string ReadMessageData(IEnumerable>? data) + { + if (data is null) { - sbMessage.ScheduledEnqueueTime = receivedMessage.ScheduledEnqueueTime; + return string.Empty; } - if (receivedMessage.TimeToLive != null && - receivedMessage.ScheduledEnqueueTime != DateTimeOffset.MinValue) + using var memoryStream = new MemoryStream(); + foreach (var segment in data) { - sbMessage.TimeToLive = receivedMessage.TimeToLive; + memoryStream.Write(segment.Span); } - return sbMessage; + memoryStream.Position = 0; + return Encoding.UTF8.GetString(memoryStream.ToArray()); } } \ No newline at end of file diff --git a/src/Core/ServiceBus/MessageService.cs b/src/Core/ServiceBus/MessageService.cs index 567cf20..88908d3 100644 --- a/src/Core/ServiceBus/MessageService.cs +++ b/src/Core/ServiceBus/MessageService.cs @@ -149,7 +149,7 @@ public async IAsyncEnumerable ResendAsync( await SendMessagesInternalAsync( sender, - messages.Select(p => p.MapToServiceBusMessage()).ToList(), + messages.Select(m => new ServiceBusMessage(m)).ToList(), cancellationToken); batchResendCount = messages.Count;