From 2b438b3ab7c25424bcccd9c3766c76b5e9b91033 Mon Sep 17 00:00:00 2001 From: Norbert Heusser Date: Fri, 20 Oct 2017 14:26:28 +0200 Subject: [PATCH] Fixed message order in case receiver not sending acks and all send messages are overwritten in queue. --- broker/include/broker/subscription.h | 2 +- broker/src/subscription.c | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/broker/include/broker/subscription.h b/broker/include/broker/subscription.h index abeddc7b..9d5d7867 100644 --- a/broker/include/broker/subscription.h +++ b/broker/include/broker/subscription.h @@ -49,7 +49,7 @@ SubRequester *broker_create_sub_requester(DownstreamNode * node, const char *pat void broker_free_sub_requester(SubRequester *req); void broker_clear_messsage_ids(SubRequester *req); -int sendQueuedMessages(SubRequester *subReq); +uint32_t sendQueuedMessages(SubRequester *subReq); void clear_qos_queue(SubRequester *subReq, uint8_t serialize); diff --git a/broker/src/subscription.c b/broker/src/subscription.c index 14832ada..ae894b26 100644 --- a/broker/src/subscription.c +++ b/broker/src/subscription.c @@ -242,8 +242,8 @@ void cleanup_queued_message(void* message) { } } -int sendQueuedMessages(SubRequester *subReq) { - int result = 1; +uint32_t sendQueuedMessages(SubRequester *subReq) { + uint32_t result = 0; if(rb_count(subReq->messageQueue)) { while (subReq->messageOutputQueueCount < broker_max_ws_send_queue_size) { @@ -255,9 +255,12 @@ int sendQueuedMessages(SubRequester *subReq) { log_debug("Has been send already: %d\n", m->msg_id); break; } - result &= sendMessage(subReq, m->message, &m->msg_id); + + sendMessage(subReq, m->message, &m->msg_id); + ++result; } } + return result; } @@ -278,7 +281,7 @@ static int sendMessage(SubRequester *subReq, json_t *varray, uint32_t* msgId) *msgId = broker_ws_send_obj(subReq->reqNode->link, top); json_decref(top); - log_debug("Send message with msgId %d\n", *msgId); + log_debug("Send message with msgId %d\n", *msgId); return addPendingAck(subReq, *msgId); } @@ -326,15 +329,12 @@ int broker_update_sub_req(SubRequester *subReq, json_t *varray) { uint32_t msgId = 0; if ( subReq->qos <= 2 ) { - // We need to send the message first to get a message id - if (subReq->reqNode->link && subReq->messageOutputQueueCount < broker_max_ws_send_queue_size) { - result = sendMessage(subReq, varray, &msgId); - log_debug("Sending with msgid: %d\n", msgId); - } else { - log_debug("Send queue full: %d\n", subReq->reqSid); - } - // Now add the message with or without its message id to the queue + // Add the message to the message queue and than try to send messages from the queue to keep message order + // in all cases addToMessageQueue(subReq, varray, msgId); + if ( sendQueuedMessages(subReq) == 0 ) { + log_debug("Send queue full: %d\n", subReq->reqSid); + } } else { if (subReq->reqNode->link ) { result = sendMessage(subReq, varray, &msgId);