Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

Feature/182328 msgpack #31

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@
[submodule "deps/argtable3"]
path = deps/argtable3
url = https://github.com/argtable/argtable3.git
[submodule "deps/msgpack-c"]
path = deps/msgpack-c
url = https://github.com/msgpack/msgpack-c.git
10 changes: 7 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ if (NOT NO_WARNINGS)
ADD_C_FLAGS("-Wno-clobbered")
ADD_C_FLAGS("-Wno-misleading-indentation")
ADD_C_FLAGS("-Wno-format-truncation")
ADD_C_FLAGS("-pedantic")
#AK:TODO ADD_C_FLAGS("-pedantic")
endif()

include("${CMAKE_CURRENT_LIST_DIR}/deps/msgpack-c-build/CMakeLists.txt")

if (${CMAKE_C_COMPILER_ID} STREQUAL "Clang")
ADD_C_FLAGS("-Qunused-arguments")
endif()
Expand All @@ -127,6 +129,7 @@ endif()
include_directories("${CMAKE_CURRENT_BINARY_DIR}/include")
include_directories("${JANSSON_DIR}/include")
include_directories("${CMAKE_CURRENT_LIST_DIR}/sdk/include")
include_directories("${CMAKE_CURRENT_LIST_DIR}/deps/msgpack-c/include")

