diff --git a/.gitignore b/.gitignore index 1eedbf0..252708f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea tests/venv tests/.pytest_cache -tests/__* \ No newline at end of file +tests/__* +cmake-build-debug \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..46393ae --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 2.8 FATAL_ERROR) + +project(tnt-kafka C) + +set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake" ${CMAKE_MODULE_PATH}) + +# Set CFLAGS +set(MY_C_FLAGS "-Wall -Wextra -Werror -std=gnu11 -fno-strict-aliasing -Wno-deprecated-declarations") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${MY_C_FLAGS}") +set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} ${MY_C_FLAGS} -ggdb3") + +set(TARANTOOL_FIND_REQUIRED ON) +find_package(Tarantool) +# tnt-kafka requires the fiber_cond API added in 1.7.4-291 +string(REPLACE "-" "." tntver ${TARANTOOL_VERSION}) +if(${tntver} VERSION_LESS 1.7.4.291) + message(FATAL_ERROR "Tarantool >= 1.7.4-291 is required") +endif() + +set(RDKAFKA_FIND_REQUIRED ON) +find_package(RdKafka) + +include_directories(${TARANTOOL_INCLUDE_DIRS}) + +add_subdirectory(tnt-kafka) diff --git a/Makefile b/Makefile index 64e202a..1e7c0a3 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ docker-read-topic-data: --net=${NETWORK} \ --rm \ confluentinc/cp-kafka:5.0.0 \ - kafka-console-consumer --bootstrap-server kafka:9092 --topic test_consumer --from-beginning + kafka-console-consumer --bootstrap-server kafka:9092 --topic test_producer --from-beginning APP_NAME = kafka-test APP_IMAGE = kafka-test-image diff --git a/README.md b/README.md index ac77904..e288b6f 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,28 @@ tnt-kafka ========= Full-featured, high-performance Kafka library for Tarantool based on [librdkafka](https://github.com/edenhill/librdkafka). -Can produce more then 80k messages per second and consume more then 130k messages per second. - -Library was tested with librdkafka v0.11.5 +Can produce more than 150k messages per second and consume more than 140k messages per second. # Features * Kafka producer and consumer implementations. * Fiber friendly. * Mostly errorless functions and methods. Error handling in the Tarantool ecosystem is quite a mess: some libraries throw a native Lua `error` while others throw `box.error` instead. `tnt-kafka` returns -errors as strings which allows you to decide how to handle it. +non-critical errors as strings, which lets you decide how to handle them. + +# Requirements +* Tarantool >= 1.10.2 +* Tarantool development headers +* librdkafka >= 0.11.5 +* librdkafka development headers +* make +* cmake +* gcc + +# Installation +```bash + tarantoolctl rocks install https://raw.githubusercontent.com/tarantool/tnt-kafka/master/rockspecs/tnt-kafka-scm-1.rockspec +``` # Examples @@ -21,28 +33,17 @@ errors as strings which allows you to decide how to handle it.
```lua local fiber = require('fiber') local os = require('os') - local kafka_consumer = require('tnt-kafka.consumer') - - local config, err = kafka_consumer.ConsumerConfig.create( - {"localhost:9092"}, -- array of brokers - "test_consumer", -- consumer group - true, -- enable auto offset store - {["auto.offset.reset"] = "earliest"} -- default configuration for topics - ) - if err ~= nil then - print(err) - os.exit(1) - end - - config:set_option("queued.min.messages", "100000") -- set global consumer option - - local consumer, err = kafka_consumer.Consumer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = consumer:start() + local tnt_kafka = require('tnt-kafka') + + local consumer, err = tnt_kafka.Consumer.create({ + brokers = "localhost:9092", -- brokers for bootstrap + options = { + ["enable.auto.offset.store"] = "true", + ["group.id"] = "example_consumer", + ["auto.offset.reset"] = "earliest", + ["enable.partition.eof"] = "false" + }, -- options for librdkafka + }) if err ~= nil then print(err) os.exit(1) @@ -69,8 +70,8 @@ errors as strings which allows you to decide how to handle it. local msg = out:get() if msg ~= nil then print(string.format( - "got msg with topic='%s' partition='%s' offset='%s' value='%s'", - msg:topic(), msg:partition(), msg:offset(), msg:value() + "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'", + msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value() )) end end @@ -78,7 +79,7 @@ errors as strings which allows you to decide how to handle it. fiber.sleep(10) - local err = consumer:stop() -- always stop consumer to commit all pending offsets before app close + local err = consumer:close() -- always close the consumer to commit all pending offsets before the app closes if err ~= nil then print(err) os.exit(1) @@ -89,28 +90,17 @@ errors as strings which allows you to decide how to handle it. ```lua local fiber = require('fiber') local os = require('os') - local kafka_consumer = require('tnt-kafka.consumer') - - local config, err = kafka_consumer.ConsumerConfig.create( - {"localhost:9092"}, -- array of brokers - "test_consumer", -- consumer group - false, -- disable auto offset store - {["auto.offset.reset"] = "earliest"} -- default configuration for topics - ) - if err ~= nil then - print(err) - os.exit(1) - end - - config:set_option("queued.min.messages", "100000") -- set global consumer option - - local consumer, err = kafka_consumer.Consumer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = consumer:start() + local tnt_kafka = require('tnt-kafka') + + local consumer, err = tnt_kafka.Consumer.create({ + brokers = "localhost:9092", -- brokers for bootstrap + options = { + ["enable.auto.offset.store"] = "false", + ["group.id"] = "example_consumer", + ["auto.offset.reset"] = "earliest", + ["enable.partition.eof"] = "false" + }, -- options for librdkafka + }) if err ~= nil then print(err) os.exit(1) @@ -137,8 +127,8 @@ errors as strings which allows you to decide how to handle it. local msg = out:get() if msg ~= nil then print(string.format( - "got msg with topic='%s' partition='%s' offset='%s' value='%s'", - msg:topic(), msg:partition(), msg:offset(), msg:value() + "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'", + msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value() )) local err = consumer:store_offset(msg) -- don't forget to commit processed messages @@ -155,7 +145,7 @@ errors as strings which allows you to decide how to handle it.
fiber.sleep(10) - local err = consumer:stop() -- always stop consumer to commit all pending offsets before app close + local err = consumer:close() -- always close the consumer to commit all pending offsets before the app closes if err ~= nil then print(err) os.exit(1) @@ -168,33 +158,12 @@ errors as strings which allows you to decide how to handle it. ```lua local os = require('os') - local kafka_producer = require('tnt-kafka.producer') + local tnt_kafka = require('tnt-kafka') - local config, err = kafka_producer.ProducerConfig.create( - {"localhost:9092"}, -- -- array of brokers - false -- sync_producer - ) - if err ~= nil then - print(err) - os.exit(1) - end - - config:set_option("statistics.interval.ms", "1000") -- set global producer option - config:set_stat_cb(function (payload) print("Stat Callback '".. payload.. "'") end) -- set callback for stats - - local producer, err = kafka_producer.Producer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:add_topic("test_topic", {}) -- add topic with configuration + local producer, err = tnt_kafka.Producer.create({ + brokers = "kafka:9092", -- brokers for bootstrap + options = {} -- options for librdkafka + }) if err ~= nil then print(err) os.exit(1) @@ -202,7 +171,8 @@ errors as strings which allows you to decide how to handle it. for i = 1, 1000 do local err = producer:produce_async({ -- don't wait until the message is delivered to kafka - topic = "test_topic", + topic = "test_topic", + key = "test_key", value = "test_value" -- only strings allowed }) if err ~= nil then @@ -211,7 +181,7 @@ errors as strings which allows you to decide how to handle it. end end - local err = producer:stop() -- always stop consumer to send all pending messages before app close + local err = producer:close() -- always close the producer to send all pending messages before the app closes if err ~= nil then print(err) os.exit(1) @@ -222,33 +192,12 @@ errors as strings which allows you to decide how to handle it. ```lua local fiber = require('fiber') local os = require('os') - local kafka_producer = require('tnt-kafka.producer') + local tnt_kafka = require('tnt-kafka') - local config, err = kafka_producer.ProducerConfig.create( - {"localhost:9092"}, -- -- array of brokers - true -- sync_producer - ) - if err ~= nil then - print(err) - os.exit(1) - end - - config:set_option("statistics.interval.ms", "1000") -- set global producer option - config:set_stat_cb(function (payload) print("Stat Callback '".. payload.. "'") end) -- set callback for stats - - local producer, err = kafka_producer.Producer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:add_topic("test_topic", {}) -- add topic with configuration + local producer, err = tnt_kafka.Producer.create({ + brokers = "kafka:9092", -- brokers for bootstrap + options = {} -- options for librdkafka + }) if err ~= nil then print(err) os.exit(1) @@ -258,7 +207,8 @@ errors as strings which allows you to decide how to handle it. fiber.create(function() local message = "test_value " ..
tostring(i) local err = producer:produce({ -- wait until the message is delivered to kafka (using a channel under the hood) - topic = "test_topic", + topic = "test_topic", + key = "test_key", value = message -- only strings allowed }) if err ~= nil then @@ -271,7 +221,7 @@ errors as strings which allows you to decide how to handle it. fiber.sleep(10) - local err = producer:stop() -- always stop consumer to send all pending messages before app close + local err = producer:close() -- always close the producer to send all pending messages before the app closes if err ~= nil then print(err) os.exit(1) @@ -279,14 +229,12 @@ errors as strings which allows you to decide how to handle it. ``` # Known issues -* Producer can use only random messages partitioning. It was done intentionally because non nil key -leads to segfault. -* Consumer leaves some non gc'able objects in memory after has been stopped. It was done intentionally +* Consumer and Producer leave some non-gc'able objects in memory after being stopped. This was done intentionally because `rd_kafka_destroy` sometimes hangs forever. # TODO -* Rocks package * Ordered storage for offsets to prevent committing unprocessed messages +* Add poll call for librdkafka logs and errors * Fix known issues * More examples * Better documentation @@ -297,7 +245,7 @@ because `rd_kafka_destroy` sometimes hangs forever. ### Async -Result: over 80000 produced messages per second on macbook pro 2016 +Result: over 150000 produced messages per second on a MacBook Pro 2016 Local run in docker: ```bash @@ -308,7 +256,7 @@ Local run in docker: ### Sync -Result: over 50000 produced messages per second on macbook pro 2016 +Result: over 90000 produced messages per second on a MacBook Pro 2016 Local run in docker: ```bash @@ -321,7 +269,7 @@ Local run in docker: ### Auto offset store enabled -Result: over 130000 consumed messages per second on macbook pro 2016 +Result: over 140000 consumed messages per second on a MacBook Pro 2016 Local run in docker: ```bash @@ -332,12 +280,12 @@ Local run in docker: ### Manual offset store -Result: over 130000 consumed messages per second on macbook pro 2016 +Result: over 140000 consumed messages per second on a MacBook Pro 2016 Local run in docker: ```bash make docker-run-environment - docker-create-benchmark-manual-commit-consumer-topic + make docker-create-benchmark-manual-commit-consumer-topic make docker-run-benchmark-manual-commit-consumer-interactive ``` diff --git a/benchmarks/async_producer.lua b/benchmarks/async_producer.lua index 8f8ceec..33241d9 100644 --- a/benchmarks/async_producer.lua +++ b/benchmarks/async_producer.lua @@ -3,7 +3,7 @@ local box = require('box') local os = require('os') local log = require('log') local clock = require('clock') -local kafka_producer = require('tnt-kafka.producer') +local tnt_kafka = require('tnt-kafka') box.cfg{} @@ -12,28 +12,7 @@ box.once('init', function() end) local function produce() - local config, err = kafka_producer.ProducerConfig.create( - {"kafka:9092"}, -- -- array of brokers - false -- sync_producer - ) - if err ~= nil then - print(err) - os.exit(1) - end - - local producer, err = kafka_producer.Producer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:add_topic("async_producer_benchmark", {}) -- add topic with configuration + local producer, err = tnt_kafka.Producer.create({brokers = "kafka:9092", options = {}}) if err ~= nil then print(err) os.exit(1) @@ -47,18
+26,20 @@ local function produce() value = "test_value_" .. tostring(i) -- only strings allowed }) if err ~= nil then - print(err) +-- print(err) + fiber.sleep(0.1) else break end end if i % 1000 == 0 then +-- log.info("done %d", i) fiber.yield() end end log.info("stopping") - local err = producer:stop() -- always stop consumer to send all pending messages before app close + local ok, err = producer:close() -- always close the producer to send all pending messages before the app closes if err ~= nil then print(err) os.exit(1) @@ -69,4 +50,5 @@ local function produce() end log.info("starting benchmark") + produce() diff --git a/benchmarks/auto_offset_store_consumer.lua b/benchmarks/auto_offset_store_consumer.lua index ace3859..93f6c26 100644 --- a/benchmarks/auto_offset_store_consumer.lua +++ b/benchmarks/auto_offset_store_consumer.lua @@ -1,11 +1,13 @@ local fiber = require('fiber') +local log = require('log') local box = require('box') local os = require('os') local clock = require('clock') -local kafka_consumer = require('tnt-kafka.consumer') -local kafka_producer = require('tnt-kafka.producer') +local tnt_kafka = require('tnt-kafka') -box.cfg{} +box.cfg{ + memtx_memory = 524288000, -- 500 MB +} local TOPIC = "auto_offset_store_consumer_benchmark" local MSG_COUNT = 10000000 @@ -15,28 +17,7 @@ box.once('init', function() end) local function produce_initial_data() - local config, err = kafka_producer.ProducerConfig.create( - {"kafka:9092"}, - false - ) - if err ~= nil then - print(err) - os.exit(1) - end - - local producer, err = kafka_producer.Producer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:add_topic(TOPIC, {}) -- add topic with configuration + local producer, err = tnt_kafka.Producer.create({ brokers = "kafka:9092"}) if err ~= nil then print(err) os.exit(1) @@ -49,7 +30,8 @@ local function produce_initial_data() value = "test_value_" ..
tostring(i) -- only strings allowed }) if err ~= nil then - print(err) +-- print(err) + fiber.sleep(0.1) else break end @@ -59,7 +41,7 @@ local function produce_initial_data() end end - local err = producer:stop() -- always stop consumer to send all pending messages before app close + local ok, err = producer:close() -- always close the producer to send all pending messages before the app closes if err ~= nil then print(err) os.exit(1) @@ -67,32 +49,19 @@ local function produce_initial_data() end local function consume() - local config, err = kafka_consumer.ConsumerConfig.create( - {"kafka:9092"}, -- array of brokers - "test_consumer", -- consumer group - true, -- enable auto offset storage - {["auto.offset.reset"] = "earliest"} -- default configuration for topics - ) - if err ~= nil then - print(err) - os.exit(1) - end - - config:set_option("queued.min.messages", "100000") -- set global consumer option - - local consumer, err = kafka_consumer.Consumer.create(config) + local consumer, err = tnt_kafka.Consumer.create({ brokers = "kafka:9092", options = { + ["enable.auto.offset.store"] = "true", + ["group.id"] = "test_consumer1", + ["auto.offset.reset"] = "earliest", + ["enable.partition.eof"] = "false", + ["queued.min.messages"] = "100000" + }}) if err ~= nil then print(err) os.exit(1) end - local err = consumer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = consumer:subscribe({TOPIC}) -- array of topics to subscribe + local err = consumer:subscribe({TOPIC}) if err ~= nil then print(err) os.exit(1) @@ -116,10 +85,14 @@ local function consume() counter = counter + 1 -- print(msg:value()) end + if counter % 10000 == 0 then + log.info("done %d", counter) + fiber.yield() + end end print("closing") - local err = consumer:stop() + local ok, err = consumer:close() if err ~= nil then print(err) os.exit(1) diff --git a/benchmarks/manual_offset_store_consumer.lua b/benchmarks/manual_offset_store_consumer.lua index 176e50f..01cf7d9 100644 --- a/benchmarks/manual_offset_store_consumer.lua +++ b/benchmarks/manual_offset_store_consumer.lua @@ -3,8 +3,7 @@ local box = require('box') local os = require('os') local log = require('log') local clock = require('clock') -local kafka_consumer = require('tnt-kafka.consumer') -local kafka_producer = require('tnt-kafka.producer') +local tnt_kafka = require('tnt-kafka') box.cfg{ memtx_memory = 524288000, @@ -18,28 +17,7 @@ box.once('init', function() end) local function produce_initial_data() - local config, err = kafka_producer.ProducerConfig.create( - {"kafka:9092"}, -- -- array of brokers - false -- sync_producer - ) - if err ~= nil then - print(err) - os.exit(1) - end - - local producer, err = kafka_producer.Producer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:add_topic(TOPIC, {}) -- add topic with configuration + local producer, err = tnt_kafka.Producer.create({ brokers = "kafka:9092"}) if err ~= nil then print(err) os.exit(1) @@ -52,7 +30,8 @@ local function produce_initial_data() value = "test_value_" ..
tostring(i) -- only strings allowed }) if err ~= nil then - print(err) + -- print(err) + fiber.sleep(0.1) else break end @@ -62,7 +41,7 @@ local function produce_initial_data() end end - local err = producer:stop() -- always stop consumer to send all pending messages before app close + local ok, err = producer:close() -- always close the producer to send all pending messages before the app closes if err ~= nil then print(err) os.exit(1) @@ -70,32 +49,19 @@ local function produce_initial_data() end local function consume() - local config, err = kafka_consumer.ConsumerConfig.create( - {"kafka:9092"}, -- array of brokers - "test_consumer", -- consumer group - false, -- enable_auto_commit - {["auto.offset.reset"] = "earliest"} -- default configuration for topics - ) + local consumer, err = tnt_kafka.Consumer.create({ brokers = "kafka:9092", options = { + ["enable.auto.offset.store"] = "false", + ["group.id"] = "test_consumer1", + ["auto.offset.reset"] = "earliest", + ["enable.partition.eof"] = "false", + ["queued.min.messages"] = "100000" + }}) if err ~= nil then print(err) os.exit(1) end - config:set_option("queued.min.messages", "100000") -- set global consumer option - - local consumer, err = kafka_consumer.Consumer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = consumer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = consumer:subscribe({TOPIC}) -- array of topics to subscribe + local err = consumer:subscribe({TOPIC}) if err ~= nil then print(err) os.exit(1) @@ -109,35 +75,34 @@ local function consume() return end - for i = 1, MSG_COUNT do + while counter < MSG_COUNT do if out:is_closed() then return end local msg = out:get() if msg ~= nil then - local err = consumer:store_offset(msg) + counter = counter + 1 + err = consumer:store_offset(msg) if err ~= nil then print(err) - os.exit(1) end - - counter = counter + 1 end - if i % 100000 == 0 then - log.info("done %d", i) + if counter % 10000 == 0 then + log.info("done %d", counter) + fiber.yield() end end - log.info("closing") - local err = consumer:stop() + print("closing") + local ok, err = consumer:close() if err ~= nil then print(err) os.exit(1) end local duration = clock.monotonic64() - before - log.info(string.format("done benchmark for %f seconds", tonumber(duration * 1.0 / (10 ^ 9)))) + print(string.format("done benchmark for %f seconds", tonumber(duration * 1.0 / (10 ^ 9)))) end log.info("producing initial data") diff --git a/benchmarks/sync_producer.lua b/benchmarks/sync_producer.lua index 403ce4a..c8cc800 100644 --- a/benchmarks/sync_producer.lua +++ b/benchmarks/sync_producer.lua @@ -3,10 +3,10 @@ local box = require('box') local log = require('log') local os = require('os') local clock = require('clock') -local kafka_producer = require('tnt-kafka.producer') +local tnt_kafka = require('tnt-kafka') box.cfg{ - memtx_memory = 524288000, + memtx_memory = 524288000, -- 500 MB } box.once('init', function() @@ -14,57 +14,55 @@ box.once('init', function() end) local function produce() - local config, err = kafka_producer.ProducerConfig.create( - {"kafka:9092"}, -- -- array of brokers - true -- sync_producer - ) - if err ~= nil then - print(err) - os.exit(1) - end - - local producer, err = kafka_producer.Producer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:add_topic("sync_producer_benchmark", {}) -- add topic with configuration + local producer, err =
tnt_kafka.Producer.create({ + brokers = "kafka:9092", + options = { + ["queue.buffering.max.ms"] = "100", + } + }) if err ~= nil then print(err) os.exit(1) end local before = clock.monotonic64() - for i = 1, 10000000 do + local input_ch = fiber.channel() + for i = 1, 12000 do fiber.create(function() - local value = "test_value_" .. tostring(i) while true do - local err = producer:produce({ - topic = "sync_producer_benchmark", - value = value -- only strings allowed - }) - if err ~= nil then - print(err) - fiber.sleep(0.1) - else + if input_ch:is_closed() then break end + local value = input_ch:get() + if value ~= nil then + while true do + local err = producer:produce({ + topic = "sync_producer_benchmark", + value = value -- only strings allowed + }) + if err ~= nil then + -- print(err) + fiber.sleep(0.1) + else +-- if value % 10000 == 0 then +-- log.info("done %d", value) +-- end + break + end + end + end end end) - if i % 1000 == 0 then - fiber.yield() - end end + for i = 1, 10000000 do + input_ch:put(i) + end + + input_ch:close() + log.info("stopping") - local err = producer:stop() -- always stop consumer to send all pending messages before app close + local ok, err = producer:close() -- always close the producer to send all pending messages before the app closes if err ~= nil then print(err) os.exit(1) @@ -75,4 +73,5 @@ local function produce() end log.info("starting benchmark") + produce() diff --git a/cmake/FindRdKafka.cmake b/cmake/FindRdKafka.cmake new file mode 100644 index 0000000..5ef53e5 --- /dev/null +++ b/cmake/FindRdKafka.cmake @@ -0,0 +1,24 @@ +find_path(RDKAFKA_ROOT_DIR + NAMES include/librdkafka/rdkafka.h + ) + +find_path(RDKAFKA_INCLUDE_DIR + NAMES librdkafka/rdkafka.h + HINTS ${RDKAFKA_ROOT_DIR}/include + ) +find_library(RDKAFKA_LIBRARY + NAMES ${CMAKE_SHARED_LIBRARY_PREFIX}rdkafka${CMAKE_SHARED_LIBRARY_SUFFIX} rdkafka + HINTS ${RDKAFKA_ROOT_DIR}/lib + ) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(RDKAFKA DEFAULT_MSG + RDKAFKA_LIBRARY + RDKAFKA_INCLUDE_DIR + ) + +mark_as_advanced( + RDKAFKA_ROOT_DIR + RDKAFKA_INCLUDE_DIR + RDKAFKA_LIBRARY +) diff --git a/cmake/FindTarantool.cmake b/cmake/FindTarantool.cmake new file mode 100644 index 0000000..f390960 --- /dev/null +++ b/cmake/FindTarantool.cmake @@ -0,0 +1,44 @@ +# Define GNU standard installation directories +include(GNUInstallDirs) + +macro(extract_definition name output input) + string(REGEX MATCH "#define[\t ]+${name}[\t ]+\"([^\"]*)\"" + _t "${input}") + string(REGEX REPLACE "#define[\t ]+${name}[\t ]+\"(.*)\"" "\\1" + ${output} "${_t}") +endmacro() + +find_path(TARANTOOL_INCLUDE_DIR tarantool/module.h + HINTS ENV TARANTOOL_DIR /usr/local/include + ) + +if(TARANTOOL_INCLUDE_DIR) + set(_config "-") + file(READ "${TARANTOOL_INCLUDE_DIR}/tarantool/module.h" _config0) + string(REPLACE "\\" "\\\\" _config ${_config0}) + unset(_config0) + extract_definition(PACKAGE_VERSION TARANTOOL_VERSION ${_config}) + extract_definition(INSTALL_PREFIX _install_prefix ${_config}) + unset(_config) +endif() + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(TARANTOOL + REQUIRED_VARS TARANTOOL_INCLUDE_DIR VERSION_VAR TARANTOOL_VERSION) +if(TARANTOOL_FOUND) + set(TARANTOOL_INCLUDE_DIRS "${TARANTOOL_INCLUDE_DIR}" + "${TARANTOOL_INCLUDE_DIR}/tarantool/" + CACHE PATH "Include directories for Tarantool") + set(TARANTOOL_INSTALL_LIBDIR "${CMAKE_INSTALL_LIBDIR}/tarantool" + CACHE PATH "Directory for storing Lua modules written in C") + set(TARANTOOL_INSTALL_LUADIR
"${CMAKE_INSTALL_DATADIR}/tarantool" + CACHE PATH "Directory for storing Lua modules written in C") + + if (NOT TARANTOOL_FIND_QUIETLY AND NOT FIND_TARANTOOL_DETAILS) + set(FIND_TARANTOOL_DETAILS ON CACHE INTERNAL "Details about TARANTOOL") + message(STATUS "Tarantool LUADIR is ${TARANTOOL_INSTALL_LUADIR}") + message(STATUS "Tarantool LIBDIR is ${TARANTOOL_INSTALL_LIBDIR}") + endif () +endif() +mark_as_advanced(TARANTOOL_INCLUDE_DIRS TARANTOOL_INSTALL_LIBDIR + TARANTOOL_INSTALL_LUADIR) diff --git a/docker/Dockerfile b/docker/Dockerfile index 9b41000..70e0c3b 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,9 +1,15 @@ FROM tarantool/tarantool:1.x-centos7 -RUN yum install -y librdkafka +RUN yum update -y + +RUN yum install -y librdkafka librdkafka-devel cmake gcc tarantool-devel ADD . /opt/tarantool WORKDIR /opt/tarantool +RUN cmake . + +RUN make + ENTRYPOINT tarantool /opt/tarantool/tests/app.lua \ No newline at end of file diff --git a/rockspecs/tnt-kafka-0.3.4-1.rockspec b/rockspecs/tnt-kafka-0.3.4-1.rockspec index 8b51357..ff8a2e1 100644 --- a/rockspecs/tnt-kafka-0.3.4-1.rockspec +++ b/rockspecs/tnt-kafka-0.3.4-1.rockspec @@ -1,12 +1,12 @@ package = "tnt-kafka" version = "0.3.4-1" source = { - url = "git://github.com/RepentantGopher/tnt-kafka.git", + url = "git://github.com/tarantool/tnt-kafka.git", tag = "v0.3.4", } description = { summary = "Kafka library for Tarantool", - homepage = "https://github.com/RepentantGopher/tnt-kafka", + homepage = "https://github.com/tarantool/tnt-kafka", license = "Apache", } dependencies = { diff --git a/rockspecs/tnt-kafka-dev-1.rockspec b/rockspecs/tnt-kafka-dev-1.rockspec new file mode 100644 index 0000000..75b529f --- /dev/null +++ b/rockspecs/tnt-kafka-dev-1.rockspec @@ -0,0 +1,31 @@ +package = "tnt-kafka" +version = "dev-1" +source = { + url = "git://github.com/tarantool/tnt-kafka.git", + branch = 'dev', +} +description = { + summary = "Kafka library for Tarantool", + homepage = "https://github.com/tarantool/tnt-kafka", + license = "Apache", +} +dependencies = { + "lua >= 5.1" -- actually tarantool > 1.6 +} +external_dependencies = { + TARANTOOL = { + header = 'tarantool/module.h' + }, + RDKAFKA = { + header = 'librdkafka/rdkafka.h' + } +} +build = { + type = 'cmake'; + variables = { + CMAKE_BUILD_TYPE="RelWithDebInfo", + TARANTOOL_DIR="$(TARANTOOL_DIR)", + TARANTOOL_INSTALL_LIBDIR="$(LIBDIR)", + TARANTOOL_INSTALL_LUADIR="$(LUADIR)" + } +} \ No newline at end of file diff --git a/rockspecs/tnt-kafka-scm-1.rockspec b/rockspecs/tnt-kafka-scm-1.rockspec new file mode 100644 index 0000000..668bae3 --- /dev/null +++ b/rockspecs/tnt-kafka-scm-1.rockspec @@ -0,0 +1,31 @@ +package = "tnt-kafka" +version = "scm-1" +source = { + url = "git://github.com/tarantool/tnt-kafka.git", + branch = 'master', +} +description = { + summary = "Kafka library for Tarantool", + homepage = "https://github.com/tarantool/tnt-kafka", + license = "Apache", +} +dependencies = { + "lua >= 5.1" -- actually tarantool > 1.6 +} +external_dependencies = { + TARANTOOL = { + header = 'tarantool/module.h' + }, + RDKAFKA = { + header = 'librdkafka/rdkafka.h' + } +} +build = { + type = 'cmake'; + variables = { + CMAKE_BUILD_TYPE="RelWithDebInfo", + TARANTOOL_DIR="$(TARANTOOL_DIR)", + TARANTOOL_INSTALL_LIBDIR="$(LIBDIR)", + TARANTOOL_INSTALL_LUADIR="$(LUADIR)" + } +} \ No newline at end of file diff --git a/tests/consumer.lua b/tests/consumer.lua index 2adbec0..178fa39 100644 --- a/tests/consumer.lua +++ b/tests/consumer.lua @@ -1,77 +1,66 @@ +local box = require("box") local log 
= require("log") local os = require("os") local fiber = require('fiber') -local kafka_consumer = require('tnt-kafka.consumer') +local tnt_kafka = require('tnt-kafka') -local BROKERS_ADDRESS = { "kafka" } local TOPIC_NAME = "test_consumer" local function consume() - log.info("consumer called") + log.info("consume called") - local config, err = kafka_consumer.ConsumerConfig.create(BROKERS_ADDRESS, "test_consumer6", false, {["auto.offset.reset"] = "earliest"}) + local consumer, err = tnt_kafka.Consumer.create({brokers = "kafka:9092", options = { + ["enable.auto.offset.store"] = "false", + ["group.id"] = "test_consumer", + ["auto.offset.reset"] = "earliest", + ["enable.partition.eof"] = "false", + }}) if err ~= nil then - print(err) - os.exit(1) - end - --- config:set_option("check.crcs", "true") - - local consumer, err = kafka_consumer.Consumer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = consumer:start() - if err ~= nil then - print(err) - os.exit(1) + log.error("got err %s", err) + box.error{code = 500, reason = err} end + log.info("consumer created") + log.info("consumer subscribing") local err = consumer:subscribe({TOPIC_NAME}) if err ~= nil then - print(err) - os.exit(1) + log.error("got err %s", err) + box.error{code = 500, reason = err} end + log.info("consumer subscribed") + log.info("consumer polling") local consumed = {} - for i = 0, 1 do - fiber.create(function() - while true do - local out, err = consumer:output() - if err ~= nil then - print(string.format("got fatal error '%s'", err)) - return - end - log.info("got output") - - if out:is_closed() then - return - end + fiber.create(function() + local out = consumer:output() + while true do + if out:is_closed() then + break + end - local msg = out:get() - log.info("got msg") - if msg ~= nil then - print(string.format("got msg with topic='%s' partition='%s' offset='%s' value='%s'", msg:topic(), msg:partition(), msg:offset(), msg:value())) - table.insert(consumed, msg:value()) - local err = consumer:store_offset(msg) - if err ~= nil then - print(string.format("got error '%s' while commiting msg from topic '%s'", err, msg:topic())) - end + local msg = out:get() + if msg ~= nil then + log.info(msg) + log.info("got msg with topic='%s' partition='%d' offset='%d' key='%s' value='%s'", msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()) + table.insert(consumed, msg:value()) + local err = consumer:store_offset(msg) + if err ~= nil then + log.error("got error '%s' while commiting msg from topic '%s'", err, msg:topic()) end end - end) - end + end + end) + log.info("consumer wait") fiber.sleep(10) + log.info("consumer ends") log.info("stopping consumer") - local err = consumer:stop() + local exists, err = consumer:close() if err ~= nil then - print(err) - os.exit(1) + log.error("got err %s", err) + box.error{code = 500, reason = err} end log.info("stopped consumer") diff --git a/tests/producer.lua b/tests/producer.lua index 1cdea98..dc76693 100644 --- a/tests/producer.lua +++ b/tests/producer.lua @@ -1,54 +1,30 @@ local os = require('os') +local box = require('box') +local log = require('log') local fiber = require('fiber') -local kafka_producer = require('tnt-kafka.producer') +local tnt_kafka = require('tnt-kafka') -local BROKERS_ADDRESS = { "kafka" } local TOPIC_NAME = "test_producer" return function(messages) - local config, err = kafka_producer.ProducerConfig.create(BROKERS_ADDRESS, true) + local producer, err = tnt_kafka.Producer.create({brokers = "kafka:9092", options = {}}) if err 
~= nil then - print(err) - os.exit(1) - end - - config:set_option("statistics.interval.ms", "1000") - config:set_stat_cb(function (payload) print("Stat Callback '".. payload.. "'") end) - - local producer, err = kafka_producer.Producer.create(config) - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:start() - if err ~= nil then - print(err) - os.exit(1) - end - - local err = producer:add_topic(TOPIC_NAME, {}) - if err ~= nil then - print(err) - os.exit(1) + log.error("got err %s", err) + box.error{code = 500, reason = err} end for _, message in ipairs(messages) do - fiber.create(function() - local err = producer:produce({topic = TOPIC_NAME, value = message}) - if err ~= nil then - print(string.format("got error '%s' while sending value '%s'", err, message)) - else - print(string.format("successfully sent value '%s'", message)) - end - end) + local err = producer:produce({topic = TOPIC_NAME, key = message, value = message}) + if err ~= nil then + log.error("got error '%s' while sending value '%s'", err, message) + else + log.info("successfully sent value '%s'", message) + end end - fiber.sleep(2) - - local err = producer:stop() + local err = producer:close() if err ~= nil then - print(err) - os.exit(1) + log.error("got err %s", err) + box.error{code = 500, reason = err} end end diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 8a8618a..1c11fda 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -50,19 +50,7 @@ async def send(): reconnect_delay=1, connect_now=True) - attempts = 0 - while True: - try: - response = server.call("consumer.consume", ()) - # tarantool in docker sometimes stacks - except: - attempts += 1 - if attempts < 3: - continue - else: - assert True is False - else: - break + response = server.call("consumer.consume", ()) assert set(*response) == { "test1", diff --git a/tests/test_producer.py b/tests/test_producer.py index 32c1b9f..9a62af7 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -4,7 +4,15 @@ def test_producer(): - server = tarantool.connect("127.0.0.1", 3301) + server = tarantool.Connection( + "127.0.0.1", 3301, + user="guest", + password=None, + socket_timeout=30, + reconnect_max_attempts=3, + reconnect_delay=1, + connect_now=True + ) server.call("producer", ( ( @@ -48,15 +56,15 @@ async def consume(): assert kafka_output == [ { - "key": None, + "key": "1", "value": "1" }, { - "key": None, + "key": "2", "value": "2" }, { - "key": None, + "key": "3", "value": "3" }, ] diff --git a/tnt-kafka/CMakeLists.txt b/tnt-kafka/CMakeLists.txt new file mode 100644 index 0000000..cc7f012 --- /dev/null +++ b/tnt-kafka/CMakeLists.txt @@ -0,0 +1,12 @@ +add_library(tntkafka SHARED tnt_kafka.c ) + +if (APPLE) + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} \ + -undefined suppress -flat_namespace") +endif(APPLE) + +target_link_libraries(tntkafka ${RDKAFKA_LIBRARY} pthread) + +set_target_properties(tntkafka PROPERTIES PREFIX "" OUTPUT_NAME "tntkafka") +install(TARGETS tntkafka LIBRARY DESTINATION ${TARANTOOL_INSTALL_LIBDIR}/tnt-kafka) +install(FILES init.lua DESTINATION ${TARANTOOL_INSTALL_LUADIR}/tnt-kafka) diff --git a/tnt-kafka/consumer.lua b/tnt-kafka/consumer.lua deleted file mode 100644 index 95e9a11..0000000 --- a/tnt-kafka/consumer.lua +++ /dev/null @@ -1,329 +0,0 @@ -local log = require("log") -local ffi = require('ffi') -local fiber = require('fiber') -local librdkafka = require('tnt-kafka.librdkafka') - -local ConsumerConfig = {} - -ConsumerConfig.__index = ConsumerConfig
-function ConsumerConfig.create(brokers_list, consumer_group, auto_offset_store, default_topic_opts) - if brokers_list == nil then - return nil, "brokers list must not be nil" - end - if consumer_group == nil then - return nil, "consumer group must not be nil" - end - if auto_offset_store == nil then - return nil, "auto_offset_store flag must not be nil" - end - - if default_topic_opts == nil then - return nil, "default_topic_opts must not be nil" - end - - local config = { - _brokers_list = brokers_list, - _consumer_group = consumer_group, - _auto_offset_store = auto_offset_store, - _options = {}, - _topic_opts = default_topic_opts, - } - setmetatable(config, ConsumerConfig) - return config, nil -end - -function ConsumerConfig:get_brokers_list() - return self._brokers_list -end - -function ConsumerConfig:get_consumer_group() - return self._consumer_group -end - -function ConsumerConfig:get_auto_offset_store() - return self._auto_offset_store -end - -function ConsumerConfig:set_option(name, value) - self._options[name] = value -end - -function ConsumerConfig:get_options() - return self._options -end - -function ConsumerConfig:get_default_topic_options() - return self._topic_opts -end - -local ConsumerMessage = {} - -ConsumerMessage.__index = ConsumerMessage - -function ConsumerMessage.create(rd_message) - local msg = { - _rd_message = rd_message, - _value = nil, - _topic = nil, - _partition = nil, - _offset = nil, - } - ffi.gc(msg._rd_message, function(...) - librdkafka.rd_kafka_message_destroy(...) - end) - setmetatable(msg, ConsumerMessage) - return msg -end - -function ConsumerMessage:value() - if self._value == nil then - self._value = ffi.string(self._rd_message.payload) - end - return self._value -end - -function ConsumerMessage:topic() - if self._topic == nil then - self._topic = ffi.string(librdkafka.rd_kafka_topic_name(self._rd_message.rkt)) - end - return self._topic -end - -function ConsumerMessage:partition() - if self._partition == nil then - self._partition = tonumber(self._rd_message.partition) - end - return self._partition -end - -function ConsumerMessage:offset() - if self._offset == nil then - self._offset = tonumber64(self._rd_message.offset) - end - return self._offset -end - -local Consumer = {} - -Consumer.__index = Consumer - -function Consumer.create(config) - if config == nil then - return nil, "config must not be nil" - end - - local consumer = { - config = config, - _rd_consumer = {}, - _output_ch = nil, - } - setmetatable(consumer, Consumer) - return consumer, nil -end - -function Consumer:_get_topic_rd_config(config) - local rd_config = librdkafka.rd_kafka_topic_conf_new() - --- FIXME: sometimes got segfault here --- ffi.gc(rd_config, function (rd_config) --- librdkafka.rd_kafka_topic_conf_destroy(rd_config) --- end) - - local ERRLEN = 256 - for key, value in pairs(config) do - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - - if librdkafka.rd_kafka_topic_conf_set(rd_config, key, value, errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then - return nil, ffi.string(errbuf) - end - end - - return rd_config, nil -end - -function Consumer:_get_consumer_rd_config() - local rd_config = librdkafka.rd_kafka_conf_new() - --- FIXME: why we got segfault here? 
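-- (Editor's note, hedged: librdkafka's rd_kafka_new() and rd_kafka_conf_set_default_topic_conf() take ownership of the conf objects and free them internally, so an ffi.gc finalizer on them would double-free -- the likely cause of the segfaults these FIXMEs mention.)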
--- ffi.gc(rd_config, function (rd_config) --- librdkafka.rd_kafka_conf_destroy(rd_config) --- end) - - local ERRLEN = 256 - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - if librdkafka.rd_kafka_conf_set(rd_config, "group.id", tostring(self.config:get_consumer_group()), errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then - return nil, ffi.string(errbuf) - end - - local auto_offset_store - if self.config:get_auto_offset_store() then - auto_offset_store = "true" - else - auto_offset_store = "false" - end - - local ERRLEN = 256 - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - if librdkafka.rd_kafka_conf_set(rd_config, "enable.auto.offset.store", auto_offset_store, errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then - return nil, ffi.string(errbuf) - end - - for key, value in pairs(self.config:get_options()) do - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - if librdkafka.rd_kafka_conf_set(rd_config, key, tostring(value), errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then - return nil, ffi.string(errbuf) - end - end - - librdkafka.rd_kafka_conf_set_error_cb(rd_config, - function(rk, err, reason) - log.error("rdkafka error code=%d reason=%s", tonumber(err), ffi.string(reason)) - end) - - - librdkafka.rd_kafka_conf_set_log_cb(rd_config, - function(rk, level, fac, buf) - log.info("%s - %s", ffi.string(fac), ffi.string(buf)) - end) - - local rd_topic_config, err = self:_get_topic_rd_config(self.config:get_default_topic_options()) - if err ~= nil then - return nil, err - end - - librdkafka.rd_kafka_conf_set_default_topic_conf(rd_config, rd_topic_config) - - return rd_config, nil -end - -function Consumer:_poll() - while true do - -- lower timeout value can lead to broken payload - local rd_message = librdkafka.rd_kafka_consumer_poll(self._rd_consumer, 10) - if rd_message ~= nil then - if rd_message.err == librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then - self._output_ch:put(ConsumerMessage.create(rd_message)) - else - -- FIXME: properly log this - log.error("rdkafka poll: %s", ffi.string(librdkafka.rd_kafka_err2str(rd_message.err))) - end - end - fiber.yield() - end -end - -jit.off(Consumer._poll) - -function Consumer:start() - local rd_config, err = self:_get_consumer_rd_config() - if err ~= nil then - return err - end - - local ERRLEN = 256 - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - local rd_consumer = librdkafka.rd_kafka_new(librdkafka.RD_KAFKA_CONSUMER, rd_config, errbuf, ERRLEN) - if rd_consumer == nil then - return ffi.string(errbuf) - end - - local err_no = librdkafka.rd_kafka_poll_set_consumer(rd_consumer) - if err_no ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then - return ffi.string(librdkafka.rd_kafka_err2str(err_no)) - end - - for _, broker in ipairs(self.config:get_brokers_list()) do - if librdkafka.rd_kafka_brokers_add(rd_consumer, broker) < 1 then - return "no valid brokers specified" - end - end - - self._rd_consumer = rd_consumer - - self._output_ch = fiber.channel(10000) - - self._poll_fiber = fiber.create(function() - self:_poll() - end) - - return nil -end - -function Consumer:stop(timeout_ms) - if self._rd_consumer == nil then - return "'stop' method must be called only after consumer was started " - end - - if timeout_ms == nil then - timeout_ms = 1000 - end - - self._poll_fiber:cancel() - self._output_ch:close() - - local err_no = librdkafka.rd_kafka_consumer_close(self._rd_consumer) - if err_no ~= 
librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then - return ffi.string(librdkafka.rd_kafka_err2str(err_no)) - end - --- -- FIXME: sometimes rd_kafka_destroy hangs forever --- librdkafka.rd_kafka_destroy(self._rd_consumer) --- librdkafka.rd_kafka_wait_destroyed(timeout_ms) - - self._rd_consumer = nil - - return nil -end - -function Consumer:subscribe(topics) - if self._rd_consumer == nil then - return "'add_topic' method must be called only after consumer was started " - end - - local list = librdkafka.rd_kafka_topic_partition_list_new(#topics) - for _, topic in ipairs(topics) do - librdkafka.rd_kafka_topic_partition_list_add(list, topic, librdkafka.RD_KAFKA_PARTITION_UA) - end - - local err = nil - local err_no = librdkafka.rd_kafka_subscribe(self._rd_consumer, list) - if err_no ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then - err = ffi.string(librdkafka.rd_kafka_err2str(err_no)) - end - - - librdkafka.rd_kafka_topic_partition_list_destroy(list) - - return err -end - -function Consumer:output() - if self._output_ch == nil then - return nil, "'output' method must be called only after consumer was started " - end - - return self._output_ch, nil -end - -function Consumer:store_offset(message) - if self._rd_consumer == nil then - return "'store_offset' method must be called only after consumer was started " - end - - if self.config:get_auto_offset_store() then - return "auto offset store was enabled by configuration" - end - - local err_no = librdkafka.rd_kafka_offset_store(message._rd_message.rkt, message._rd_message.partition, message._rd_message.offset) - if err_no ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then - return ffi.string(librdkafka.rd_kafka_err2str(err_no)) - end - - return nil -end - -return { - ConsumerConfig = ConsumerConfig, - Consumer = Consumer, -} diff --git a/tnt-kafka/init.lua b/tnt-kafka/init.lua new file mode 100644 index 0000000..995da49 --- /dev/null +++ b/tnt-kafka/init.lua @@ -0,0 +1,194 @@ +local log = require("log") +local fiber = require('fiber') +local tnt_kafka = require("tnt-kafka.tntkafka") + +local Consumer = {} + +Consumer.__index = Consumer + +function Consumer.create(config) + if config == nil then + return nil, "config must not be nil" + end + + local consumer, err = tnt_kafka.create_consumer(config) + if err ~= nil then + return nil, err + end + + local new = { + config = config, + _consumer = consumer, + _output_ch = fiber.channel(10000), + } + setmetatable(new, Consumer) + + new._poll_fiber = fiber.create(function() + new:_poll() + end) + + new._poll_msg_fiber = fiber.create(function() + new:_poll_msg() + end) + + return new, nil +end + +function Consumer:_poll() + local err + while true do + err = self._consumer:poll() + if err ~= nil then + log.error(err) + end + end +end + +jit.off(Consumer._poll) + +function Consumer:_poll_msg() + local msgs + while true do + msgs = self._consumer:poll_msg(100) + if #msgs > 0 then + for _, msg in ipairs(msgs) do + self._output_ch:put(msg) + end + fiber.yield() + else + -- throttling poll + fiber.sleep(0.01) + end + end +end + +jit.off(Consumer._poll_msg) + +function Consumer:close() + self._poll_msg_fiber:cancel() + self._poll_fiber:cancel() + self._output_ch:close() + + local ok, err = self._consumer:close() + self._consumer = nil + + return err +end + +function Consumer:subscribe(topics) + return self._consumer:subscribe(topics) +end + +function Consumer:output() + return self._output_ch +end + +function Consumer:store_offset(message) + return self._consumer:store_offset(message) +end + +local Producer = {} +
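-- Editor's note: Consumer above and Producer below each run two background fibers --
-- one drives librdkafka's event callbacks via poll(), the other drains ready messages
-- or delivery reports -- and jit.off() is applied to those loops, presumably because
-- they cross the Lua/C boundary in ways LuaJIT traces do not handle well. A minimal
-- usage sketch (editor's illustration based on the README examples in this patch,
-- assuming a broker at localhost:9092):
--
--   local tnt_kafka = require('tnt-kafka')
--   local consumer = assert(tnt_kafka.Consumer.create({
--       brokers = "localhost:9092",
--       options = { ["group.id"] = "example_consumer" },
--   }))
--   local err = consumer:subscribe({"test_topic"})
--   local msg = consumer:output():get()  -- blocks the calling fiber until a message arrives
--   if msg ~= nil then print(msg:topic(), msg:value()) end
--   err = consumer:close()  -- cancels the poll fibers and commits pending offsets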
+Producer.__index = Producer + +function Producer.create(config) + if config == nil then + return nil, "config must not be nil" + end + + local producer, err = tnt_kafka.create_producer(config) + if err ~= nil then + return nil, err + end + + local new = { + config = config, + _counter = 0, + _delivery_map = {}, + _producer = producer, + } + setmetatable(new, Producer) + + new._poll_fiber = fiber.create(function() + new:_poll() + end) + + new._msg_delivery_poll_fiber = fiber.create(function() + new:_msg_delivery_poll() + end) + + return new, nil +end + +function Producer:_poll() + local err + while true do + err = self._producer:poll() + if err ~= nil then + log.error(err) + end + end +end + +jit.off(Producer._poll) + +function Producer:_msg_delivery_poll() + local count, err + while true do + count, err = self._producer:msg_delivery_poll(100) + if err ~= nil then + log.error(err) + -- throttling poll + fiber.sleep(0.01) + elseif count > 0 then + fiber.yield() + else + -- throttling poll + fiber.sleep(0.01) + end + end +end + +jit.off(Producer._msg_delivery_poll) + +function Producer:produce_async(msg) + local err = self._producer:produce(msg) + return err +end + +local function dr_callback_factory(delivery_chan) + return function(err) + delivery_chan:put(err) + end +end + +function Producer:produce(msg) + local delivery_chan = fiber.channel(1) + + msg.dr_callback = dr_callback_factory(delivery_chan) + + local err = self._producer:produce(msg) + if err == nil then + err = delivery_chan:get() + end + + return err +end + +function Producer:close() + self._poll_fiber:cancel() + self._msg_delivery_poll_fiber:cancel() + + local ok, err = self._producer:close() + self._producer = nil + + return err +end + +return { + Consumer = Consumer, + Producer = Producer, +} diff --git a/tnt-kafka/librdkafka.lua b/tnt-kafka/librdkafka.lua deleted file mode 100644 index 563a19a..0000000 --- a/tnt-kafka/librdkafka.lua +++ /dev/null @@ -1,133 +0,0 @@ -local ffi = require 'ffi' - -ffi.cdef[[ - static const int32_t RD_KAFKA_PARTITION_UA = ((int32_t)-1); - - typedef struct rd_kafka_s rd_kafka_t; - typedef struct rd_kafka_conf_s rd_kafka_conf_t; - typedef struct rd_kafka_topic_s rd_kafka_topic_t; - typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t; - - typedef enum { - RD_KAFKA_RESP_ERR__BEGIN = -200, - RD_KAFKA_RESP_ERR_NO_ERROR = 0, - RD_KAFKA_RESP_ERR__QUEUE_FULL = -184 - /* ... */ - } rd_kafka_resp_err_t; - - typedef struct rd_kafka_message_s { - rd_kafka_resp_err_t err; - rd_kafka_topic_t *rkt; - int32_t partition; - void *payload; - size_t len; - void *key; - size_t key_len; - int64_t offset; - void *_private; - } rd_kafka_message_t; - - void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage); - - typedef enum rd_kafka_type_t { - RD_KAFKA_PRODUCER, - RD_KAFKA_CONSUMER - } rd_kafka_type_t; - - typedef enum { - RD_KAFKA_CONF_UNKNOWN = -2, /* Unknown configuration name. */ - RD_KAFKA_CONF_INVALID = -1, /* Invalid configuration value.
*/ - RD_KAFKA_CONF_OK = 0 /* Configuration okay */ - } rd_kafka_conf_res_t; - - rd_kafka_conf_t *rd_kafka_conf_new (void); - rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf); - void rd_kafka_conf_destroy (rd_kafka_conf_t *conf); - const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp); - void rd_kafka_conf_dump_free (const char **arr, size_t cnt); - rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, const char *name, const char *value, - char *errstr, size_t errstr_size); - void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf, void (*dr_msg_cb) (rd_kafka_t *rk, - const rd_kafka_message_t *rkmessage, void *opaque)); - void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf, void (*consume_cb) (rd_kafka_message_t *rkmessage, - void *opaque)); - void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf, void (*error_cb) (rd_kafka_t *rk, int err, - const char *reason, void *opaque)); - void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf, int (*stats_cb) (rd_kafka_t *rk, char *json, - size_t json_len, void *opaque)); - void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf, void (*log_cb) (const rd_kafka_t *rk, int level, - const char *fac, const char *buf)); - - rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size); - void rd_kafka_destroy (rd_kafka_t *rk); - int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist); - - rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void); - rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t *conf); - rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf, const char *name, - const char *value, char *errstr, size_t errstr_size); - void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf); - const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf, size_t *cntp); - - rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf); - const char *rd_kafka_topic_name (const rd_kafka_topic_t *rkt); - void rd_kafka_topic_destroy (rd_kafka_topic_t *rkt); - - int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partitition, int msgflags, void *payload, size_t len, - const void *key, size_t keylen, void *msg_opaque); - - int rd_kafka_outq_len (rd_kafka_t *rk); - int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms); - - int rd_kafka_wait_destroyed (int timeout_ms); - - rd_kafka_resp_err_t rd_kafka_errno2err (int errnox); - const char *rd_kafka_err2str (rd_kafka_resp_err_t err); - int rd_kafka_thread_cnt (void); - - rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms); - - void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf); - - typedef struct rd_kafka_topic_partition_s { - char *topic; /**< Topic name */ - int32_t partition; /**< Partition */ - int64_t offset; /**< Offset */ - void *metadata; /**< Metadata */ - size_t metadata_size; /**< Metadata size */ - void *opaque; /**< Application opaque */ - rd_kafka_resp_err_t err; /**< Error code, depending on use. 
*/ - void *_private; /**< INTERNAL USE ONLY, - * INITIALIZE TO ZERO, DO NOT TOUCH */ - } rd_kafka_topic_partition_t; - - typedef struct rd_kafka_topic_partition_list_s { - int cnt; /**< Current number of elements */ - int size; /**< Current allocated size */ - rd_kafka_topic_partition_t *elems; /**< Element array[] */ - } rd_kafka_topic_partition_list_t; - - rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size); - void rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rkparlist); - rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition); - - /** - * @remark Only the \c .topic field is used in the supplied \p topics list, - * all other fields are ignored. - */ - rd_kafka_resp_err_t rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics); - - rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms); - rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk); - - rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset); - - rd_kafka_resp_err_t rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async); - rd_kafka_resp_err_t rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async); - - rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk); -]] - -local librdkafka = ffi.load("librdkafka.so.1") - -return librdkafka diff --git a/tnt-kafka/producer.lua b/tnt-kafka/producer.lua deleted file mode 100644 index 9a61c4b..0000000 --- a/tnt-kafka/producer.lua +++ /dev/null @@ -1,318 +0,0 @@ -local ffi = require('ffi') -local log = require('log') -local fiber = require('fiber') -local librdkafka = require('tnt-kafka.librdkafka') - -local ProducerConfig = {} - -ProducerConfig.__index = ProducerConfig - -function ProducerConfig.create(brokers_list, sync_producer) - if brokers_list == nil then - return nil, "brokers list must not be nil" - end - if sync_producer == nil then - return nil, "sync producer variable must not be nil" - end - - local config = { - _brokers_list = brokers_list, - _sync_producer = sync_producer, - _options = {}, - _stat_cb = nil, - } - setmetatable(config, ProducerConfig) - return config, nil -end - -function ProducerConfig:get_brokers_list() - return self._brokers_list -end - -function ProducerConfig:has_sync_producer() - return self._sync_producer -end - -function ProducerConfig:set_option(name, value) - self._options[name] = value -end - -function ProducerConfig:get_options() - return self._options -end - -function ProducerConfig:set_stat_cb(callback) - self._stat_cb = callback -end - -function ProducerConfig:get_stat_cb() - return self._stat_cb -end - -local Producer = {} - -Producer.__index = Producer - -function Producer.create(config) - if config == nil then - return nil, "config must not be nil" - end - - local producer = { - config = config, - _counter = 0, - _rd_topics = {}, - _rd_producer = {}, - _delivery_map = {}, - } - setmetatable(producer, Producer) - return producer, nil -end - -function Producer:_get_producer_rd_config() - local rd_config = librdkafka.rd_kafka_conf_new() - --- FIXME: got segfault here --- ffi.gc(rd_config, function (rd_config) --- librdkafka.rd_kafka_conf_destroy(rd_config) --- end) - - local ERRLEN = 256 - for key, value in pairs(self.config:get_options()) do - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - if 
librdkafka.rd_kafka_conf_set(rd_config, key, tostring(value), errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then - return nil, ffi.string(errbuf) - end - end - - if self.config:has_sync_producer() then - librdkafka.rd_kafka_conf_set_dr_msg_cb(rd_config, - function(rk, rkmessage) - local delivery_chan = self._delivery_map[tonumber(ffi.cast('intptr_t', rkmessage._private))] - if delivery_chan ~= nil then - local errstr = nil - if rkmessage.err ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then - errstr = ffi.string(librdkafka.rd_kafka_err2str(rkmessage.err)) - end - delivery_chan:put(errstr) - end - end) - end - - local stat_cb = self.config:get_stat_cb() - if stat_cb ~= nil then - librdkafka.rd_kafka_conf_set_stats_cb(rd_config, - function(rk, json, json_len) - stat_cb(ffi.string(json, json_len)) - return 0 --librdkafka will immediately free the 'json' pointer. - end) - end - - - librdkafka.rd_kafka_conf_set_error_cb(rd_config, - function(rk, err, reason) - log.error("rdkafka error code=%d reason=%s", tonumber(err), ffi.string(reason)) - end) - - librdkafka.rd_kafka_conf_set_log_cb(rd_config, - function(rk, level, fac, buf) - log.info("%s - %s", ffi.string(fac), ffi.string(buf)) - end) - - return rd_config, nil -end - -function Producer:_poll() - while true do - librdkafka.rd_kafka_poll(self._rd_producer, 10) - fiber.yield() - end -end - -jit.off(Producer._poll) - -function Producer:start() - local rd_config, err = self:_get_producer_rd_config() - if err ~= nil then - return err - end - - local ERRLEN = 256 - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - local rd_producer = librdkafka.rd_kafka_new(librdkafka.RD_KAFKA_PRODUCER, rd_config, errbuf, ERRLEN) - - if rd_producer == nil then - return ffi.string(errbuf) - end - - for _, broker in ipairs(self.config:get_brokers_list()) do - librdkafka.rd_kafka_brokers_add(rd_producer, broker) - end - - self._rd_producer = rd_producer - - self._poll_fiber = fiber.create(function() - self:_poll() - end) - - return nil -end - -local function len(table) - local count = 0 - for _ in pairs(table) do count = count + 1 end - return count -end - -function Producer:stop(timeout_ms) - if self._rd_producer == nil then - return "'stop' method must be called only after producer was started " - end - - if timeout_ms == nil then - timeout_ms = 3000 - end - - local err_no = librdkafka.rd_kafka_flush(self._rd_producer, timeout_ms) - if err_no ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then - return ffi.string(librdkafka.rd_kafka_err2str(err_no)) - end - - -- FIXME: potentially this can hang forever - while len(self._delivery_map) > 0 do - fiber.sleep(1) - end - - self._poll_fiber:cancel() - - for name, rd_topic in pairs(self._rd_topics) do - librdkafka.rd_kafka_topic_destroy(rd_topic) - end - self._rd_topics = nil - - librdkafka.rd_kafka_destroy(self._rd_producer) - librdkafka.rd_kafka_wait_destroyed(timeout_ms) - self._rd_producer = nil - - return nil -end - -function Producer:_get_topic_rd_config(config) - local rd_config = librdkafka.rd_kafka_topic_conf_new() - --- FIXME: sometimes got segfault here --- ffi.gc(rd_config, function (rd_config) --- librdkafka.rd_kafka_topic_conf_destroy(rd_config) --- end) - - local ERRLEN = 256 - for key, value in pairs(config) do - local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected - - if librdkafka.rd_kafka_topic_conf_set(rd_config, key, value, errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then - return nil, ffi.string(errbuf) - end - end - - return rd_config, nil -end - 
-function Producer:add_topic(name, config)
-    if self._rd_producer == nil then
-        return "'add_topic' method must be called only after producer was started "
-    end
-
-    if self._rd_topics[name] ~= nil then
-        return string.format('topic "%s" already exists', name)
-    end
-
-    local rd_config, err = self:_get_topic_rd_config(config)
-    if err ~= nil then
-        return err
-    end
-
-    local rd_topic = librdkafka.rd_kafka_topic_new(self._rd_producer, name, rd_config)
-    if rd_topic == nil then
-        return ffi.string(librdkafka.rd_kafka_err2str(librdkafka.rd_kafka_errno2err(ffi.errno())))
-    end
-
-    self._rd_topics[name] = rd_topic
-
-    return nil
-end
-
-function Producer:_produce_async(msg, id)
-    if self._rd_producer == nil then
-        return "'produce' method must be called only after producer was started "
-    end
-
-    if msg.value == nil or #msg.value == 0 then
-        return "go empty message value"
-    end
-
-    local partition = -1
-    if msg.partition ~= nil then
-        partition = msg.partition
-    end
-
-    local rd_topic = self._rd_topics[msg.topic]
-    if rd_topic == nil then
-        self:add_topic(msg.topic, {})
-        rd_topic = self._rd_topics[msg.topic]
-    end
-
-    -- FIXME: non nil partition key produce segfault
-    local RD_KAFKA_MSG_F_COPY = 0x2
-    while true do
-        local produce_result = librdkafka.rd_kafka_produce(
-            rd_topic,
-            partition,
-            RD_KAFKA_MSG_F_COPY,
-            ffi.cast("void*", msg.value), #msg.value,
-            nil, 0,
-            ffi.cast("void*", id)
-        )
-
-        if produce_result == -1 then
-            local errno = librdkafka.rd_kafka_errno2err(ffi.errno())
-            if errno ~= librdkafka.RD_KAFKA_RESP_ERR__QUEUE_FULL then
-                return ffi.string(librdkafka.rd_kafka_err2str(errno))
-            end
-            fiber.sleep(0.1)
-        else
-            return nil
-        end
-    end
-end
-
-function Producer:produce_async(msg)
-    if self.config:has_sync_producer() then
-        return "only sync producer available via configuration"
-    end
-
-    return self:_produce_async(msg, nil)
-end
-
-function Producer:produce(msg)
-    if not self.config:has_sync_producer() then
-        return "sync producer is not available via configuration"
-    end
-
-    self._counter = self._counter + 1
-    local id = self._counter
-    local delivery_chan = fiber.channel()
-    self._delivery_map[id] = delivery_chan
-
-    local err = self:_produce_async(msg, id)
-    if err == nil then
-        err = delivery_chan:get()
-    end
-
-    self._delivery_map[id] = nil
-    return err
-end
-
-return {
-    Producer = Producer,
-    ProducerConfig = ProducerConfig,
-}
diff --git a/tnt-kafka/tnt_kafka.c b/tnt-kafka/tnt_kafka.c
new file mode 100644
index 0000000..f304898
--- /dev/null
+++ b/tnt-kafka/tnt_kafka.c
@@ -0,0 +1,1091 @@
+#include <errno.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+
+#include <lua.h>
+#include <lauxlib.h>
+#include <lualib.h>
+#include <luajit.h>
+
+#include <tarantool/module.h>
+
+#include <librdkafka/rdkafka.h>
+
+#ifdef UNUSED
+#elif defined(__GNUC__)
+# define UNUSED(x) UNUSED_ ## x __attribute__((unused))
+#elif defined(__LCLINT__)
+# define UNUSED(x) /*@unused@*/ x
+#else
+# define UNUSED(x) x
+#endif
+
+static const char consumer_label[] = "__tnt_kafka_consumer";
+static const char consumer_msg_label[] = "__tnt_kafka_consumer_msg";
+static const char producer_label[] = "__tnt_kafka_producer";
+
+static int
+save_pushstring_wrapped(struct lua_State *L) {
+    char *str = (char *)lua_topointer(L, 1);
+    lua_pushstring(L, str);
+    return 1;
+}
+
+static int
+safe_pushstring(struct lua_State *L, char *str) {
+    lua_pushcfunction(L, save_pushstring_wrapped);
+    lua_pushlightuserdata(L, str);
+    return lua_pcall(L, 1, 1, 0);
+}
+
+/**
+ * Push native lua error with code -3
+ */
+static int
+lua_push_error(struct lua_State *L)
+{
+    lua_pushnumber(L, -3);
+    lua_insert(L, -2);
+    return 2;
+}
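Editor's note: `safe_pushstring` protects the error path against allocation failure while pushing a message string, and `lua_push_error` raises it with code -3 only when even that fails; argument misuse, by contrast, is thrown via `luaL_error`. A hedged sketch of what this means for calling code (the `consumer` object and its `subscribe` method, defined further down in this file, are assumed to be in scope):

```lua
-- runtime failures come back as plain string values...
local err = consumer:subscribe({ "test_consumer" })
if err ~= nil then
    print("subscribe failed: " .. err)
end

-- ...while misuse (wrong argument count or type) is raised and must be caught
local ok, raised = pcall(function()
    return consumer:subscribe("not-a-table")
end)
if not ok then
    print("usage error: " .. tostring(raised))
end
```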
+
+// FIXME: suppress warning
+//static ssize_t
+//kafka_destroy(va_list args) {
+//    rd_kafka_t *kafka = va_arg(args, rd_kafka_t *);
+//
+//    // waiting in background while garbage collector collects all refs
+//    sleep(5);
+//
+//    rd_kafka_destroy(kafka);
+//    while (rd_kafka_wait_destroyed(1000) == -1) {}
+//    return 0;
+//}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+/**
+ * Consumer Message
+ */
+typedef struct {
+    const rd_kafka_message_t *rd_message;
+    rd_kafka_event_t *rd_event;
+} msg_t;
+
+
+static inline msg_t *
+lua_check_consumer_msg(struct lua_State *L, int index) {
+    msg_t **msg_p = (msg_t **)luaL_checkudata(L, index, consumer_msg_label);
+    if (msg_p == NULL || *msg_p == NULL)
+        luaL_error(L, "Kafka consumer message fatal error: failed to retrieve message from lua stack!");
+    return *msg_p;
+}
+
+static int
+lua_consumer_msg_topic(struct lua_State *L) {
+    msg_t *msg = lua_check_consumer_msg(L, 1);
+
+    const char *const_topic = rd_kafka_topic_name(msg->rd_message->rkt);
+    // size the buffer by the string length, not sizeof(const char *)
+    char topic[strlen(const_topic) + 1];
+    strcpy(topic, const_topic);
+    int fail = safe_pushstring(L, topic);
+    return fail ? lua_push_error(L): 1;
+}
+
+static int
+lua_consumer_msg_partition(struct lua_State *L) {
+    msg_t *msg = lua_check_consumer_msg(L, 1);
+
+    lua_pushnumber(L, (double)msg->rd_message->partition);
+    return 1;
+}
+
+static int
+lua_consumer_msg_offset(struct lua_State *L) {
+    msg_t *msg = lua_check_consumer_msg(L, 1);
+
+    luaL_pushint64(L, msg->rd_message->offset);
+    return 1;
+}
+
+static int
+lua_consumer_msg_key(struct lua_State *L) {
+    msg_t *msg = lua_check_consumer_msg(L, 1);
+
+    if (msg->rd_message->key_len <= 0 || msg->rd_message->key == NULL) {
+        return 0;
+    }
+
+    lua_pushlstring(L, (char*)msg->rd_message->key, msg->rd_message->key_len);
+    return 1;
+}
+
+static int
+lua_consumer_msg_value(struct lua_State *L) {
+    msg_t *msg = lua_check_consumer_msg(L, 1);
+
+    if (msg->rd_message->len <= 0 || msg->rd_message->payload == NULL) {
+        return 0;
+    }
+
+    lua_pushlstring(L, (char*)msg->rd_message->payload, msg->rd_message->len);
+    return 1;
+}
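Editor's note: the accessors above translate directly to message methods on the Lua side; `key()` and `value()` return nothing (i.e. nil) for empty payloads, and `__tostring` (next) renders a debug string. A small usage sketch (`msg` is assumed to be a message obtained from the consumer, `handle` is a placeholder for application code):

```lua
-- partition is a plain Lua number, offset a 64-bit integer (luaL_pushint64)
print(msg:topic(), msg:partition(), msg:offset())

-- guard key/value: both return nil when the payload is empty
local key, value = msg:key(), msg:value()
if value ~= nil then
    handle(key, value)
end

print(tostring(msg)) -- "Kafka Consumer Message: topic=... key=... value=..."
```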
+
+static int
+lua_consumer_msg_tostring(struct lua_State *L) {
+    msg_t *msg = lua_check_consumer_msg(L, 1);
+
+    size_t key_len = msg->rd_message->key_len <= 0 ? 5: msg->rd_message->key_len + 1;
+    char key[key_len];
+
+    if (msg->rd_message->key_len <= 0 || msg->rd_message->key == NULL) {
+        strncpy(key, "NULL", 5);
+    } else {
+        // the key is not NUL terminated: copy exactly key_len bytes and terminate manually
+        memcpy(key, msg->rd_message->key, msg->rd_message->key_len);
+        key[msg->rd_message->key_len] = '\0';
+    }
+
+    size_t value_len = msg->rd_message->len <= 0 ? 5: msg->rd_message->len + 1;
+    char value[value_len];
+
+    if (msg->rd_message->len <= 0 || msg->rd_message->payload == NULL) {
+        strncpy(value, "NULL", 5);
+    } else {
+        // same for the payload: copy len bytes, then terminate
+        memcpy(value, msg->rd_message->payload, msg->rd_message->len);
+        value[msg->rd_message->len] = '\0';
+    }
+
+    lua_pushfstring(L,
+        "Kafka Consumer Message: topic=%s partition=%d offset=%d key=%s value=%s",
+        rd_kafka_topic_name(msg->rd_message->rkt),
+        msg->rd_message->partition,
+        msg->rd_message->offset,
+        key,
+        value);
+    return 1;
+}
+
+static int
+lua_consumer_msg_gc(struct lua_State *L) {
+    msg_t **msg_p = (msg_t **)luaL_checkudata(L, 1, consumer_msg_label);
+    if (msg_p && *msg_p) {
+        rd_kafka_event_destroy((*msg_p)->rd_event);
+    }
+    if (msg_p)
+        *msg_p = NULL;
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+/**
+ * Consumer
+ */
+
+typedef struct {
+    rd_kafka_t *rd_consumer;
+    rd_kafka_topic_partition_list_t *topics;
+    rd_kafka_queue_t *rd_event_queue;
+    rd_kafka_queue_t *rd_msg_queue;
+} consumer_t;
+
+static inline consumer_t *
+lua_check_consumer(struct lua_State *L, int index) {
+    consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, index, consumer_label);
+    if (consumer_p == NULL || *consumer_p == NULL)
+        luaL_error(L, "Kafka consumer fatal error: failed to retrieve consumer from lua stack!");
+    return *consumer_p;
+}
+
+static int
+lua_consumer_subscribe(struct lua_State *L) {
+    if (lua_gettop(L) != 2 || !lua_istable(L, 2))
+        luaL_error(L, "Usage: err = consumer:subscribe({'topic'})");
+
+    consumer_t *consumer = lua_check_consumer(L, 1);
+
+    if (consumer->topics == NULL) {
+        // the topics table sits at stack index 2; index 1 is the consumer itself
+        consumer->topics = rd_kafka_topic_partition_list_new(lua_objlen(L, 2));
+    }
+
+    lua_pushnil(L);
+    // stack now contains: -1 => nil; -2 => table; -3 => consumer
+    while (lua_next(L, -2)) {
+        // stack now contains: -1 => value; -2 => key; -3 => table; -4 => consumer
+        const char *value = lua_tostring(L, -1);
+        // pop value, leaving original key
+        lua_pop(L, 1);
+        // stack now contains: -1 => key; -2 => table; -3 => consumer
+
+        rd_kafka_topic_partition_list_add(consumer->topics, value, -1);
+    }
+    // stack now contains: -1 => table; -2 => consumer
+
+    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer->rd_consumer, consumer->topics);
+    if (err) {
+        const char *const_err_str = rd_kafka_err2str(err);
+        char err_str[512];
+        strcpy(err_str, const_err_str);
+        int fail = safe_pushstring(L, err_str);
+        return fail ?
lua_push_error(L): 1; + } + + return 0; +} + +static int +lua_consumer_tostring(struct lua_State *L) { + consumer_t *consumer = lua_check_consumer(L, 1); + lua_pushfstring(L, "Kafka Consumer: %p", consumer); + return 1; +} + +static ssize_t +consumer_poll(va_list args) { + rd_kafka_t *rd_consumer = va_arg(args, rd_kafka_t *); + rd_kafka_poll(rd_consumer, 1000); + return 0; +} + +static int +lua_consumer_poll(struct lua_State *L) { + if (lua_gettop(L) != 1) + luaL_error(L, "Usage: err = consumer:poll()"); + + consumer_t *consumer = lua_check_consumer(L, 1); + if (coio_call(consumer_poll, consumer->rd_consumer) == -1) { + lua_pushstring(L, "unexpected error on consumer poll"); + return 1; + } + return 0; +} + +static int +lua_consumer_poll_msg(struct lua_State *L) { + if (lua_gettop(L) != 2) + luaL_error(L, "Usage: msgs = consumer:poll_msg(msgs_limit)"); + + consumer_t *consumer = lua_check_consumer(L, 1); + int counter = 0; + int msgs_limit = lua_tonumber(L, 2); + rd_kafka_event_t *event = NULL; + lua_createtable(L, msgs_limit, 0); + + while (msgs_limit > counter) { + event = rd_kafka_queue_poll(consumer->rd_msg_queue, 0); + if (event == NULL) { + break; + } + if (rd_kafka_event_type(event) == RD_KAFKA_EVENT_FETCH) { + counter += 1; + + msg_t *msg; + msg = malloc(sizeof(msg_t)); + msg->rd_message = rd_kafka_event_message_next(event); + msg->rd_event = event; + + msg_t **msg_p = (msg_t **)lua_newuserdata(L, sizeof(msg)); + *msg_p = msg; + + luaL_getmetatable(L, consumer_msg_label); + lua_setmetatable(L, -2); + + lua_rawseti(L, -2, counter); + } else { + rd_kafka_event_destroy(event); + } + } + return 1; +} + +// TODO: implement logs and errors notifications +//static int +//lua_consumer_poll_logs(struct lua_State *L) { +// +// return 0; +//} + +static int +lua_consumer_store_offset(struct lua_State *L) { + if (lua_gettop(L) != 2) + luaL_error(L, "Usage: err = consumer:store_offset(msg)"); + + msg_t *msg = lua_check_consumer_msg(L, 2); + rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->rd_message->rkt, msg->rd_message->partition, msg->rd_message->offset); + if (err) { + const char *const_err_str = rd_kafka_err2str(err); + char err_str[512]; + strcpy(err_str, const_err_str); + int fail = safe_pushstring(L, err_str); + return fail ? 
lua_push_error(L): 1;
+    }
+    return 0;
+}
+
+static rd_kafka_resp_err_t
+consumer_close(consumer_t *consumer) {
+    rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+    if (consumer->rd_consumer != NULL) {
+        err = rd_kafka_consumer_close(consumer->rd_consumer);
+        if (err) {
+            return err;
+        }
+    }
+
+    if (consumer->rd_msg_queue != NULL) {
+        rd_kafka_queue_destroy(consumer->rd_msg_queue);
+    }
+
+    if (consumer->rd_event_queue != NULL) {
+        rd_kafka_queue_destroy(consumer->rd_event_queue);
+    }
+
+    if (consumer->topics != NULL) {
+        rd_kafka_topic_partition_list_destroy(consumer->topics);
+    }
+
+    if (consumer->rd_consumer != NULL) {
+        /* Destroy handle */
+        // FIXME: kafka_destroy hangs forever
+//        coio_call(kafka_destroy, consumer->rd_consumer);
+    }
+
+    free(consumer);
+
+    return err;
+}
+
+static int
+lua_consumer_close(struct lua_State *L) {
+    consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label);
+    if (consumer_p == NULL || *consumer_p == NULL) {
+        lua_pushboolean(L, 0);
+        return 1;
+    }
+
+    rd_kafka_resp_err_t err = consumer_close(*consumer_p);
+    if (err) {
+        lua_pushboolean(L, 1);
+
+        const char *const_err_str = rd_kafka_err2str(err);
+        char err_str[512];
+        strcpy(err_str, const_err_str);
+        int fail = safe_pushstring(L, err_str);
+        return fail ? lua_push_error(L): 2;
+    }
+
+    *consumer_p = NULL;
+    lua_pushboolean(L, 1);
+    return 1;
+}
+
+static int
+lua_consumer_gc(struct lua_State *L) {
+    consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label);
+    if (consumer_p && *consumer_p) {
+        consumer_close(*consumer_p);
+    }
+    if (consumer_p)
+        *consumer_p = NULL;
+    return 0;
+}
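Editor's note: `__gc` is only a safety net here; `consumer_close` runs `rd_kafka_consumer_close`, which commits stored offsets and leaves the group, so calling it explicitly in every exit path is the reliable pattern. A hedged sketch of that pattern at this low level (`process_messages` is placeholder application code, and the `ok, err` return shape follows `lua_consumer_close` above):

```lua
local ok, process_err = pcall(process_messages, consumer)

-- close even when processing failed, so pending offsets are committed
local closed, close_err = consumer:close()
if close_err ~= nil then
    print("error on consumer close: " .. close_err)
end

if not ok then
    error(process_err) -- re-raise only after the consumer is closed
end
```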
+
+static int
+lua_create_consumer(struct lua_State *L) {
+    if (lua_gettop(L) != 1 || !lua_istable(L, 1))
+        luaL_error(L, "Usage: consumer, err = create_consumer(conf)");
+
+    lua_pushstring(L, "brokers");
+    lua_gettable(L, -2);
+    const char *brokers = lua_tostring(L, -1);
+    lua_pop(L, 1);
+    if (brokers == NULL) {
+        lua_pushnil(L);
+        int fail = safe_pushstring(L, "consumer config table must have a non nil key 'brokers' which contains a string");
+        return fail ? lua_push_error(L): 2;
+    }
+
+    char errstr[512];
+
+    rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
+    rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
+    rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf);
+
+    lua_pushstring(L, "options");
+    lua_gettable(L, -2);
+    if (lua_istable(L, -1)) {
+        lua_pushnil(L);
+        // stack now contains: -1 => nil; -2 => table
+        while (lua_next(L, -2)) {
+            // stack now contains: -1 => value; -2 => key; -3 => table
+            if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
+                lua_pushnil(L);
+                int fail = safe_pushstring(L, "consumer config options must contain only string keys and string values");
+                return fail ? lua_push_error(L): 2;
+            }
+
+            const char *value = lua_tostring(L, -1);
+            const char *key = lua_tostring(L, -2);
+            if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
+                lua_pushnil(L);
+                int fail = safe_pushstring(L, errstr);
+                return fail ? lua_push_error(L): 2;
+            }
+
+            // pop value, leaving original key
+            lua_pop(L, 1);
+            // stack now contains: -1 => key; -2 => table
+        }
+        // stack now contains: -1 => table
+    }
+    lua_pop(L, 1);
+
+    rd_kafka_t *rd_consumer;
+    if (!(rd_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, rd_config, errstr, sizeof(errstr)))) {
+        lua_pushnil(L);
+        int fail = safe_pushstring(L, errstr);
+        return fail ? lua_push_error(L): 2;
+    }
+
+    if (rd_kafka_brokers_add(rd_consumer, brokers) == 0) {
+        lua_pushnil(L);
+        int fail = safe_pushstring(L, "No valid brokers specified");
+        return fail ? lua_push_error(L): 2;
+    }
+
+    rd_kafka_queue_t *rd_event_queue = rd_kafka_queue_get_main(rd_consumer);
+    rd_kafka_queue_t *rd_msg_queue = rd_kafka_queue_get_consumer(rd_consumer);
+
+    consumer_t *consumer;
+    consumer = malloc(sizeof(consumer_t));
+    consumer->rd_consumer = rd_consumer;
+    consumer->topics = NULL;
+    consumer->rd_event_queue = rd_event_queue;
+    consumer->rd_msg_queue = rd_msg_queue;
+
+    consumer_t **consumer_p = (consumer_t **)lua_newuserdata(L, sizeof(consumer));
+    *consumer_p = consumer;
+
+    luaL_getmetatable(L, consumer_label);
+    lua_setmetatable(L, -2);
+    return 1;
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+/**
+ * Producer
+ */
+
+typedef struct {
+    rd_kafka_topic_t **elements;
+    int32_t count;
+    int32_t capacity;
+} producer_topics_t;
+
+static producer_topics_t *
+new_producer_topics(int32_t capacity) {
+    rd_kafka_topic_t **elements;
+    elements = malloc(sizeof(rd_kafka_topic_t *) * capacity);
+
+    producer_topics_t *topics;
+    topics = malloc(sizeof(producer_topics_t));
+    topics->capacity = capacity;
+    topics->count = 0;
+    topics->elements = elements;
+
+    return topics;
+}
+
+static int
+add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element) {
+    if (topics->count >= topics->capacity) {
+        rd_kafka_topic_t **new_elements = realloc(topics->elements, sizeof(rd_kafka_topic_t *) * topics->capacity * 2);
+        if (new_elements == NULL) {
+            printf("realloc failed to grow rd_kafka_topic_t array.");
+            return 1;
+        }
+        topics->elements = new_elements;
+        topics->capacity *= 2;
+    }
+    topics->elements[topics->count++] = element;
+    return 0;
+}
+
+static rd_kafka_topic_t *
+find_producer_topic_by_name(producer_topics_t *topics, const char *name) {
+    rd_kafka_topic_t *topic;
+    for (int i = 0; i < topics->count; i++) {
+        topic = topics->elements[i];
+        if (strcmp(rd_kafka_topic_name(topic), name) == 0) {
+            return topic;
+        }
+    }
+    return NULL;
+}
+
+static void
+destroy_producer_topics(producer_topics_t *topics) {
+    rd_kafka_topic_t **topic_p;
+    rd_kafka_topic_t **end = topics->elements + topics->count;
+    for (topic_p = topics->elements; topic_p < end; topic_p++) {
+        rd_kafka_topic_destroy(*topic_p);
+    }
+
+    free(topics->elements);
+    free(topics);
+}
+
+// Because `rd_kafka_conf_set_events(rd_config, RD_KAFKA_EVENT_DR)` produces a segfault with the queue API, we are
+// forced to implement our own thread safe queue to push incoming events from the callback thread to the lua thread.
+typedef struct {
+    int dr_callback;
+    int err;
+} queue_element_t;
+
+queue_element_t *
+new_queue_element(int dr_callback, int err) {
+    queue_element_t *element;
+    element = malloc(sizeof(queue_element_t));
+    element->dr_callback = dr_callback;
+    element->err = err;
+    return element;
+}
+
+void
+destroy_queue_element(queue_element_t *element) {
+    free(element);
+}
+
+typedef struct queue_node_t {
+    queue_element_t *element;
+    struct queue_node_t *next;
+} queue_node_t;
+
+typedef struct {
+    pthread_mutex_t lock;
+    queue_node_t *head;
+    queue_node_t *tail;
+} queue_t;
+
+/**
+ * Pop without locking mutex.
+ * Caller must lock and unlock queue mutex by itself.
+ * Use with caution!
+ * @param queue
+ * @return
+ */
+static queue_element_t *
+queue_lockfree_pop(queue_t *queue) {
+    queue_element_t *output = NULL;
+
+    if (queue->head != NULL) {
+        output = queue->head->element;
+        queue_node_t *tmp = queue->head;
+        queue->head = queue->head->next;
+        free(tmp);
+        if (queue->head == NULL) {
+            queue->tail = NULL;
+        }
+    }
+
+    return output;
+}
+
+static queue_element_t *
+queue_pop(queue_t *queue) {
+    pthread_mutex_lock(&queue->lock);
+
+    queue_element_t *output = queue_lockfree_pop(queue);
+
+    pthread_mutex_unlock(&queue->lock);
+
+    return output;
+}
+
+static int
+queue_push(queue_t *queue, queue_element_t *element) {
+    if (element == NULL || queue == NULL) {
+        return -1;
+    }
+
+    pthread_mutex_lock(&queue->lock);
+
+    queue_node_t *new_node;
+    new_node = malloc(sizeof(queue_node_t));
+    if (new_node == NULL) {
+        // don't leave the queue locked when allocation fails
+        pthread_mutex_unlock(&queue->lock);
+        return -1;
+    }
+
+    new_node->element = element;
+    new_node->next = NULL;
+
+    if (queue->tail != NULL) {
+        queue->tail->next = new_node;
+    }
+
+    queue->tail = new_node;
+    if (queue->head == NULL) {
+        queue->head = new_node;
+    }
+
+    pthread_mutex_unlock(&queue->lock);
+
+    return 0;
+}
+
+static queue_t *
+new_queue() {
+    queue_t *queue = malloc(sizeof(queue_t));
+    if (queue == NULL) {
+        return NULL;
+    }
+
+    pthread_mutex_t lock;
+    if (pthread_mutex_init(&lock, NULL) != 0) {
+        // don't leak the queue when mutex initialization fails
+        free(queue);
+        return NULL;
+    }
+
+    queue->lock = lock;
+    queue->head = NULL;
+    queue->tail = NULL;
+
+    return queue;
+}
+
+void
+destroy_queue(queue_t *queue) {
+    while (true) {
+        queue_element_t *element = queue_pop(queue);
+        if (element == NULL) {
+            break;
+        }
+        destroy_queue_element(element);
+    }
+
+    pthread_mutex_destroy(&queue->lock);
+    free(queue);
+}
+
+typedef struct {
+    rd_kafka_conf_t *rd_config;
+    rd_kafka_t *rd_producer;
+    producer_topics_t *topics;
+    queue_t *delivery_queue;
+} producer_t;
+
+static inline producer_t *
+lua_check_producer(struct lua_State *L, int index) {
+    producer_t **producer_p = (producer_t **)luaL_checkudata(L, index, producer_label);
+    if (producer_p == NULL || *producer_p == NULL)
+        luaL_error(L, "Kafka producer fatal error: failed to retrieve producer from lua stack!");
+    return *producer_p;
+}
+
+static int
+lua_producer_tostring(struct lua_State *L) {
+    producer_t *producer = lua_check_producer(L, 1);
+    lua_pushfstring(L, "Kafka Producer: %p", producer);
+    return 1;
+}
+
+static ssize_t
+producer_poll(va_list args) {
+    rd_kafka_t *rd_producer = va_arg(args, rd_kafka_t *);
+    rd_kafka_poll(rd_producer, 1000);
+    return 0;
+}
+
+static int
+lua_producer_poll(struct lua_State *L) {
+    if (lua_gettop(L) != 1)
+        luaL_error(L, "Usage: err = producer:poll()");
+
+    producer_t *producer = lua_check_producer(L, 1);
+    if (coio_call(producer_poll, producer->rd_producer) == -1) {
+        lua_pushstring(L, "unexpected error on producer poll");
+        return 1;
+    }
+    return 0;
+}
+
+static int
+lua_producer_msg_delivery_poll(struct lua_State *L) {
+    if (lua_gettop(L) != 2)
+        luaL_error(L, "Usage: count, err = producer:msg_delivery_poll(events_limit)");
+
+    producer_t *producer = lua_check_producer(L, 1);
+
+    int events_limit = lua_tonumber(L, 2);
+    int callbacks_count = 0;
+    char *err_str = NULL;
+    queue_element_t *element = NULL;
+
+    pthread_mutex_lock(&producer->delivery_queue->lock);
+
+    while (events_limit > callbacks_count) {
+        element = queue_lockfree_pop(producer->delivery_queue);
+        if (element == NULL) {
+            break;
+        }
+        callbacks_count += 1;
+        lua_rawgeti(L, LUA_REGISTRYINDEX, element->dr_callback);
+        if (element->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+            lua_pushstring(L, (char *)rd_kafka_err2str(element->err));
+        } else {
+            lua_pushnil(L);
+        }
+        /* do the call (1 arguments, 0 result) */
+        if (lua_pcall(L, 1, 0, 0) != 0) {
+            err_str = (char *)lua_tostring(L, -1);
+        }
+        luaL_unref(L, LUA_REGISTRYINDEX, element->dr_callback);
+        destroy_queue_element(element);
+        if (err_str != NULL) {
+            break;
+        }
+    }
+
+    pthread_mutex_unlock(&producer->delivery_queue->lock);
+
+    lua_pushnumber(L, (double)callbacks_count);
+    if (err_str != NULL) {
+        int fail = safe_pushstring(L, err_str);
+        if (fail) {
+            return lua_push_error(L);
+        }
+    } else {
+        lua_pushnil(L);
+    }
+    return 2;
+}
+
+static int
+lua_producer_produce(struct lua_State *L) {
+    if (lua_gettop(L) != 2 || !lua_istable(L, 2))
+        luaL_error(L, "Usage: err = producer:produce(msg)");
+
+    lua_pushstring(L, "topic");
+    lua_gettable(L, -2);
+    const char *topic = lua_tostring(L, -1);
+    lua_pop(L, 1);
+    if (topic == NULL) {
+        int fail = safe_pushstring(L, "producer message must contain a non nil 'topic' key");
+        return fail ? lua_push_error(L): 1;
+    }
+
+    lua_pushstring(L, "key");
+    lua_gettable(L, -2);
+    // rd_kafka will copy key so no need to worry about this cast
+    char *key = (char *)lua_tostring(L, -1);
+    lua_pop(L, 1);
+
+    size_t key_len = key != NULL ? strlen(key) : 0;
+
+    lua_pushstring(L, "value");
+    lua_gettable(L, -2);
+    // rd_kafka will copy value so no need to worry about this cast
+    char *value = (char *)lua_tostring(L, -1);
+    lua_pop(L, 1);
+
+    size_t value_len = value != NULL ? strlen(value) : 0;
+
+    if (key == NULL && value == NULL) {
+        int fail = safe_pushstring(L, "producer message must contain a non nil key or value");
+        return fail ? lua_push_error(L): 1;
+    }
+
+    // create delivery callback queue element if a dr_callback was given
+    queue_element_t *element = NULL;
+    lua_pushstring(L, "dr_callback");
+    lua_gettable(L, -2);
+    if (lua_isfunction(L, -1)) {
+        element = new_queue_element(luaL_ref(L, LUA_REGISTRYINDEX), RD_KAFKA_RESP_ERR_NO_ERROR);
+        if (element == NULL) {
+            int fail = safe_pushstring(L, "failed to create callback message");
+            return fail ? lua_push_error(L): 1;
+        }
+    } else {
+        lua_pop(L, 1);
+    }
+    lua_pop(L, 1);
+
+    producer_t *producer = lua_check_producer(L, 1);
+    rd_kafka_topic_t *rd_topic = find_producer_topic_by_name(producer->topics, topic);
+    if (rd_topic == NULL) {
+        rd_topic = rd_kafka_topic_new(producer->rd_producer, topic, NULL);
+        if (rd_topic == NULL) {
+            const char *const_err_str = rd_kafka_err2str(rd_kafka_errno2err(errno));
+            char err_str[512];
+            strcpy(err_str, const_err_str);
+            int fail = safe_pushstring(L, err_str);
+            return fail ? lua_push_error(L): 1;
+        }
+        if (add_producer_topics(producer->topics, rd_topic) != 0) {
+            int fail = safe_pushstring(L, "Unexpected error: failed to add new topic to topic list!");
+            return fail ? lua_push_error(L): 1;
+        }
+    }
+
+    if (rd_kafka_produce(rd_topic, -1, RD_KAFKA_MSG_F_COPY, value, value_len, key, key_len, element) == -1) {
+        const char *const_err_str = rd_kafka_err2str(rd_kafka_errno2err(errno));
+        char err_str[512];
+        strcpy(err_str, const_err_str);
+        int fail = safe_pushstring(L, err_str);
+        return fail ? lua_push_error(L): 1;
+    }
+    return 0;
+}
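Editor's note: `produce` only enqueues; the delivery report arrives later via `msg_delivery_callback`, lands in the custom queue, and is dispatched when someone calls `msg_delivery_poll`. A hedged sketch of driving that loop directly against the C module (the `kafka.tntkafka` require path is inferred from `luaopen_kafka_tntkafka` at the end of this file; the shipped Lua wrapper normally hides this):

```lua
local fiber = require('fiber')
local tntkafka = require('kafka.tntkafka')

local producer, err = tntkafka.create_producer({ brokers = "localhost:9092" })
assert(err == nil, err)

local delivered = fiber.channel()
local err = producer:produce({
    topic = "test_producer",
    key = "key",
    value = "value",
    -- invoked from msg_delivery_poll with nil or an error string
    dr_callback = function(dr_err) delivered:put(dr_err) end,
})
assert(err == nil, err)

fiber.create(function()
    while true do -- sketch only: a real wrapper would make this loop stoppable
        producer:poll() -- drives librdkafka callbacks that feed the delivery queue
        local _, poll_err = producer:msg_delivery_poll(100) -- run up to 100 stored callbacks
        if poll_err ~= nil then print(poll_err) end
        fiber.yield()
    end
end)

print("delivery result: ", delivered:get())
```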
+
+static ssize_t producer_flush(va_list args) {
+    rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+    rd_kafka_t *rd_producer = va_arg(args, rd_kafka_t *);
+    while (true) {
+        err = rd_kafka_flush(rd_producer, 1000);
+        if (err != RD_KAFKA_RESP_ERR__TIMED_OUT) {
+            break;
+        }
+    }
+    return 0;
+}
+
+static rd_kafka_resp_err_t
+producer_close(producer_t *producer) {
+    rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+    if (producer->rd_producer != NULL) {
+        coio_call(producer_flush, producer->rd_producer);
+    }
+
+    if (producer->topics != NULL) {
+        destroy_producer_topics(producer->topics);
+    }
+
+    if (producer->delivery_queue != NULL) {
+        destroy_queue(producer->delivery_queue);
+    }
+
+    if (producer->rd_producer != NULL) {
+        // FIXME: if instance of consumer exists then kafka_destroy always hangs forever
+        /* Destroy handle */
+//        coio_call(kafka_destroy, producer->rd_producer);
+    }
+
+    free(producer);
+    return err;
+}
+
+static int
+lua_producer_close(struct lua_State *L) {
+    producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
+    if (producer_p == NULL || *producer_p == NULL) {
+        lua_pushboolean(L, 0);
+        return 1;
+    }
+
+    rd_kafka_resp_err_t err = producer_close(*producer_p);
+    if (err) {
+        lua_pushboolean(L, 1);
+
+        const char *const_err_str = rd_kafka_err2str(err);
+        char err_str[512];
+        strcpy(err_str, const_err_str);
+        int fail = safe_pushstring(L, err_str);
+        return fail ? lua_push_error(L): 2;
+    }
+
+    *producer_p = NULL;
+    lua_pushboolean(L, 1);
+    return 1;
+}
+
+static int
+lua_producer_gc(struct lua_State *L) {
+    producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
+    if (producer_p && *producer_p) {
+        producer_close(*producer_p);
+    }
+    if (producer_p)
+        *producer_p = NULL;
+    return 0;
+}
+
+void
+msg_delivery_callback(rd_kafka_t *UNUSED(producer), const rd_kafka_message_t *msg, void *opaque) {
+    if (msg->_private != NULL) {
+        queue_element_t *element = msg->_private;
+        queue_t *queue = opaque;
+        if (element != NULL) {
+            if (msg->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+                element->err = msg->err;
+            }
+            queue_push(queue, element);
+        }
+    }
+}
+
+static int
+lua_create_producer(struct lua_State *L) {
+    if (lua_gettop(L) != 1 || !lua_istable(L, 1))
+        luaL_error(L, "Usage: producer, err = create_producer(conf)");
+
+    lua_pushstring(L, "brokers");
+    lua_gettable(L, -2);
+    const char *brokers = lua_tostring(L, -1);
+    lua_pop(L, 1);
+    if (brokers == NULL) {
+        lua_pushnil(L);
+        int fail = safe_pushstring(L, "producer config table must have a non nil key 'brokers' which contains a string");
+        return fail ? lua_push_error(L): 2;
+    }
+
+    char errstr[512];
+
+    rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
+
+    queue_t *delivery_queue = new_queue();
+    // queue now accessible from callback
+    rd_kafka_conf_set_opaque(rd_config, delivery_queue);
+
+    rd_kafka_conf_set_dr_msg_cb(rd_config, msg_delivery_callback);
+
+    // enabling delivery events
+//    rd_kafka_conf_set_events(rd_config, RD_KAFKA_EVENT_STATS | RD_KAFKA_EVENT_DR);
+
+    lua_pushstring(L, "options");
+    lua_gettable(L, -2);
+    if (lua_istable(L, -1)) {
+        lua_pushnil(L);
+        // stack now contains: -1 => nil; -2 => table
+        while (lua_next(L, -2)) {
+            // stack now contains: -1 => value; -2 => key; -3 => table
+            if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
+                lua_pushnil(L);
+                int fail = safe_pushstring(L, "producer config options must contain only string keys and string values");
+                return fail ?
lua_push_error(L): 2; + } + + const char *value = lua_tostring(L, -1); + const char *key = lua_tostring(L, -2); + if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) { + lua_pushnil(L); + int fail = safe_pushstring(L, errstr); + return fail ? lua_push_error(L): 2; + } + + // pop value, leaving original key + lua_pop(L, 1); + // stack now contains: -1 => key; -2 => table + } + // stack now contains: -1 => table + } + lua_pop(L, 1); + + rd_kafka_t *rd_producer; + if (!(rd_producer = rd_kafka_new(RD_KAFKA_PRODUCER, rd_config, errstr, sizeof(errstr)))) { + lua_pushnil(L); + int fail = safe_pushstring(L, errstr); + return fail ? lua_push_error(L): 2; + } + + if (rd_kafka_brokers_add(rd_producer, brokers) == 0) { + lua_pushnil(L); + int fail = safe_pushstring(L, "No valid brokers specified"); + return fail ? lua_push_error(L): 2; + } + + producer_t *producer; + producer = malloc(sizeof(producer_t)); + producer->rd_config = rd_config; + producer->rd_producer = rd_producer; + producer->topics = new_producer_topics(256); + producer->delivery_queue = delivery_queue; + + producer_t **producer_p = (producer_t **)lua_newuserdata(L, sizeof(producer)); + *producer_p = producer; + + luaL_getmetatable(L, producer_label); + lua_setmetatable(L, -2); + return 1; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/** + * Entry point + */ + +LUA_API int +luaopen_kafka_tntkafka(lua_State *L) { + static const struct luaL_Reg consumer_methods [] = { + {"subscribe", lua_consumer_subscribe}, + {"poll", lua_consumer_poll}, + {"poll_msg", lua_consumer_poll_msg}, + {"store_offset", lua_consumer_store_offset}, + {"close", lua_consumer_close}, + {"__tostring", lua_consumer_tostring}, + {"__gc", lua_consumer_gc}, + {NULL, NULL} + }; + + luaL_newmetatable(L, consumer_label); + lua_pushvalue(L, -1); + luaL_register(L, NULL, consumer_methods); + lua_setfield(L, -2, "__index"); + lua_pushstring(L, consumer_label); + lua_setfield(L, -2, "__metatable"); + lua_pop(L, 1); + + static const struct luaL_Reg consumer_msg_methods [] = { + {"topic", lua_consumer_msg_topic}, + {"partition", lua_consumer_msg_partition}, + {"offset", lua_consumer_msg_offset}, + {"key", lua_consumer_msg_key}, + {"value", lua_consumer_msg_value}, + {"__tostring", lua_consumer_msg_tostring}, + {"__gc", lua_consumer_msg_gc}, + {NULL, NULL} + }; + + luaL_newmetatable(L, consumer_msg_label); + lua_pushvalue(L, -1); + luaL_register(L, NULL, consumer_msg_methods); + lua_setfield(L, -2, "__index"); + lua_pushstring(L, consumer_msg_label); + lua_setfield(L, -2, "__metatable"); + lua_pop(L, 1); + + static const struct luaL_Reg producer_methods [] = { + {"poll", lua_producer_poll}, + {"produce", lua_producer_produce}, + {"msg_delivery_poll", lua_producer_msg_delivery_poll}, + {"close", lua_producer_close}, + {"__tostring", lua_producer_tostring}, + {"__gc", lua_producer_gc}, + {NULL, NULL} + }; + + luaL_newmetatable(L, producer_label); + lua_pushvalue(L, -1); + luaL_register(L, NULL, producer_methods); + lua_setfield(L, -2, "__index"); + lua_pushstring(L, producer_label); + lua_setfield(L, -2, "__metatable"); + lua_pop(L, 1); + + lua_newtable(L); + static const struct luaL_Reg meta [] = { + {"create_consumer", lua_create_consumer}, + {"create_producer", lua_create_producer}, + {NULL, NULL} + }; + luaL_register(L, NULL, meta); + return 1; +}
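Editor's note: putting the exported surface together — `create_consumer`, `subscribe`, `poll`/`poll_msg`, `store_offset` and `close`. A final end-to-end sketch at the same low level (again assuming the `kafka.tntkafka` require path inferred from `luaopen_kafka_tntkafka`; the high-level wrapper normally drives the poll loop itself):

```lua
local fiber = require('fiber')
local tntkafka = require('kafka.tntkafka')

local consumer, err = tntkafka.create_consumer({
    brokers = "localhost:9092",
    options = {
        ["group.id"] = "example_consumer",
        ["enable.auto.offset.store"] = "false",
    },
})
assert(err == nil, err)
assert(consumer:subscribe({ "test_consumer" }) == nil)

for _ = 1, 100 do
    consumer:poll() -- services group membership, rebalances and fetches
    local msgs = consumer:poll_msg(10) -- drain up to 10 already-fetched messages
    for _, msg in ipairs(msgs) do
        print(tostring(msg))
        local store_err = consumer:store_offset(msg)
        if store_err ~= nil then print(store_err) end
    end
    fiber.sleep(0.1)
end

consumer:close() -- commits stored offsets and leaves the group
```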