Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

Commit

Permalink
Merge pull request #6 from mwielpue/fix-message-order-delayed-acks
Browse files Browse the repository at this point in the history
Fixed message order in case receiver delayed acks and all send msgs are overwritten
  • Loading branch information
fuersten authored Nov 7, 2017
2 parents 9f37e53 + 2b438b3 commit 7092afd
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion broker/include/broker/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ SubRequester *broker_create_sub_requester(DownstreamNode * node, const char *pat
void broker_free_sub_requester(SubRequester *req);
void broker_clear_messsage_ids(SubRequester *req);

int sendQueuedMessages(SubRequester *subReq);
uint32_t sendQueuedMessages(SubRequester *subReq);

void clear_qos_queue(SubRequester *subReq, uint8_t serialize);

Expand Down
24 changes: 12 additions & 12 deletions broker/src/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ void cleanup_queued_message(void* message) {
}
}

int sendQueuedMessages(SubRequester *subReq) {
int result = 1;
uint32_t sendQueuedMessages(SubRequester *subReq) {
uint32_t result = 0;

if(rb_count(subReq->messageQueue)) {
while (subReq->messageOutputQueueCount < broker_max_ws_send_queue_size) {
Expand All @@ -255,9 +255,12 @@ int sendQueuedMessages(SubRequester *subReq) {
log_debug("Has been send already: %d\n", m->msg_id);
break;
}
result &= sendMessage(subReq, m->message, &m->msg_id);

sendMessage(subReq, m->message, &m->msg_id);
++result;
}
}

return result;
}

Expand All @@ -278,7 +281,7 @@ static int sendMessage(SubRequester *subReq, json_t *varray, uint32_t* msgId)
*msgId = broker_ws_send_obj(subReq->reqNode->link, top);
json_decref(top);

log_debug("Send message with msgId %d\n", *msgId);
log_debug("Send message with msgId %d\n", *msgId);

return addPendingAck(subReq, *msgId);
}
Expand Down Expand Up @@ -326,15 +329,12 @@ int broker_update_sub_req(SubRequester *subReq, json_t *varray) {
uint32_t msgId = 0;

if ( subReq->qos <= 2 ) {
// We need to send the message first to get a message id
if (subReq->reqNode->link && subReq->messageOutputQueueCount < broker_max_ws_send_queue_size) {
result = sendMessage(subReq, varray, &msgId);
log_debug("Sending with msgid: %d\n", msgId);
} else {
log_debug("Send queue full: %d\n", subReq->reqSid);
}
// Now add the message with or without its message id to the queue
// Add the message to the message queue and than try to send messages from the queue to keep message order
// in all cases
addToMessageQueue(subReq, varray, msgId);
if ( sendQueuedMessages(subReq) == 0 ) {
log_debug("Send queue full: %d\n", subReq->reqSid);
}
} else {
if (subReq->reqNode->link ) {
result = sendMessage(subReq, varray, &msgId);
Expand Down

0 comments on commit 7092afd

Please sign in to comment.