Merge pull request #1 from tarantool/dev
Reworked the library from FFI to a pure C wrapper
RepentantGopher authored Feb 1, 2019
2 parents 9860c99 + fd93acd commit f054802
Showing 24 changed files with 1,685 additions and 1,178 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
.idea
tests/venv
tests/.pytest_cache
-tests/__*
+tests/__*
+cmake-build-debug
25 changes: 25 additions & 0 deletions CMakeLists.txt
@@ -0,0 +1,25 @@
+cmake_minimum_required(VERSION 2.8 FATAL_ERROR)
+
+project(tnt-kafka C)
+
+set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake" ${CMAKE_MODULE_PATH})
+
+# Set CFLAGS
+set(MY_C_FLAGS "-Wall -Wextra -Werror -std=gnu11 -fno-strict-aliasing -Wno-deprecated-declarations")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${MY_C_FLAGS}")
+set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} ${MY_C_FLAGS} -ggdb3")
+
+set(TARANTOOL_FIND_REQUIRED ON)
+find_package(Tarantool)
+# This connector requires the fiber_cond API added in Tarantool 1.7.4-291
+string(REPLACE "-" "." tntver ${TARANTOOL_VERSION})
+if(${tntver} VERSION_LESS 1.7.4.291)
+    message(FATAL_ERROR "Tarantool >= 1.7.4-291 is required")
+endif()
+
+set(RDKAFKA_FIND_REQUIRED ON)
+find_package(RdKafka)
+
+include_directories(${TARANTOOL_INCLUDE_DIRS})
+
+add_subdirectory(tnt-kafka)
2 changes: 1 addition & 1 deletion Makefile
@@ -52,7 +52,7 @@ docker-read-topic-data:
	--net=${NETWORK} \
	--rm \
	confluentinc/cp-kafka:5.0.0 \
-	kafka-console-consumer --bootstrap-server kafka:9092 --topic test_consumer --from-beginning
+	kafka-console-consumer --bootstrap-server kafka:9092 --topic test_producer --from-beginning

APP_NAME = kafka-test
APP_IMAGE = kafka-test-image
186 changes: 67 additions & 119 deletions README.md
@@ -2,16 +2,28 @@ tnt-kafka
=========
Full featured high performance kafka library for Tarantool based on [librdkafka](https://github.com/edenhill/librdkafka).

-Can produce more than 80k messages per second and consume more than 130k messages per second.
-
-Library was tested with librdkafka v0.11.5
+Can produce more than 150k messages per second and consume more than 140k messages per second.

# Features
* Kafka producer and consumer implementations.
* Fiber friendly.
* Mostly errorless functions and methods. Error handling in the Tarantool ecosystem is quite a mess:
some libraries throw a native Lua `error` while others throw `box.error` instead. `tnt-kafka` returns
-errors as strings, which allows you to decide how to handle them.
+non-critical errors as strings, which allows you to decide how to handle them.
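
In practice that means call sites check the second return value instead of wrapping calls in `pcall` — a minimal sketch using the `Producer` API from the examples below (the broker address is illustrative):

```lua
local tnt_kafka = require('tnt-kafka')

-- create() returns `producer, err`; err is a plain string or nil
local producer, err = tnt_kafka.Producer.create({
    brokers = "localhost:9092",
    options = {}
})
if err ~= nil then
    -- decide for yourself: log it, retry, or escalate with error()
    error("failed to create kafka producer: " .. err)
end
```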

+# Requirements
+* Tarantool >= 1.10.2
+* Tarantool development headers
+* librdkafka >= 0.11.5
+* librdkafka development headers
+* make
+* cmake
+* gcc
+
+# Installation
+```bash
+tarantoolctl rocks install https://raw.githubusercontent.com/tarantool/tnt-kafka/master/rockspecs/tnt-kafka-scm-1.rockspec
+```

# Examples

@@ -21,28 +33,17 @@ errors as strings which allows you to decide how to handle it.
```lua
local fiber = require('fiber')
local os = require('os')
-local kafka_consumer = require('tnt-kafka.consumer')
-
-local config, err = kafka_consumer.ConsumerConfig.create(
-    {"localhost:9092"}, -- array of brokers
-    "test_consumer", -- consumer group
-    true, -- enable auto offset store
-    {["auto.offset.reset"] = "earliest"} -- default configuration for topics
-)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-config:set_option("queued.min.messages", "100000") -- set global consumer option
-
-local consumer, err = kafka_consumer.Consumer.create(config)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-local err = consumer:start()
+local tnt_kafka = require('tnt-kafka')
+
+local consumer, err = tnt_kafka.Consumer.create({
+    brokers = "localhost:9092", -- brokers for bootstrap
+    options = {
+        ["enable.auto.offset.store"] = "true",
+        ["group.id"] = "example_consumer",
+        ["auto.offset.reset"] = "earliest",
+        ["enable.partition.eof"] = "false"
+    }, -- options for librdkafka
+})
if err ~= nil then
    print(err)
    os.exit(1)
@@ -69,16 +70,16 @@
        local msg = out:get()
        if msg ~= nil then
            print(string.format(
-                "got msg with topic='%s' partition='%s' offset='%s' value='%s'",
-                msg:topic(), msg:partition(), msg:offset(), msg:value()
+                "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'",
+                msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()
            ))
        end
    end
end)

fiber.sleep(10)

-local err = consumer:stop() -- always stop the consumer to commit all pending offsets before the app closes
+local err = consumer:close() -- always close the consumer to commit all pending offsets before the app closes
if err ~= nil then
    print(err)
    os.exit(1)
@@ -89,28 +90,17 @@
```lua
local fiber = require('fiber')
local os = require('os')
-local kafka_consumer = require('tnt-kafka.consumer')
-
-local config, err = kafka_consumer.ConsumerConfig.create(
-    {"localhost:9092"}, -- array of brokers
-    "test_consumer", -- consumer group
-    false, -- disable auto offset store
-    {["auto.offset.reset"] = "earliest"} -- default configuration for topics
-)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-config:set_option("queued.min.messages", "100000") -- set global consumer option
-
-local consumer, err = kafka_consumer.Consumer.create(config)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-local err = consumer:start()
+local tnt_kafka = require('tnt-kafka')
+
+local consumer, err = tnt_kafka.Consumer.create({
+    brokers = "localhost:9092", -- brokers for bootstrap
+    options = {
+        ["enable.auto.offset.store"] = "false",
+        ["group.id"] = "example_consumer",
+        ["auto.offset.reset"] = "earliest",
+        ["enable.partition.eof"] = "false"
+    }, -- options for librdkafka
+})
if err ~= nil then
    print(err)
    os.exit(1)
@@ -137,8 +127,8 @@
        local msg = out:get()
        if msg ~= nil then
            print(string.format(
-                "got msg with topic='%s' partition='%s' offset='%s' value='%s'",
-                msg:topic(), msg:partition(), msg:offset(), msg:value()
+                "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'",
+                msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()
            ))

            local err = consumer:store_offset(msg) -- don't forget to store the offset of each processed message
@@ -155,7 +145,7 @@

fiber.sleep(10)

-local err = consumer:stop() -- always stop the consumer to commit all pending offsets before the app closes
+local err = consumer:close() -- always close the consumer to commit all pending offsets before the app closes
if err ~= nil then
    print(err)
    os.exit(1)
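
Note that `store_offset` itself returns an error string as well; a small wrapper keeps the fiber alive when a store fails (a hypothetical helper built only on the methods shown above — the function name is illustrative):

```lua
-- hypothetical helper: handle one message, then store its offset,
-- logging a failed store instead of killing the fiber
local function process_and_store(consumer, msg)
    -- ... application-specific handling of msg:value() goes here ...
    local err = consumer:store_offset(msg)
    if err ~= nil then
        print(string.format(
            "failed to store offset for topic='%s' partition='%s': %s",
            msg:topic(), msg:partition(), err
        ))
    end
end
```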
@@ -168,41 +158,21 @@

```lua
local os = require('os')
-local kafka_producer = require('tnt-kafka.producer')
+local tnt_kafka = require('tnt-kafka')

-local config, err = kafka_producer.ProducerConfig.create(
-    {"localhost:9092"}, -- array of brokers
-    false -- sync_producer
-)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-config:set_option("statistics.interval.ms", "1000") -- set global producer option
-config:set_stat_cb(function (payload) print("Stat Callback '".. payload.. "'") end) -- set callback for stats
-
-local producer, err = kafka_producer.Producer.create(config)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-local err = producer:start()
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-local err = producer:add_topic("test_topic", {}) -- add topic with configuration
+local producer, err = tnt_kafka.Producer.create({
+    brokers = "kafka:9092", -- brokers for bootstrap
+    options = {} -- options for librdkafka
+})
if err ~= nil then
    print(err)
    os.exit(1)
end

for i = 1, 1000 do
    local err = producer:produce_async({ -- don't wait until the message is delivered to kafka
        topic = "test_topic",
+        key = "test_key",
        value = "test_value" -- only strings allowed
    })
    if err ~= nil then
@@ -211,7 +181,7 @@
    end
end

-local err = producer:stop() -- always stop the producer to send all pending messages before the app closes
+local err = producer:close() -- always close the producer to send all pending messages before the app closes
if err ~= nil then
    print(err)
    os.exit(1)
@@ -222,33 +192,12 @@
```lua
local fiber = require('fiber')
local os = require('os')
-local kafka_producer = require('tnt-kafka.producer')
+local tnt_kafka = require('tnt-kafka')

-local config, err = kafka_producer.ProducerConfig.create(
-    {"localhost:9092"}, -- array of brokers
-    true -- sync_producer
-)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-config:set_option("statistics.interval.ms", "1000") -- set global producer option
-config:set_stat_cb(function (payload) print("Stat Callback '".. payload.. "'") end) -- set callback for stats
-
-local producer, err = kafka_producer.Producer.create(config)
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-local err = producer:start()
-if err ~= nil then
-    print(err)
-    os.exit(1)
-end
-
-local err = producer:add_topic("test_topic", {}) -- add topic with configuration
+local producer, err = tnt_kafka.Producer.create({
+    brokers = "kafka:9092", -- brokers for bootstrap
+    options = {} -- options for librdkafka
+})
if err ~= nil then
    print(err)
    os.exit(1)
@@ -258,7 +207,8 @@
    fiber.create(function()
        local message = "test_value " .. tostring(i)
        local err = producer:produce({ -- wait until the message is delivered to kafka (using a channel under the hood)
            topic = "test_topic",
+            key = "test_key",
            value = message -- only strings allowed
        })
        if err ~= nil then
@@ -271,22 +221,20 @@

fiber.sleep(10)

-local err = producer:stop() -- always stop the producer to send all pending messages before the app closes
+local err = producer:close() -- always close the producer to send all pending messages before the app closes
if err ~= nil then
    print(err)
    os.exit(1)
end
```
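
Since `produce` blocks the calling fiber until delivery is confirmed, retrying a failed send is straightforward (a hypothetical sketch: the helper name and `attempts` default are illustrative, and it assumes `produce` returns `nil` on success and an error string on failure, as above):

```lua
-- hypothetical helper: retry a synchronous produce a few times before giving up
local function produce_with_retry(producer, msg, attempts)
    local err
    for _ = 1, (attempts or 3) do
        err = producer:produce(msg)
        if err == nil then
            return nil
        end
    end
    return err
end
```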

# Known issues
-* Producer can use only random message partitioning. This was done intentionally because a non-nil key
-leads to a segfault.
-* Consumer leaves some non-GC-able objects in memory after it has been stopped. This was done intentionally
+* Consumer and Producer leave some non-GC-able objects in memory after they have been stopped. This was done intentionally
because `rd_kafka_destroy` sometimes hangs forever.

# TODO
-* Rocks package
* Ordered storage for offsets to prevent committing unprocessed messages
+* Add a poll call for librdkafka logs and errors
* Fix known issues
* More examples
* Better documentation
@@ -297,7 +245,7 @@

### Async

-Result: over 80000 produced messages per second on a MacBook Pro 2016
+Result: over 150000 produced messages per second on a MacBook Pro 2016

Local run in docker:
```bash
@@ -308,7 +256,7 @@

### Sync

-Result: over 50000 produced messages per second on a MacBook Pro 2016
+Result: over 90000 produced messages per second on a MacBook Pro 2016

Local run in docker:
```bash
@@ -321,7 +269,7 @@

### Auto offset store enabled

-Result: over 130000 consumed messages per second on a MacBook Pro 2016
+Result: over 140000 consumed messages per second on a MacBook Pro 2016

Local run in docker:
```bash
@@ -332,12 +280,12 @@

### Manual offset store

-Result: over 130000 consumed messages per second on a MacBook Pro 2016
+Result: over 140000 consumed messages per second on a MacBook Pro 2016

Local run in docker:
```bash
make docker-run-environment
-docker-create-benchmark-manual-commit-consumer-topic
+make docker-create-benchmark-manual-commit-consumer-topic
make docker-run-benchmark-manual-commit-consumer-interactive
```
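
For a rough idea of how such figures can be reproduced with this API, a hypothetical micro-benchmark might look like the sketch below (not the repository's benchmark code; broker address, topic name, and message count are illustrative):

```lua
local clock = require('clock') -- Tarantool's built-in clock module
local tnt_kafka = require('tnt-kafka')

local producer, err = tnt_kafka.Producer.create({
    brokers = "localhost:9092",
    options = {}
})
assert(err == nil, err)

local n = 100000
local started = clock.monotonic()
for i = 1, n do
    local perr = producer:produce_async({ topic = "benchmark", value = tostring(i) })
    if perr ~= nil then
        print(perr)
    end
end
producer:close() -- flush all pending messages before taking the final timestamp
print(string.format("%.0f messages per second", n / (clock.monotonic() - started)))
```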
