Skip to content

Commit

Permalink
Merge pull request #30 from tarantool/fix-yields-in-gc
Browse files Browse the repository at this point in the history
Removed _gc methods with yields
  • Loading branch information
RepentantGopher authored Dec 10, 2020
2 parents 7fe2a09 + 1e55f1e commit 44c01aa
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 76 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ tests-run:
pytest -W ignore -vv && \
deactivate

test-run-with-docker: tests-dep docker-run-all
test-sleep: tests-dep docker-run-all
sleep 10

test-run-with-docker: test-sleep
docker run \
--net=${NETWORK} \
--rm confluentinc/cp-kafka:5.0.0 \
Expand Down Expand Up @@ -151,6 +152,8 @@ test-run-with-docker: tests-dep docker-run-all
kafka-topics --create --topic test_consuming_from_last_committed_offset --partitions 1 --replication-factor 1 \
--if-not-exists --zookeeper zookeeper:2181

sleep 5

cd ./tests && \
python3 -m venv venv && \
. venv/bin/activate && \
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ Local run in docker:
# Developing

## Tests
Before running any tests you should add the following entry to `/etc/hosts`:
```
127.0.0.1 kafka
```

You can run the docker-based integration tests via the makefile target
```bash
make test-run-with-docker
Expand Down
2 changes: 2 additions & 0 deletions kafka/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <lualib.h>
#include <lauxlib.h>

#include <tarantool/module.h>

#include <common.h>

const char* const consumer_label = "__tnt_kafka_consumer";
Expand Down
2 changes: 2 additions & 0 deletions kafka/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <lualib.h>
#include <lauxlib.h>

#include <tarantool/module.h>

const char* const consumer_label;
const char* const consumer_msg_label;
const char* const producer_label;
Expand Down
44 changes: 23 additions & 21 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ consumer_poll_loop(void *arg) {
// FIXME: push errors to error queue?
rd_kafka_message_destroy(rd_msg);
} else {
msg_t *msg = new_consumer_msg(rd_msg);
// free rdkafka message instantly to prevent hang on close / destroy consumer
rd_kafka_message_destroy(rd_msg);
rd_msg = NULL;

pthread_mutex_lock(&event_queues->consume_queue->lock);

queue_lockfree_push(event_queues->consume_queue, rd_msg);
queue_lockfree_push(event_queues->consume_queue, msg);
count = event_queues->consume_queue->count;

pthread_mutex_unlock(&event_queues->consume_queue->lock);
Expand Down Expand Up @@ -225,18 +230,14 @@ lua_consumer_poll_msg(struct lua_State *L) {
int msgs_limit = lua_tonumber(L, 2);

lua_createtable(L, msgs_limit, 0);
rd_kafka_message_t *rd_msg = NULL;
msg_t *msg = NULL;
while (msgs_limit > counter) {
rd_msg = queue_pop(consumer->event_queues->consume_queue);
if (rd_msg == NULL) {
msg = queue_pop(consumer->event_queues->consume_queue);
if (msg == NULL) {
break;
}
counter += 1;

msg_t *msg;
msg = malloc(sizeof(msg_t));
msg->rd_message = rd_msg;

msg_t **msg_p = (msg_t **)lua_newuserdata(L, sizeof(msg));
*msg_p = msg;

Expand Down Expand Up @@ -500,7 +501,7 @@ lua_consumer_store_offset(struct lua_State *L) {
luaL_error(L, "Usage: err = consumer:store_offset(msg)");

msg_t *msg = lua_check_consumer_msg(L, 2);
rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->rd_message->rkt, msg->rd_message->partition, msg->rd_message->offset);
rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->topic, msg->partition, msg->offset);
if (err) {
const char *const_err_str = rd_kafka_err2str(err);
char err_str[512];
Expand All @@ -523,6 +524,14 @@ wait_consumer_close(va_list args) {
return 0;
}

static ssize_t
wait_consumer_destroy(va_list args) {
    rd_kafka_t *handle = va_arg(args, rd_kafka_t *);
    /* Skip the consumer-close path on destroy so this call cannot
     * block forever waiting on the broker. */
    rd_kafka_destroy_flags(handle, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
    return 0;
}

static rd_kafka_resp_err_t
consumer_destroy(struct lua_State *L, consumer_t *consumer) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
Expand All @@ -531,11 +540,6 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) {
rd_kafka_topic_partition_list_destroy(consumer->topics);
}

// trying to close in background until success
while (coio_call(wait_consumer_close, consumer->rd_consumer) == -1) {
// FIXME: maybe send errors to error queue?
}

if (consumer->poller != NULL) {
destroy_consumer_poller(consumer->poller);
}
Expand All @@ -547,7 +551,7 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) {
if (consumer->rd_consumer != NULL) {
/* Destroy handle */
// FIXME: kafka_destroy hangs forever
// coio_call(kafka_destroy, consumer->rd_consumer);
coio_call(wait_consumer_destroy, consumer->rd_consumer);
}

free(consumer);
Expand All @@ -563,22 +567,20 @@ lua_consumer_close(struct lua_State *L) {
return 1;
}

rd_kafka_commit((*consumer_p)->rd_consumer, NULL, 0); // sync commit of current offsets
rd_kafka_unsubscribe((*consumer_p)->rd_consumer);

// trying to close in background until success
while (coio_call(wait_consumer_close, (*consumer_p)->rd_consumer) == -1) {
// FIXME: maybe send errors to error queue?
}

if ((*consumer_p)->poller != NULL) {
destroy_consumer_poller((*consumer_p)->poller);
(*consumer_p)->poller = NULL;
}

lua_pushboolean(L, 1);
return 1;
}

int
lua_consumer_gc(struct lua_State *L) {
lua_consumer_destroy(struct lua_State *L) {
consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label);
if (consumer_p && *consumer_p) {
consumer_destroy(L, *consumer_p);
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ int lua_consumer_store_offset(struct lua_State *L);

int lua_consumer_close(struct lua_State *L);

int lua_consumer_gc(struct lua_State *L);
int lua_consumer_destroy(struct lua_State *L);

int lua_create_consumer(struct lua_State *L);

Expand Down
90 changes: 66 additions & 24 deletions kafka/consumer_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,83 +22,83 @@ lua_check_consumer_msg(struct lua_State *L, int index) {
int
lua_consumer_msg_topic(struct lua_State *L) {
    // Lua: topic_name = msg:topic()
    msg_t *msg = lua_check_consumer_msg(L, 1);
    lua_pushstring(L, rd_kafka_topic_name(msg->topic));
    return 1;
}

int
lua_consumer_msg_partition(struct lua_State *L) {
    // Lua: partition = msg:partition()
    msg_t *msg = lua_check_consumer_msg(L, 1);

    lua_pushnumber(L, (double)msg->partition);
    return 1;
}

int
lua_consumer_msg_offset(struct lua_State *L) {
    // Lua: offset = msg:offset(); pushed as a Tarantool 64-bit integer
    msg_t *msg = lua_check_consumer_msg(L, 1);

    luaL_pushint64(L, msg->offset);
    return 1;
}

int
lua_consumer_msg_key(struct lua_State *L) {
    // Lua: key = msg:key(); returns nothing when the message has no key
    msg_t *msg = lua_check_consumer_msg(L, 1);

    // NOTE: the cast-and-recompare `((char*)msg->key) == NULL` in the old
    // code was redundant with the plain NULL check and has been dropped.
    if (msg->key_len <= 0 || msg->key == NULL) {
        return 0;
    }

    lua_pushlstring(L, msg->key, msg->key_len);
    return 1;
}

int
lua_consumer_msg_value(struct lua_State *L) {
    // Lua: value = msg:value(); returns nothing when the payload is empty
    msg_t *msg = lua_check_consumer_msg(L, 1);

    // NOTE: the cast-and-recompare `((char*)msg->value) == NULL` in the old
    // code was redundant with the plain NULL check and has been dropped.
    if (msg->value_len <= 0 || msg->value == NULL) {
        return 0;
    }

    lua_pushlstring(L, msg->value, msg->value_len);
    return 1;
}

int
lua_consumer_msg_tostring(struct lua_State *L) {
msg_t *msg = lua_check_consumer_msg(L, 1);

size_t key_len = msg->rd_message->key_len <= 0 ? 5: msg->rd_message->key_len + 1;
size_t key_len = msg->key_len <= 0 ? 5: msg->key_len + 1;
char key[key_len];

if (msg->rd_message->key_len <= 0 || msg->rd_message->key == NULL || ((char*)msg->rd_message->key) == NULL) {
if (msg->key_len <= 0 || msg->key == NULL || ((char*)msg->key) == NULL) {
strncpy(key, "NULL", 5);
} else {
strncpy(key, msg->rd_message->key, msg->rd_message->key_len + 1);
if (key[msg->rd_message->key_len] != '\0') {
key[msg->rd_message->key_len] = '\0';
strncpy(key, msg->key, msg->key_len + 1);
if (key[msg->key_len] != '\0') {
key[msg->key_len] = '\0';
}
}

size_t value_len = msg->rd_message->len <= 0 ? 5: msg->rd_message->len + 1;
size_t value_len = msg->value_len <= 0 ? 5: msg->value_len + 1;
char value[value_len];

if (msg->rd_message->len <= 0 || msg->rd_message->payload == NULL || ((char*)msg->rd_message->payload) == NULL) {
if (msg->value_len <= 0 || msg->value == NULL || ((char*)msg->value) == NULL) {
strncpy(value, "NULL", 5);
} else {
strncpy(value, msg->rd_message->payload, msg->rd_message->len + 1);
if (value[msg->rd_message->len] != '\0') {
value[msg->rd_message->len] = '\0';
strncpy(value, msg->value, msg->value_len + 1);
if (value[msg->value_len] != '\0') {
value[msg->value_len] = '\0';
}
}

lua_pushfstring(L,
"Kafka Consumer Message: topic=%s partition=%d offset=%d key=%s value=%s",
rd_kafka_topic_name(msg->rd_message->rkt),
msg->rd_message->partition,
msg->rd_message->offset,
rd_kafka_topic_name(msg->topic),
msg->partition,
msg->offset,
key,
value);
return 1;
Expand All @@ -108,13 +108,55 @@ int
lua_consumer_msg_gc(struct lua_State *L) {
    // __gc metamethod: release the message copy owned by the userdata.
    msg_t **msg_p = (msg_t **)luaL_checkudata(L, 1, consumer_msg_label);
    if (msg_p && *msg_p) {
        destroy_consumer_msg(*msg_p);
    }
    // Clear the slot so a double __gc / explicit destroy is harmless.
    if (msg_p)
        *msg_p = NULL;

    return 0;
}

msg_t *
new_consumer_msg(rd_kafka_message_t *rd_message) {
    // Deep-copy an rdkafka message so the rd_kafka_message_t can be
    // destroyed immediately (prevents hangs on consumer close/destroy).
    // calloc zero-initializes the struct: previously, when len/key_len
    // was 0, msg->value / msg->key were left uninitialized and
    // destroy_consumer_msg() could free a wild pointer (UB).
    msg_t *msg = calloc(1, sizeof(msg_t));

    // NOTE(review): the topic handle is presumably owned by the consumer,
    // not the message, so keeping it past rd_kafka_message_destroy() should
    // be safe — confirm against librdkafka lifetime rules.
    msg->topic = rd_message->rkt;
    msg->partition = rd_message->partition;

    // value (payload) copy; value stays NULL for an empty payload
    if (rd_message->len > 0) {
        msg->value = malloc(rd_message->len);
        memcpy(msg->value, rd_message->payload, rd_message->len);
    }
    msg->value_len = rd_message->len;

    // key copy; key stays NULL for a keyless message
    if (rd_message->key_len > 0) {
        msg->key = malloc(rd_message->key_len);
        memcpy(msg->key, rd_message->key, rd_message->key_len);
    }
    msg->key_len = rd_message->key_len;

    msg->offset = rd_message->offset;

    return msg;
}

void
destroy_consumer_msg(msg_t *msg) {
    // Free a message created by new_consumer_msg(); safe to call with NULL.
    if (msg == NULL) {
        return;
    }

    // free(NULL) is a no-op, so no per-field guards are needed.
    free(msg->key);
    free(msg->value);
    free(msg);
}
12 changes: 11 additions & 1 deletion kafka/consumer_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@
* Consumer Message
*/
typedef struct {
    // Borrowed topic handle (owned by the consumer — do not destroy here);
    // TODO confirm lifetime against librdkafka docs.
    rd_kafka_topic_t *topic;
    int32_t partition;
    char *value;       // owned heap copy of the payload; NULL when empty
    size_t value_len;
    char *key;         // owned heap copy of the key; NULL when keyless
    size_t key_len;
    int64_t offset;
} msg_t;

msg_t *lua_check_consumer_msg(struct lua_State *L, int index);

msg_t *new_consumer_msg(rd_kafka_message_t *rd_message);

void destroy_consumer_msg(msg_t *msg);

int lua_consumer_msg_topic(struct lua_State *L);

int lua_consumer_msg_partition(struct lua_State *L);
Expand Down
Loading

0 comments on commit 44c01aa

Please sign in to comment.