Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

qos queue's disk space limit #28

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions broker/include/broker/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ json_t *broker_config_get();

extern uint8_t broker_enable_token;
extern size_t broker_max_qos_queue_size;
extern size_t broker_max_qos_queue_file_size;
extern size_t broker_max_ws_send_queue_size;

int broker_config_load(json_t *json);
Expand Down
2 changes: 2 additions & 0 deletions broker/include/broker/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ void serialize_qos_queue(SubRequester *subReq, uint8_t deleteFlag);

int check_subscription_ack(RemoteDSLink *link, uint32_t ack);

int check_queue_size_limit(json_t* qosQueue);

#ifdef __cplusplus
}
#endif
Expand Down
13 changes: 13 additions & 0 deletions broker/src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ json_t *broker_config_gen() {
json_object_set_new_nocheck(broker_config, "log_level", json_string_nocheck("info"));
json_object_set_new_nocheck(broker_config, "allowAllLinks", json_true());
json_object_set_new_nocheck(broker_config, "maxQueue", json_integer(1024));
json_object_set_new_nocheck(broker_config, "maxQueueFileSize", json_integer(0));
json_object_set_new_nocheck(broker_config, "maxSendQueue", json_integer(8));
json_object_set_new_nocheck(broker_config, "defaultPermission", json_null());

Expand Down Expand Up @@ -126,6 +127,7 @@ json_t *broker_config_get() {

uint8_t broker_enable_token = 1;
size_t broker_max_qos_queue_size = 1024;
size_t broker_max_qos_queue_file_size = 0;
size_t broker_max_ws_send_queue_size = 8;
char *broker_storage_path = ".";

Expand Down Expand Up @@ -194,6 +196,17 @@ int broker_config_load(json_t* json) {
}
}

{
// load maxQueueFileSize
json_t* maxQueueFileSize = json_object_get(json, "maxQueueFileSize");
if (json_is_integer(maxQueueFileSize)) {
broker_max_qos_queue_file_size = (size_t)json_integer_value(maxQueueFileSize);
if (broker_max_qos_queue_file_size > 0xFFFFFFFF) {
broker_max_qos_queue_file_size = 0xFFFFFFFF;
}
}
}

json_t *storage = json_object_get(json, "storage");

if (json_is_object(storage)) {
Expand Down
25 changes: 24 additions & 1 deletion broker/src/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ int broker_update_sub_req(SubRequester *subReq, json_t *varray) {
if (!subReq->qosQueue) {
subReq->qosQueue = json_array();
}
if (json_array_size(subReq->qosQueue) >= broker_max_qos_queue_size) {
if (check_queue_size_limit(subReq->qosQueue)) {
// destroy qos queue when exceed max queue size
clear_qos_queue(subReq, 1);
return result;
Expand Down Expand Up @@ -433,3 +433,26 @@ void broker_update_sub_qos(SubRequester *req, uint8_t qos) {
}
}
}

int check_queue_size_limit(json_t *qosQueue) {
/*
return 1 if size limit is exceeded, otherwise return 0

limit is set via maxQueue and/or maxQueueFileSize in broker.json
. limit is exceeded when either maxQueue/maxQueueFileSize is exceeded

Note that when maxQueueFileSize is set to 0, no limit on size of qos queue file
*/

int rc = 0;

rc |= (json_array_size(qosQueue) >= broker_max_qos_queue_size);

if (broker_max_qos_queue_file_size) {
char* dump = json_dumps(qosQueue , JSON_ENCODE_ANY);
rc |= (strlen(dump)+5 >= broker_max_qos_queue_file_size);
dslink_free(dump);
}

return rc;
}