diff --git a/Makefile b/Makefile index 65a3e5b..8019584 100644 --- a/Makefile +++ b/Makefile @@ -100,9 +100,10 @@ tests-run: pytest -W ignore -vv && \ deactivate -test-run-with-docker: tests-dep docker-run-all +test-sleep: tests-dep docker-run-all sleep 10 +test-run-with-docker: test-sleep docker run \ --net=${NETWORK} \ --rm confluentinc/cp-kafka:5.0.0 \ @@ -151,6 +152,8 @@ test-run-with-docker: tests-dep docker-run-all kafka-topics --create --topic test_consuming_from_last_committed_offset --partitions 1 --replication-factor 1 \ --if-not-exists --zookeeper zookeeper:2181 + sleep 5 + cd ./tests && \ python3 -m venv venv && \ . venv/bin/activate && \ diff --git a/README.md b/README.md index 067295c..8b8717c 100644 --- a/README.md +++ b/README.md @@ -379,6 +379,11 @@ Local run in docker: # Developing ## Tests +Before run any test you should add to `/etc/hosts` entry +``` +127.0.0.1 kafka +``` + You can run docker based integration tests via makefile target ```bash make test-run-with-docker diff --git a/kafka/common.c b/kafka/common.c index e09a450..7476290 100644 --- a/kafka/common.c +++ b/kafka/common.c @@ -2,6 +2,8 @@ #include #include +#include + #include const char* const consumer_label = "__tnt_kafka_consumer"; diff --git a/kafka/common.h b/kafka/common.h index a42f769..7b160a4 100644 --- a/kafka/common.h +++ b/kafka/common.h @@ -14,6 +14,8 @@ #include #include +#include + const char* const consumer_label; const char* const consumer_msg_label; const char* const producer_label; diff --git a/kafka/consumer.c b/kafka/consumer.c index b124006..5f68b4c 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -46,9 +46,14 @@ consumer_poll_loop(void *arg) { // FIXME: push errors to error queue? 
rd_kafka_message_destroy(rd_msg); } else { + msg_t *msg = new_consumer_msg(rd_msg); + // free rdkafka message instantly to prevent hang on close / destroy consumer + rd_kafka_message_destroy(rd_msg); + rd_msg = NULL; + pthread_mutex_lock(&event_queues->consume_queue->lock); - queue_lockfree_push(event_queues->consume_queue, rd_msg); + queue_lockfree_push(event_queues->consume_queue, msg); count = event_queues->consume_queue->count; pthread_mutex_unlock(&event_queues->consume_queue->lock); @@ -225,18 +230,14 @@ lua_consumer_poll_msg(struct lua_State *L) { int msgs_limit = lua_tonumber(L, 2); lua_createtable(L, msgs_limit, 0); - rd_kafka_message_t *rd_msg = NULL; + msg_t *msg = NULL; while (msgs_limit > counter) { - rd_msg = queue_pop(consumer->event_queues->consume_queue); - if (rd_msg == NULL) { + msg = queue_pop(consumer->event_queues->consume_queue); + if (msg == NULL) { break; } counter += 1; - msg_t *msg; - msg = malloc(sizeof(msg_t)); - msg->rd_message = rd_msg; - msg_t **msg_p = (msg_t **)lua_newuserdata(L, sizeof(msg)); *msg_p = msg; @@ -500,7 +501,7 @@ lua_consumer_store_offset(struct lua_State *L) { luaL_error(L, "Usage: err = consumer:store_offset(msg)"); msg_t *msg = lua_check_consumer_msg(L, 2); - rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->rd_message->rkt, msg->rd_message->partition, msg->rd_message->offset); + rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->topic, msg->partition, msg->offset); if (err) { const char *const_err_str = rd_kafka_err2str(err); char err_str[512]; @@ -523,6 +524,14 @@ wait_consumer_close(va_list args) { return 0; } +static ssize_t +wait_consumer_destroy(va_list args) { + rd_kafka_t *rd_kafka = va_arg(args, rd_kafka_t *); + // prevents hanging forever + rd_kafka_destroy_flags(rd_kafka, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); + return 0; +} + static rd_kafka_resp_err_t consumer_destroy(struct lua_State *L, consumer_t *consumer) { rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; @@ -531,11 +540,6 @@ 
consumer_destroy(struct lua_State *L, consumer_t *consumer) { rd_kafka_topic_partition_list_destroy(consumer->topics); } - // trying to close in background until success - while (coio_call(wait_consumer_close, consumer->rd_consumer) == -1) { - // FIXME: maybe send errors to error queue? - } - if (consumer->poller != NULL) { destroy_consumer_poller(consumer->poller); } @@ -547,7 +551,7 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) { if (consumer->rd_consumer != NULL) { /* Destroy handle */ // FIXME: kafka_destroy hangs forever -// coio_call(kafka_destroy, consumer->rd_consumer); + coio_call(wait_consumer_destroy, consumer->rd_consumer); } free(consumer); @@ -563,22 +567,20 @@ lua_consumer_close(struct lua_State *L) { return 1; } + rd_kafka_commit((*consumer_p)->rd_consumer, NULL, 0); // sync commit of current offsets + rd_kafka_unsubscribe((*consumer_p)->rd_consumer); + // trying to close in background until success while (coio_call(wait_consumer_close, (*consumer_p)->rd_consumer) == -1) { // FIXME: maybe send errors to error queue? 
} - if ((*consumer_p)->poller != NULL) { - destroy_consumer_poller((*consumer_p)->poller); - (*consumer_p)->poller = NULL; - } - lua_pushboolean(L, 1); return 1; } int -lua_consumer_gc(struct lua_State *L) { +lua_consumer_destroy(struct lua_State *L) { consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label); if (consumer_p && *consumer_p) { consumer_destroy(L, *consumer_p); diff --git a/kafka/consumer.h b/kafka/consumer.h index 9949dec..8d87880 100644 --- a/kafka/consumer.h +++ b/kafka/consumer.h @@ -50,7 +50,7 @@ int lua_consumer_store_offset(struct lua_State *L); int lua_consumer_close(struct lua_State *L); -int lua_consumer_gc(struct lua_State *L); +int lua_consumer_destroy(struct lua_State *L); int lua_create_consumer(struct lua_State *L); diff --git a/kafka/consumer_msg.c b/kafka/consumer_msg.c index 750bf3a..6e3216c 100644 --- a/kafka/consumer_msg.c +++ b/kafka/consumer_msg.c @@ -22,7 +22,7 @@ lua_check_consumer_msg(struct lua_State *L, int index) { int lua_consumer_msg_topic(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 1); - lua_pushstring(L, rd_kafka_topic_name(msg->rd_message->rkt)); + lua_pushstring(L, rd_kafka_topic_name(msg->topic)); return 1; } @@ -30,7 +30,7 @@ int lua_consumer_msg_partition(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 1); - lua_pushnumber(L, (double)msg->rd_message->partition); + lua_pushnumber(L, (double)msg->partition); return 1; } @@ -38,7 +38,7 @@ int lua_consumer_msg_offset(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 1); - luaL_pushint64(L, msg->rd_message->offset); + luaL_pushint64(L, msg->offset); return 1; } @@ -46,11 +46,11 @@ int lua_consumer_msg_key(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 1); - if (msg->rd_message->key_len <= 0 || msg->rd_message->key == NULL || ((char*)msg->rd_message->key) == NULL) { + if (msg->key_len <= 0 || msg->key == NULL || ((char*)msg->key) == NULL) { return 0; } - lua_pushlstring(L, 
(char*)msg->rd_message->key, msg->rd_message->key_len); + lua_pushlstring(L, msg->key, msg->key_len); return 1; } @@ -58,11 +58,11 @@ int lua_consumer_msg_value(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 1); - if (msg->rd_message->len <= 0 || msg->rd_message->payload == NULL || ((char*)msg->rd_message->payload) == NULL) { + if (msg->value_len <= 0 || msg->value == NULL || ((char*)msg->value) == NULL) { return 0; } - lua_pushlstring(L, (char*)msg->rd_message->payload, msg->rd_message->len); + lua_pushlstring(L, msg->value, msg->value_len); return 1; } @@ -70,35 +70,35 @@ int lua_consumer_msg_tostring(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 1); - size_t key_len = msg->rd_message->key_len <= 0 ? 5: msg->rd_message->key_len + 1; + size_t key_len = msg->key_len <= 0 ? 5: msg->key_len + 1; char key[key_len]; - if (msg->rd_message->key_len <= 0 || msg->rd_message->key == NULL || ((char*)msg->rd_message->key) == NULL) { + if (msg->key_len <= 0 || msg->key == NULL || ((char*)msg->key) == NULL) { strncpy(key, "NULL", 5); } else { - strncpy(key, msg->rd_message->key, msg->rd_message->key_len + 1); - if (key[msg->rd_message->key_len] != '\0') { - key[msg->rd_message->key_len] = '\0'; + strncpy(key, msg->key, msg->key_len + 1); + if (key[msg->key_len] != '\0') { + key[msg->key_len] = '\0'; } } - size_t value_len = msg->rd_message->len <= 0 ? 5: msg->rd_message->len + 1; + size_t value_len = msg->value_len <= 0 ? 
5: msg->value_len + 1; char value[value_len]; - if (msg->rd_message->len <= 0 || msg->rd_message->payload == NULL || ((char*)msg->rd_message->payload) == NULL) { + if (msg->value_len <= 0 || msg->value == NULL || ((char*)msg->value) == NULL) { strncpy(value, "NULL", 5); } else { - strncpy(value, msg->rd_message->payload, msg->rd_message->len + 1); - if (value[msg->rd_message->len] != '\0') { - value[msg->rd_message->len] = '\0'; + strncpy(value, msg->value, msg->value_len + 1); + if (value[msg->value_len] != '\0') { + value[msg->value_len] = '\0'; } } lua_pushfstring(L, "Kafka Consumer Message: topic=%s partition=%d offset=%d key=%s value=%s", - rd_kafka_topic_name(msg->rd_message->rkt), - msg->rd_message->partition, - msg->rd_message->offset, + rd_kafka_topic_name(msg->topic), + msg->partition, + msg->offset, key, value); return 1; @@ -108,13 +108,55 @@ int lua_consumer_msg_gc(struct lua_State *L) { msg_t **msg_p = (msg_t **)luaL_checkudata(L, 1, consumer_msg_label); if (msg_p && *msg_p) { - if ((*msg_p)->rd_message != NULL) { - rd_kafka_message_destroy((*msg_p)->rd_message); - } - free(*msg_p); + destroy_consumer_msg(*msg_p); } if (msg_p) *msg_p = NULL; return 0; +} + +msg_t * +new_consumer_msg(rd_kafka_message_t *rd_message) { + msg_t *msg; + msg = calloc(1, sizeof(msg_t)); + msg->topic = rd_message->rkt; + msg->partition = rd_message->partition; + + // value + if (rd_message->len > 0) { + msg->value = calloc(1, rd_message->len + 1); + memcpy(msg->value, rd_message->payload, rd_message->len); + } + msg->value_len = rd_message->len; + + // key + if (rd_message->key_len > 0) { + msg->key = calloc(1, rd_message->key_len + 1); + memcpy(msg->key, rd_message->key, rd_message->key_len); + } + msg->key_len = rd_message->key_len; + + msg->offset = rd_message->offset; + + return msg; +} + +void +destroy_consumer_msg(msg_t *msg) { + if (msg == NULL) { + return; + } + + if (msg->key != NULL) { + free(msg->key); + } + + if (msg->value != NULL) { + free(msg->value); + } + + free(msg); + + 
return; } \ No newline at end of file diff --git a/kafka/consumer_msg.h b/kafka/consumer_msg.h index 36c138b..6975745 100644 --- a/kafka/consumer_msg.h +++ b/kafka/consumer_msg.h @@ -12,11 +12,21 @@ * Consumer Message */ typedef struct { - rd_kafka_message_t *rd_message; + rd_kafka_topic_t *topic; + int32_t partition; + char *value; + size_t value_len; + char *key; + size_t key_len; + int64_t offset; } msg_t; msg_t *lua_check_consumer_msg(struct lua_State *L, int index); +msg_t *new_consumer_msg(rd_kafka_message_t *rd_message); + +void destroy_consumer_msg(msg_t *msg); + int lua_consumer_msg_topic(struct lua_State *L); int lua_consumer_msg_partition(struct lua_State *L); diff --git a/kafka/init.lua b/kafka/init.lua index 2c1d3e3..56da486 100644 --- a/kafka/init.lua +++ b/kafka/init.lua @@ -48,18 +48,6 @@ function Consumer.create(config) return new, nil end -function Consumer:_poll() - local err - while true do - err = self._consumer:poll() - if err ~= nil then - log.error(err) - end - end -end - -jit.off(Consumer._poll) - function Consumer:_poll_msg() local msgs while true do @@ -136,16 +124,16 @@ end jit.off(Consumer._poll_rebalances) function Consumer:close() - self._poll_msg_fiber:cancel() - self._output_ch:close() - - fiber.yield() - local ok, err = self._consumer:close() if err ~= nil then return ok, err end + self._poll_msg_fiber:cancel() + self._output_ch:close() + + fiber.yield() + if self._poll_logs_fiber ~= nil then self._poll_logs_fiber:cancel() end @@ -156,6 +144,8 @@ function Consumer:close() self._poll_rebalances_fiber:cancel() end + self._consumer:destroy() + self._consumer = nil return ok, err @@ -313,6 +303,8 @@ function Producer:close() self._poll_errors_fiber:cancel() end + self._producer:destroy() + self._producer = nil return ok, err diff --git a/kafka/producer.c b/kafka/producer.c index 3bb7375..3ce5118 100644 --- a/kafka/producer.c +++ b/kafka/producer.c @@ -408,6 +408,13 @@ producer_flush(va_list args) { return 0; } +static ssize_t 
+wait_producer_destroy(va_list args) { + rd_kafka_t *rd_kafka = va_arg(args, rd_kafka_t *); + rd_kafka_destroy(rd_kafka); + return 0; +} + void destroy_producer(struct lua_State *L, producer_t *producer) { if (producer->rd_producer != NULL) { @@ -427,9 +434,8 @@ destroy_producer(struct lua_State *L, producer_t *producer) { } if (producer->rd_producer != NULL) { - // FIXME: if instance of producer exists then kafka_destroy always hangs forever /* Destroy handle */ -// coio_call(kafka_destroy, producer->rd_producer); + coio_call(wait_producer_destroy, producer->rd_producer); } free(producer); @@ -451,12 +457,13 @@ lua_producer_close(struct lua_State *L) { destroy_producer_poller((*producer_p)->poller); (*producer_p)->poller = NULL; } + lua_pushboolean(L, 1); return 1; } int -lua_producer_gc(struct lua_State *L) { +lua_producer_destroy(struct lua_State *L) { producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label); if (producer_p && *producer_p) { destroy_producer(L, *producer_p); diff --git a/kafka/producer.h b/kafka/producer.h index 662e558..fd2d9c4 100644 --- a/kafka/producer.h +++ b/kafka/producer.h @@ -55,8 +55,8 @@ int lua_producer_produce(struct lua_State *L); int lua_producer_close(struct lua_State *L); -int lua_producer_gc(struct lua_State *L); - int lua_create_producer(struct lua_State *L); +int lua_producer_destroy(struct lua_State *L); + #endif //TNT_KAFKA_PRODUCER_H diff --git a/kafka/tnt_kafka.c b/kafka/tnt_kafka.c index 3afc933..0369cef 100644 --- a/kafka/tnt_kafka.c +++ b/kafka/tnt_kafka.c @@ -25,8 +25,8 @@ luaopen_kafka_tntkafka(lua_State *L) { {"poll_rebalances", lua_consumer_poll_rebalances}, {"store_offset", lua_consumer_store_offset}, {"close", lua_consumer_close}, + {"destroy", lua_consumer_destroy}, {"__tostring", lua_consumer_tostring}, - {"__gc", lua_consumer_gc}, {NULL, NULL} }; @@ -63,8 +63,8 @@ luaopen_kafka_tntkafka(lua_State *L) { {"poll_logs", lua_producer_poll_logs}, {"poll_errors", lua_producer_poll_errors}, 
{"close", lua_producer_close}, + {"destroy", lua_producer_destroy}, {"__tostring", lua_producer_tostring}, - {"__gc", lua_producer_gc}, {NULL, NULL} }; diff --git a/librdkafka b/librdkafka index 3bdf2d0..dbafbb7 160000 --- a/librdkafka +++ b/librdkafka @@ -1 +1 @@ -Subproject commit 3bdf2d05fad1ef699910f7f9e7152c58aa24375e +Subproject commit dbafbb748c116b03a18b42a10b845eeb5517d03e diff --git a/tests/consumer.lua b/tests/consumer.lua index 609fe72..1f2dea4 100644 --- a/tests/consumer.lua +++ b/tests/consumer.lua @@ -100,6 +100,8 @@ local function consume(timeout) if err ~= nil then log.error("got error '%s' while commiting msg from topic '%s'", err, msg:topic()) end + else + fiber.sleep(0.2) end end end) diff --git a/tests/requirements.txt b/tests/requirements.txt index 4b89f49..94ad953 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,4 +1,4 @@ pytest -kafka-python==1.4.3 -aiokafka -tarantool \ No newline at end of file +kafka-python==2.0.0 +aiokafka==0.7.0 +tarantool==0.6.6 \ No newline at end of file