HiveMQTT subscriber dies, never recovers, after receiving large burst of messages #144

Closed
Aaronontheweb opened this issue Apr 10, 2024 · 9 comments
Labels
bug Something isn't working

Comments

@Aaronontheweb

🐛 Bug Report

I'm working on an MQTT PoC that needs to be able to process 30,000 packets of 3-7 KB each per second per node (large-scale network) with QoS=0. I tried doing this with MQTTnet and ran into issues (dotnet/MQTTnet#1962), so I thought I would give HiveMQ a try.

Here is what I was able to produce with the same sample I used for MQTTnet:

[INFO][04/10/2024 22:04:14.925Z][Thread 0004][akka://MyActorSystem/user/supervisor] RabbitMQ supervisor started - MQTT reader and writer running
[INFO][04/10/2024 22:04:20.385Z][Thread 0029][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [0] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:20.401Z][Thread 0029][akka://MyActorSystem/user/supervisor/mqtt-reader] Subscription running
[INFO][04/10/2024 22:04:20.402Z][Thread 0029][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [0] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:24.950Z][Thread 0011][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [0] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:25.136Z][Thread 0019][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [0] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:29.949Z][Thread 0012][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [8164] messages from MQTT [1633.1718078937852 msg/s]
[INFO][04/10/2024 22:04:30.135Z][Thread 0022][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [9093] messages to Rabbit [1819.0071665641635 msg/s]
[INFO][04/10/2024 22:04:35.227Z][Thread 0077][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [4050.8370577319124 msg/s]
[INFO][04/10/2024 22:04:36.252Z][Thread 0012][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [25324] messages to Rabbit [2653.277448258188 msg/s]
[INFO][04/10/2024 22:04:39.946Z][Thread 0029][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:40.132Z][Thread 0044][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [1087.8009516041482 msg/s]
[INFO][04/10/2024 22:04:44.948Z][Thread 0078][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:45.135Z][Thread 0029][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:49.953Z][Thread 0015][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:50.138Z][Thread 0078][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:54.958Z][Thread 0015][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:55.127Z][Thread 0078][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:59.944Z][Thread 0078][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:05:00.130Z][Thread 0078][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:05:04.951Z][Thread 0015][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:05:05.137Z][Thread 0044][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:05:09.958Z][Thread 0044][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:05:10.129Z][Thread 0044][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:05:14.947Z][Thread 0078][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]

The client received about 30k messages and then died; no disconnect message or anything else is received. The OnMessageReceived event stops firing and, according to EMQX, the client is still alive but is no longer ACKing any of the published messages.

🔬 How To Reproduce

Steps to reproduce the behavior:

  1. ...

Code sample

Fairly simple client setup: we're just writing any of the messages we receive to a ChannelWriter<MQTT5PublishMessage>. Those messages get picked up by Akka.NET's streaming library (Akka.Streams), which pipes them to RabbitMQ. Akka.Streams hasn't had any trouble keeping up.

private async Task ConfigureMqttClientAsync()
{
    var mqttClientOptions = new HiveMQClientOptionsBuilder()
        .WithClientId(_appSetting.MQTT_ClientId)
        .WithBroker(_appSetting.MQTT_Host)
        .WithPort(_appSetting.MQTT_Port)
        .WithUserName(_appSetting.MQTT_User)
        .WithPassword(_appSetting.MQTT_Password)
        .WithCleanStart(false)
        .WithMaximumPacketSize(7 * 1024)
        //.WithUseTls(true)
        .Build();

    _mqttClient = new HiveMQClient(mqttClientOptions);

    var qosSetting = _appSetting.MQTT_QoS switch
    {
        0 => QualityOfService.AtMostOnceDelivery,
        1 => QualityOfService.AtLeastOnceDelivery,
        2 => QualityOfService.ExactlyOnceDelivery,
        _ => QualityOfService.AtMostOnceDelivery // Handle invalid QoS setting (assuming default to QoS 0 in this example)
    };

    var topicFilter = new TopicFilter(_appSetting.MQTT_Topic, qosSetting);

    var mqttSubscribeOptions = new SubscribeOptionsBuilder()
        .WithSubscription(topicFilter).Build();

    _mqttClient.OnMessageReceived += ManagedMqttClient_ApplicationMessageReceived2Async;
    _mqttClient.OnDisconnectReceived += ManagedMqttClient_DisconnectedAsync;
    _mqttClient.OnPublishReceived += (sender, args) =>
    {
        _self.Tell(ClientRecv.Instance);
    };

    var connectResult = await _mqttClient.ConnectAsync();
    await _mqttClient.SubscribeAsync(mqttSubscribeOptions);
    _log.Info("Subscription running");
}

private void ManagedMqttClient_DisconnectedAsync(object? sender, OnDisconnectReceivedEventArgs e)
{
    _log.Warning("MQTT client disconnected for reason: {0}", e.DisconnectPacket.DisconnectReasonCode);
    _mqttClient.Dispose(); // dispose the client
    _mqttClient.OnMessageReceived -= ManagedMqttClient_ApplicationMessageReceived2Async;
    _mqttClient.OnDisconnectReceived -= ManagedMqttClient_DisconnectedAsync;
    _self.Tell(ConfigureMqttClient.Instance);
}

private void ManagedMqttClient_ApplicationMessageReceived2Async(object? sender, OnMessageReceivedEventArgs e)
{
    try
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        _channelWriter.WriteAsync(e.PublishMessage, cts.Token);
        _self.Tell(Processed.Instance);
    }
    catch (Exception ex)
    {
        _log.Error(ex, "Error acknowledging message {0}", e.PublishMessage.CorrelationData);
    }
}
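
One detail in the message handler above may be worth checking independently of the client library: `WriteAsync` returns a `ValueTask` that is never awaited, and the `CancellationTokenSource` is disposed when the `try` block exits, so a write that is still pending or that fails later is not seen by the `catch`. Below is a minimal sketch of a synchronous alternative built on `ChannelWriter<T>.TryWrite`. The `ForwardingHandler` type is hypothetical, `string` stands in for `MQTT5PublishMessage`, and nothing in this thread establishes that this is what causes the stall.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

// Hypothetical stand-in for the handler in the sample; string replaces MQTT5PublishMessage.
public sealed class ForwardingHandler
{
    private readonly ChannelWriter<string> _writer;
    private long _dropped;

    public ForwardingHandler(ChannelWriter<string> writer) => _writer = writer;

    // Number of messages that could not be written (channel full or completed).
    public long Dropped => Interlocked.Read(ref _dropped);

    // Safe to call from a synchronous event handler: TryWrite never blocks and
    // reports failure through its return value instead of a discarded task.
    public bool OnMessage(string message)
    {
        if (_writer.TryWrite(message))
        {
            return true;
        }

        Interlocked.Increment(ref _dropped);
        return false;
    }
}
```

With an unbounded channel, `TryWrite` only fails once the channel has been completed. With a bounded channel it also fails when the channel is full, which makes dropped messages visible through the counter rather than silently lost.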

Environment

Windows, .NET 8, using an EMQX 5.5.1 broker running on Ubuntu WSL2

Screenshots

What I can see in the EMQX broker logs is that HiveMQTT fails to keep up not long after the load starts - notice the lack of PUBACK responses here:

2024-04-10T16:45:59.156931-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6712, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.156974-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6713, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.156993-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6714, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157038-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6715, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157064-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6716, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157083-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6717, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157101-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6718, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157120-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6719, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157136-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6720, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157156-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6721, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157176-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6722, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157195-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6723, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157212-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6724, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157253-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6725, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157329-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6726, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin

PUBACK events, eventually, do get fired back at the server but only after the message is no longer retained in the fixed-size buffer.

What I can't figure out is - why doesn't my OnMessageReceived handler get fired when this is happening?

📈 Expected behavior

I'd expect the MQTT client to keep up with the events being thrown at it - the project should really add some event processing benchmarks; that's a much more important measure than event publishing throughput IMHO.
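
A receive-side measurement of the kind asked for here can start out very small. The sketch below is a per-interval rate counter similar in spirit to the "Read [N] messages from MQTT [x msg/s]" lines in the log output. `ThroughputCounter` is a hypothetical helper and is not part of HiveMQtt or of the sample application.

```csharp
using System;
using System.Threading;

// Hypothetical helper: counts handled messages and reports a per-interval rate.
public sealed class ThroughputCounter
{
    private long _total;
    private long _lastTotal;

    // Call once per message from the receive handler; safe from multiple threads.
    public void Increment() => Interlocked.Increment(ref _total);

    // Returns the running total and the rate since the previous sample.
    // Intended to be called from a single thread, e.g. a periodic timer.
    public (long Total, double PerSecond) Sample(TimeSpan elapsed)
    {
        var total = Interlocked.Read(ref _total);
        var delta = total - _lastTotal;
        _lastTotal = total;
        return (total, delta / elapsed.TotalSeconds);
    }
}
```

A stalled subscriber shows up as a constant total and a rate of zero on consecutive samples, which is the pattern in the log above once the count reaches 29544.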

Aaronontheweb added the bug label Apr 10, 2024

Hello @Aaronontheweb, thanks for contributing to the HiveMQ community! We will respond as soon as possible.

@pglombardo
Collaborator

Hi @Aaronontheweb - I'm traveling today but will take a closer look at this first thing tomorrow. I agree on the benchmarks & event processing. This is planned for the next time I loop back to them.

@pglombardo
Collaborator

Hey @Aaronontheweb - The client should definitely not die - I'll aim to recreate this here locally and post back soon.

A couple of thoughts:

.WithMaximumPacketSize(7 * 1024)

This is unnecessary and probably only slows down/complicates the issue.

Considering that you've tried both this library and MQTTNet, have you tried swapping brokers just to identify any difference in either client behavior? That may provide some useful information.

If event processing seems like the issue, you could try merging the OnPublishReceived and OnMessageReceived handlers, since they are essentially the same event at different levels. But this would only be an efficiency change.

I'll post back here soon with updates.

@pglombardo
Collaborator

Hi @Aaronontheweb - have you made any progress on this issue?

I tested locally and haven't been able to reproduce this yet.

I wanted to get a baseline, so I used a tool to blast-publish QoS 1 messages endlessly.

With this client as a single subscriber, I got about 10k msgs/second for processing. There was a HiveMQ v4 broker in the middle, though.

I recorded the session here:

https://asciinema.org/a/hGOgBiYMDWNgOu5uLse7M1wzf

With QoS 0 we peak at about 80k msgs/sec:

[Image: Screenshot 2024-04-15 at 16 55 37]

In any case, I just wanted to update you. I have a couple more tests planned. Let me know if you've made any progress on your side.

@Aaronontheweb
Author

@pglombardo thanks! I'll retry this without the packet size setting

@pglombardo
Collaborator

Hi @Aaronontheweb - have you had a chance to revisit this? With v0.18.1, we've added a bunch of health checks and back-pressure support that should resolve this issue. Let me know.
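
For readers unfamiliar with the term, back pressure on the receive path can be illustrated with a bounded channel from `System.Threading.Channels`. This is a generic .NET sketch under that assumption. It does not show how v0.18.1 implements back pressure, which this thread does not describe, and `BackPressureSketch` is a hypothetical name.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackPressureSketch
{
    // The producer awaits WriteAsync, so it is suspended whenever the bounded
    // channel is full and resumes as the consumer drains it.
    public static async Task<int> RunAsync(int messageCount, int capacity)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true
        });

        // Consumer: drains the channel until the writer completes it.
        var consumer = Task.Run(async () =>
        {
            var received = 0;
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                received++;
            }
            return received;
        });

        // Producer: never holds more than `capacity` unread messages in memory.
        for (var i = 0; i < messageCount; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        channel.Writer.Complete();
        return await consumer;
    }
}
```

The design point is that the producer slows to the consumer's pace rather than buffering without limit or dropping messages, at the cost of the producer blocking when the consumer falls behind.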

@Aaronontheweb
Author

Hi @pglombardo - I ended up writing my own MQTT library in C# https://github.com/petabridge/TurboMqtt

@pglombardo
Collaborator

Excellent - that looks like a great client!

I'll close out this issue, but if you ever need anything else, don't hesitate - I'd be happy to help out.

@Aaronontheweb
Author

Thank you!

2 participants