Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

Commit

Permalink
Fixed message order in case receiver not sending acks and all send me…
Browse files Browse the repository at this point in the history
…ssages are overwritten in queue.
  • Loading branch information
Norbert Heusser committed Oct 20, 2017
1 parent 9f37e53 commit 2b438b3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion broker/include/broker/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ SubRequester *broker_create_sub_requester(DownstreamNode * node, const char *pat
void broker_free_sub_requester(SubRequester *req);
void broker_clear_messsage_ids(SubRequester *req);

int sendQueuedMessages(SubRequester *subReq);
uint32_t sendQueuedMessages(SubRequester *subReq);

void clear_qos_queue(SubRequester *subReq, uint8_t serialize);

Expand Down
24 changes: 12 additions & 12 deletions broker/src/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ void cleanup_queued_message(void* message) {
}
}

int sendQueuedMessages(SubRequester *subReq) {
int result = 1;
uint32_t sendQueuedMessages(SubRequester *subReq) {
uint32_t result = 0;

if(rb_count(subReq->messageQueue)) {
while (subReq->messageOutputQueueCount < broker_max_ws_send_queue_size) {
Expand All @@ -255,9 +255,12 @@ int sendQueuedMessages(SubRequester *subReq) {
log_debug("Has been send already: %d\n", m->msg_id);
break;
}
result &= sendMessage(subReq, m->message, &m->msg_id);

sendMessage(subReq, m->message, &m->msg_id);
++result;
}
}

return result;
}

Expand All @@ -278,7 +281,7 @@ static int sendMessage(SubRequester *subReq, json_t *varray, uint32_t* msgId)
*msgId = broker_ws_send_obj(subReq->reqNode->link, top);
json_decref(top);

log_debug("Send message with msgId %d\n", *msgId);
log_debug("Send message with msgId %d\n", *msgId);

return addPendingAck(subReq, *msgId);
}
Expand Down Expand Up @@ -326,15 +329,12 @@ int broker_update_sub_req(SubRequester *subReq, json_t *varray) {
uint32_t msgId = 0;

if ( subReq->qos <= 2 ) {
// We need to send the message first to get a message id
if (subReq->reqNode->link && subReq->messageOutputQueueCount < broker_max_ws_send_queue_size) {
result = sendMessage(subReq, varray, &msgId);
log_debug("Sending with msgid: %d\n", msgId);
} else {
log_debug("Send queue full: %d\n", subReq->reqSid);
}
// Now add the message with or without its message id to the queue
// Add the message to the message queue and than try to send messages from the queue to keep message order
// in all cases
addToMessageQueue(subReq, varray, msgId);
if ( sendQueuedMessages(subReq) == 0 ) {
log_debug("Send queue full: %d\n", subReq->reqSid);
}
} else {
if (subReq->reqNode->link ) {
result = sendMessage(subReq, varray, &msgId);
Expand Down

0 comments on commit 2b438b3

Please sign in to comment.