set(DSLINK_SRC_DIR "${CMAKE_CURRENT_LIST_DIR}/sdk/src")
set(DSLINK_SRC
Expand Down Expand Up @@ -224,14 +227,14 @@ endif()

if (DSLINK_BUILD_STATIC OR DSLINK_BUILD_EXAMPLES)
add_library(sdk_dslink_c-static STATIC ${LIBRARY_SRC})
target_link_libraries(sdk_dslink_c-static jansson libuv ${DSLINK_PLATFORM_LIBS})
target_link_libraries(sdk_dslink_c-static msgpackc jansson libuv ${DSLINK_PLATFORM_LIBS})
set(DSLINK_INSTALL_TARGETS sdk_dslink_c sdk_dslink_c-static)
else()
set(DSLINK_INSTALL_TARGETS sdk_dslink_c)
endif()

add_library(sdk_dslink_c SHARED ${LIBRARY_SRC})
target_link_libraries(sdk_dslink_c jansson libuv ${DSLINK_PLATFORM_LIBS})
target_link_libraries(sdk_dslink_c msgpackc jansson libuv ${DSLINK_PLATFORM_LIBS})
set_target_properties(sdk_dslink_c PROPERTIES VERSION ${VERSION} SOVERSION ${VERSION} )

set(DSLINK_INSTALL_LIB_DIR lib CACHE PATH "Installation directory for libraries")
Expand All @@ -255,6 +258,7 @@ if (DSLINK_PACKAGE_INCLUDES)
install(DIRECTORY "${CMAKE_CURRENT_LIST_DIR}/deps/mbedtls/include/" DESTINATION "${DSLINK_INSTALL_INCLUDE_DIR}" COMPONENT dev FILES_MATCHING PATTERN "*.h")
install(DIRECTORY "${CMAKE_CURRENT_LIST_DIR}/deps/wslay/lib/includes/" DESTINATION "${DSLINK_INSTALL_INCLUDE_DIR}" COMPONENT dev FILES_MATCHING PATTERN "*.h")
install(DIRECTORY "${CMAKE_CURRENT_LIST_DIR}/deps/wslay/lib/" DESTINATION "${DSLINK_INSTALL_INCLUDE_DIR}" COMPONENT dev FILES_MATCHING PATTERN "*.h")
install(DIRECTORY "${CMAKE_CURRENT_LIST_DIR}/deps/msgpack-c/include/" DESTINATION "${DSLINK_INSTALL_INCLUDE_DIR}" COMPONENT dev FILES_MATCHING PATTERN "*.h")
install(DIRECTORY "${CMAKE_CURRENT_LIST_DIR}/deps/jansson/src/" DESTINATION "${DSLINK_INSTALL_INCLUDE_DIR}" COMPONENT dev FILES_MATCHING PATTERN "*.h")
endif()

Expand Down
4 changes: 3 additions & 1 deletion broker/include/broker/net/ws.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ extern "C" {


void broker_ws_send_init(Socket *sock, const char *accept);

uint32_t broker_ws_send_obj(RemoteDSLink *link, json_t *obj);
uint32_t broker_ws_send_obj_link_id(struct Broker* broker, const char *link_name, int upstream, json_t *obj);
int broker_ws_send(RemoteDSLink *link, const char *data);
int broker_ws_send_ping(RemoteDSLink *link);
int broker_ws_send(RemoteDSLink *link, const char *data, int len, int opcode);
int broker_ws_generate_accept_key(const char *buf, size_t bufLen,
char *out, size_t outLen);
int broker_count_json_msg(json_t *json);
Expand Down
2 changes: 2 additions & 0 deletions broker/include/broker/remote_dslink.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ typedef struct RemoteDSLink {
Map responder_streams;

PermissionGroups permission_groups;

int is_msgpack;
} RemoteDSLink;

int broker_remote_dslink_init(RemoteDSLink *link);
Expand Down
25 changes: 22 additions & 3 deletions broker/src/handshake.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,25 @@ json_t *broker_handshake_handle_conn(Broker *broker,
}
json_object_set_new_nocheck(resp, "path", json_string_nocheck(buf));

// FORMATS
link->is_msgpack = 0;
json_t* formats_from_link = json_object_get(handshake, "formats");

if(formats_from_link != NULL)
{
int arr_size = json_array_size(formats_from_link);

for(int i = 0; i < arr_size; i++)
{
int ret = strcmp("msgpack", json_string_value(json_array_get(formats_from_link, i)));
if(ret == 0)
link->is_msgpack = 1;
}
}

json_object_set_new_nocheck(resp, "format", json_string_nocheck(
link->is_msgpack == 1?"msgpack":"json"));

link->path = dslink_strdup(buf);
if (!link->path) {
goto fail;
Expand Down Expand Up @@ -256,12 +275,12 @@ void dslink_handle_ping(uv_timer_t* handle) {
struct timeval current_time;
gettimeofday(&current_time, NULL);
long time_diff = current_time.tv_sec - link->lastWriteTime->tv_sec;
if (time_diff >= 60) {
if (time_diff >= 30) {
log_debug("dslink_handle_ping send heartbeat to %s\n", link->name );
broker_ws_send_obj(link, json_object());
broker_ws_send_ping(link);
}
} else {
broker_ws_send_obj(link, json_object());
broker_ws_send_ping(link);
}

if (link->lastReceiveTime) {
Expand Down
106 changes: 99 additions & 7 deletions broker/src/net/ws.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

#include <dslink/utils.h>

#include <msgpack.h>
#include <dslink/ws.h>

#define BROKER_WS_RESP "HTTP/1.1 101 Switching Protocols\r\n" \
"Upgrade: websocket\r\n" \
"Connection: Upgrade\r\n" \
Expand Down Expand Up @@ -75,13 +78,47 @@ uint32_t broker_ws_send_obj(RemoteDSLink *link, json_t *obj) {
link->msgId = 0;
}
json_object_set_new_nocheck(obj, "msg", json_integer(id));
char *data = json_dumps(obj, JSON_PRESERVE_ORDER | JSON_COMPACT);

// DECODE OBJ
char *data = NULL;
int len;
int opcode;

log_debug("Message(as %s) is trying sent to %s: %s\n",
(link->is_msgpack==1)?"msgpack":"json",
(char *) link->dsId->data,
json_dumps(obj,JSON_INDENT(0)));

if(link->is_msgpack) {
msgpack_sbuffer* buff = dslink_ws_json_to_msgpack(obj);
data = malloc(buff->size);
len = buff->size;
memcpy(data, buff->data, len);
msgpack_sbuffer_free(buff);
opcode = WSLAY_BINARY_FRAME;
}
else {
data = json_dumps(obj, JSON_PRESERVE_ORDER | JSON_COMPACT);
len = strlen(data);
opcode = WSLAY_TEXT_FRAME;
}

json_object_del(obj, "msg");

if (!data) {
return DSLINK_ALLOC_ERR;
}
int sentBytes = broker_ws_send(link, data);

int sentBytes = broker_ws_send(link, data, len, opcode);

if(sentBytes == -1)
{
log_err("Message(as %s) is failed sent to %s: %s\n",
(link->is_msgpack==1)?"msgpack":"json",
(char *) link->dsId->data,
json_dumps(obj,JSON_INDENT(0)));
}

if (throughput_output_needed()) {
int sentMessages = broker_count_json_msg(obj);
throughput_add_output(sentBytes, sentMessages);
Expand All @@ -90,23 +127,78 @@ uint32_t broker_ws_send_obj(RemoteDSLink *link, json_t *obj) {
return id;
}

int broker_ws_send(RemoteDSLink *link, const char *data) {
int broker_ws_send_ping(RemoteDSLink *link) {
json_t *obj = json_object();

// DECODE OBJ
char* data = NULL;
int len;
int opcode;

log_debug("Message (Ping)(as %s) is trying sent to %s\n",
(link->is_msgpack==1)?"msgpack":"json",
(char *) link->dsId->data);

if(link->is_msgpack) {
msgpack_sbuffer* buff = dslink_ws_json_to_msgpack(obj);
data = malloc(buff->size);
len = buff->size;
memcpy(data, buff->data, len);
msgpack_sbuffer_free(buff);
opcode = WSLAY_BINARY_FRAME;
}
else {
data = json_dumps(obj, JSON_PRESERVE_ORDER | JSON_COMPACT);
len = strlen(data);
opcode = WSLAY_TEXT_FRAME;
}

if (!data) {
return DSLINK_ALLOC_ERR;
}

int sentBytes = broker_ws_send(link, data, len, opcode);

if(sentBytes == -1)
{
log_err("Message (Ping)(as %s) is failed sent to %s\n",
(link->is_msgpack==1)?"msgpack":"json",
(char *) link->dsId->data);
}

if (throughput_output_needed()) {
int sentMessages = broker_count_json_msg(obj);
throughput_add_output(sentBytes, sentMessages);
}


json_delete(obj);

dslink_free(data);
return 0;
}

int broker_ws_send(RemoteDSLink *link, const char *data, int len, int opcode) {
if (!link->ws || !link->client) {
return -1;
}
struct wslay_event_msg msg;
msg.msg = (const uint8_t *) data;
msg.msg_length = strlen(data);
msg.opcode = WSLAY_TEXT_FRAME;
msg.msg_length = len;
msg.opcode = opcode;
wslay_event_queue_msg(link->ws, &msg);

if(link->client->poll && !uv_is_closing((uv_handle_t*)link->client->poll)) {
uv_poll_start(link->client->poll, UV_READABLE | UV_WRITABLE, link->client->poll_cb);

if (link->isUpstream) {
log_debug("Message sent to upstrem %s: %s\n", (char *) link->name, data);
log_debug("Message(%s) sent to upstream %s: %s\n",
(opcode==WSLAY_TEXT_FRAME)?"text":"binary",
(char *) link->name, data);
} else {
log_debug("Message sent to %s: %s\n", (char *) link->dsId->data, data);
log_debug("Message(%s) sent to %s: %s\n",
(opcode==WSLAY_TEXT_FRAME)?"text":"binary",
(char *) link->dsId->data, data);
}

return (int)msg.msg_length;
Expand Down
90 changes: 61 additions & 29 deletions broker/src/net/ws_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

#include "broker/msg/msg_handler.h"
#include "broker/net/ws.h"
#include <msgpack.h>
#include <dslink/ws.h>
#include <dslink/utils.h>

ssize_t broker_want_read_cb(wslay_event_context_ptr ctx,
uint8_t *buf, size_t len,
Expand Down Expand Up @@ -112,40 +115,69 @@ void broker_on_ws_data(wslay_event_context_ptr ctx,
}
gettimeofday(link->lastReceiveTime, NULL);

if (arg->opcode == WSLAY_TEXT_FRAME) {
if (arg->msg_length == 2
&& arg->msg[0] == '{'
&& arg->msg[1] == '}') {
broker_ws_send(link, "{}");
return;
}
if (arg->opcode == WSLAY_CONNECTION_CLOSE) {
link->pendingClose = 1;
return;
}

json_t *data = NULL;
int is_recv_data_msg_pack = 0;

if (arg->opcode == WSLAY_TEXT_FRAME) {
json_error_t err;
json_t *data = json_loadb((char *) arg->msg,
arg->msg_length, 0, &err);
if (throughput_input_needed()) {
int receiveMessages = 0;
if (data) {
receiveMessages = broker_count_json_msg(data);
}
throughput_add_input(arg->msg_length, receiveMessages);
}
if (!data) {
return;
}
if (link->isUpstream) {
log_debug("Received data from upstream %s: %.*s\n", (char *) link->name,
(int) arg->msg_length, arg->msg);
} else {
log_debug("Received data from %s: %.*s\n", (char *) link->dsId->data,
(int) arg->msg_length, arg->msg);
data = json_loadb((char *) arg->msg,
arg->msg_length, 0, &err);
}
else if(arg->opcode == WSLAY_BINARY_FRAME)
{
msgpack_unpacked msg;
msgpack_unpacked_init(&msg);
msgpack_unpack_next(&msg, (char *) arg->msg, arg->msg_length, NULL);

/* prints the deserialized object. */
msgpack_object obj = msg.data;

data = dslink_ws_msgpack_to_json(&obj);
is_recv_data_msg_pack = 1;
}

// Check whether it is ping or not
if(data != NULL && json_object_iter(data) == NULL)
{
log_debug("Ping received (as %s), responding back...\n", is_recv_data_msg_pack?"msgpack":"json");
broker_ws_send_ping(link);
return;
}



if (throughput_input_needed()) {
int receiveMessages = 0;
if (data) {
receiveMessages = broker_count_json_msg(data);
}
throughput_add_input(arg->msg_length, receiveMessages);
}
if (!data) {
return;
}

broker_msg_handle(link, data);
json_decref(data);
} else if (arg->opcode == WSLAY_CONNECTION_CLOSE) {
link->pendingClose = 1;
if (link->isUpstream) {
log_debug("Received data (as %s) from upstream %s: %.*s\n",
(is_recv_data_msg_pack==1)?"msgpack":"json",
(char *) link->name,
(int) arg->msg_length, json_dumps(data, JSON_INDENT(0)));
} else {
log_debug("Received data (as %s) from %s: %.*s\n",
(is_recv_data_msg_pack==1)?"msgpack":"json",
(char *) link->dsId->data,
(int) arg->msg_length, json_dumps(data, JSON_INDENT(0)));
}

broker_msg_handle(link, data);
json_decref(data);

return;
}

const struct wslay_event_callbacks *broker_ws_callbacks() {
Expand Down
19 changes: 18 additions & 1 deletion broker/src/upstream/upstream_handshake.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,25 @@ void connect_conn_callback(uv_poll_t *handle, int status, int events) {
goto exit;
}

const char *format = json_string_value(json_object_get(handshake, "format"));

// TODO: only remoteDSLink also works, but idk about client, should check in refactor.
upstreamPoll->clientDslink->is_msgpack = 0;
upstreamPoll->remoteDSLink->is_msgpack = 0;

if((format != NULL) && (strcmp(format, "msgpack") == 0)) {
upstreamPoll->clientDslink->is_msgpack = 0;
upstreamPoll->remoteDSLink->is_msgpack = 1;
}

const char* format_str = upstreamPoll->remoteDSLink->is_msgpack?"msgpack":"json";

log_info("Format was decided as %s by server\n", format_str);



if ((dslink_handshake_connect_ws(upstreamPoll->clientDslink->config.broker_url, &upstreamPoll->clientDslink->key, uri,
tKey, salt, upstreamPoll->dsId, NULL, &upstreamPoll->sock)) != 0) {
tKey, salt, upstreamPoll->dsId, NULL, format_str, &upstreamPoll->sock)) != 0) {
upstream_reconnect(upstreamPoll);
goto exit;
} else {
Expand Down
Loading