diff --git a/kafka/common.c b/kafka/common.c index 390ad1b..f9ff761 100644 --- a/kafka/common.c +++ b/kafka/common.c @@ -186,3 +186,112 @@ lua_librdkafka_metadata(struct lua_State *L, rd_kafka_t *rk, rd_kafka_topic_t *o rd_kafka_metadata_destroy(metadatap); return 1; } + +static ssize_t +wait_librdkafka_list_groups(va_list args) { + rd_kafka_t *rk = va_arg(args, rd_kafka_t *); + const char *group = va_arg(args, const char *); + const struct rd_kafka_group_list **grplistp = va_arg(args, const struct rd_kafka_group_list **); + int timeout_ms = va_arg(args, int); + return rd_kafka_list_groups(rk, group, grplistp, timeout_ms); +} + +int +lua_librdkafka_list_groups(struct lua_State *L, rd_kafka_t *rk, const char *group, int timeout_ms) { + const struct rd_kafka_group_list *grplistp; + rd_kafka_resp_err_t err = coio_call(wait_librdkafka_list_groups, rk, group, &grplistp, timeout_ms); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + lua_pushnil(L); + lua_pushstring(L, rd_kafka_err2str(err)); + return 2; + } + + lua_createtable(L, grplistp->group_cnt, 0); + for (int i = 0; i < grplistp->group_cnt; i++) { + lua_pushnumber(L, i + 1); + lua_createtable(L, 0, 8); + + lua_pushliteral(L, "broker"); + lua_createtable(L, 0, 3); + + lua_pushliteral(L, "id"); + lua_pushnumber(L, grplistp->groups[i].broker.id); + lua_settable(L, -3); + + lua_pushliteral(L, "port"); + lua_pushnumber(L, grplistp->groups[i].broker.port); + lua_settable(L, -3); + + lua_pushliteral(L, "host"); + lua_pushstring(L, grplistp->groups[i].broker.host); + lua_settable(L, -3); + + lua_settable(L, -3); + + lua_pushstring(L, "group"); + lua_pushstring(L, grplistp->groups[i].group); + lua_settable(L, -3); + + if (grplistp->groups[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) { + lua_pushliteral(L, "error_code"); + lua_pushnumber(L, grplistp->groups[i].err); + lua_settable(L, -3); + + lua_pushliteral(L, "error"); + lua_pushstring(L, rd_kafka_err2str(grplistp->groups[i].err)); + lua_settable(L, -3); + } + + lua_pushliteral(L, "state"); + lua_pushstring(L, grplistp->groups[i].state); + lua_settable(L, -3); + + lua_pushliteral(L, "protocol_type"); + lua_pushstring(L, grplistp->groups[i].protocol_type); + lua_settable(L, -3); + + lua_pushliteral(L, "protocol"); + lua_pushstring(L, grplistp->groups[i].protocol); + lua_settable(L, -3); + + lua_pushliteral(L, "members"); + lua_createtable(L, grplistp->groups[i].member_cnt, 0); + for (int j = 0; j < grplistp->groups[i].member_cnt; j++) { + lua_pushnumber(L, j + 1); + lua_createtable(L, 0, 8); + + lua_pushliteral(L, "member_id"); + lua_pushstring(L, grplistp->groups[i].members[j].member_id); + lua_settable(L, -3); + + lua_pushliteral(L, "client_id"); + lua_pushstring(L, grplistp->groups[i].members[j].client_id); + lua_settable(L, -3); + + lua_pushliteral(L, "client_host"); + lua_pushstring(L, grplistp->groups[i].members[j].client_host); + lua_settable(L, -3); + + lua_pushliteral(L, "member_metadata"); + lua_pushlstring(L, + grplistp->groups[i].members[j].member_metadata, + grplistp->groups[i].members[j].member_metadata_size); + lua_settable(L, -3); + + lua_pushliteral(L, "member_assignment"); + lua_pushlstring(L, + grplistp->groups[i].members[j].member_assignment, + grplistp->groups[i].members[j].member_assignment_size); + lua_settable(L, -3); + + lua_settable(L, -3); + } + lua_settable(L, -3); + + lua_settable(L, -3); + } + + rd_kafka_group_list_destroy(grplistp); + return 1; +} diff --git a/kafka/common.h b/kafka/common.h index c024aeb..e26b7a1 100644 --- a/kafka/common.h +++ b/kafka/common.h @@ -28,6 +28,9 @@ int lua_librdkafka_dump_conf(struct lua_State *L, rd_kafka_t *rk); int lua_librdkafka_metadata(struct lua_State *L, rd_kafka_t *rk, rd_kafka_topic_t *only_rkt, int timeout_ms); +int +lua_librdkafka_list_groups(struct lua_State *L, rd_kafka_t *rk, const char *group, int timeout_ms); + /** * Push native lua error with code -3 */ diff --git a/kafka/consumer.c b/kafka/consumer.c index 7ac8a4d..8c1c1bd 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -660,3 +660,17 @@ lua_consumer_metadata(struct lua_State *L) { } return 0; } + +int +lua_consumer_list_groups(struct lua_State *L) { + consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label); + if (consumer_p == NULL || *consumer_p == NULL) + return 0; + + if ((*consumer_p)->rd_consumer != NULL) { + const char *group = lua_tostring(L, 2); + int timeout_ms = lua_tointeger(L, 3); + return lua_librdkafka_list_groups(L, (*consumer_p)->rd_consumer, group, timeout_ms); + } + return 0; +} diff --git a/kafka/consumer.h b/kafka/consumer.h index 03ee3dc..05904c0 100644 --- a/kafka/consumer.h +++ b/kafka/consumer.h @@ -60,4 +60,6 @@ int lua_consumer_dump_conf(struct lua_State *L); int lua_consumer_metadata(struct lua_State *L); +int lua_consumer_list_groups(struct lua_State *L); + #endif //TNT_KAFKA_CONSUMER_H diff --git a/kafka/init.lua b/kafka/init.lua index 8e23c43..a429c86 100644 --- a/kafka/init.lua +++ b/kafka/init.lua @@ -218,6 +218,24 @@ function Consumer:metadata(options) return self._consumer:metadata(timeout_ms) end +function Consumer:list_groups(options) + if self._consumer == nil then + return + end + + local timeout_ms = DEFAULT_TIMEOUT_MS + if options ~= nil and options.timeout_ms ~= nil then + timeout_ms = options.timeout_ms + end + + local group + if options ~= nil and options.timeout_ms ~= nil then + group = options.group + end + + return self._consumer:list_groups(group, timeout_ms) +end + local Producer = {} Producer.__index = Producer @@ -392,6 +410,24 @@ function Producer:metadata(options) return self._producer:metadata(topic, timeout_ms) end +function Producer:list_groups(options) + if self._producer == nil then + return + end + + local timeout_ms = DEFAULT_TIMEOUT_MS + if options ~= nil and options.timeout_ms ~= nil then + timeout_ms = options.timeout_ms + end + + local group + if options ~= nil and options.timeout_ms ~= nil then + group = options.group + end + + return self._producer:list_groups(group, timeout_ms) +end + function Producer:close() if self._producer == nil then return false diff --git a/kafka/producer.c b/kafka/producer.c index 73ab4f6..87acda7 100644 --- a/kafka/producer.c +++ b/kafka/producer.c @@ -547,3 +547,17 @@ lua_producer_metadata(struct lua_State *L) { } return 0; } + +int +lua_producer_list_groups(struct lua_State *L) { + producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label); + if (producer_p == NULL || *producer_p == NULL) + return 0; + + if ((*producer_p)->rd_producer != NULL) { + const char *group = lua_tostring(L, 2); + int timeout_ms = lua_tointeger(L, 3); + return lua_librdkafka_list_groups(L, (*producer_p)->rd_producer, group, timeout_ms); + } + return 0; +} diff --git a/kafka/producer.h b/kafka/producer.h index bb76627..7e7667f 100644 --- a/kafka/producer.h +++ b/kafka/producer.h @@ -63,4 +63,6 @@ int lua_producer_dump_conf(struct lua_State *L); int lua_producer_metadata(struct lua_State *L); +int lua_producer_list_groups(struct lua_State *L); + #endif //TNT_KAFKA_PRODUCER_H diff --git a/kafka/tnt_kafka.c b/kafka/tnt_kafka.c index b7c0cb1..db138c3 100644 --- a/kafka/tnt_kafka.c +++ b/kafka/tnt_kafka.c @@ -27,6 +27,7 @@ luaopen_kafka_tntkafka(lua_State *L) { {"store_offset", lua_consumer_store_offset}, {"dump_conf", lua_consumer_dump_conf}, {"metadata", lua_consumer_metadata}, + {"list_groups", lua_consumer_list_groups}, {"close", lua_consumer_close}, {"destroy", lua_consumer_destroy}, {"__tostring", lua_consumer_tostring}, @@ -68,6 +69,7 @@ luaopen_kafka_tntkafka(lua_State *L) { {"poll_errors", lua_producer_poll_errors}, {"dump_conf", lua_producer_dump_conf}, {"metadata", lua_producer_metadata}, + {"list_groups", lua_producer_list_groups}, {"close", lua_producer_close}, {"destroy", lua_producer_destroy}, {"__tostring", lua_producer_tostring}, diff --git a/tests/consumer.lua b/tests/consumer.lua index d5b8a3e..a6c839d 100644 --- a/tests/consumer.lua +++ b/tests/consumer.lua @@ -145,6 +145,10 @@ local function metadata(timeout_ms) return consumer:metadata({timeout_ms = timeout_ms}) end +local function list_groups(timeout_ms) + return consumer:list_groups({timeout_ms = timeout_ms}) +end + local function close() log.info("closing consumer") local _, err = consumer:close() @@ -167,4 +171,5 @@ return { get_rebalances = get_rebalances, dump_conf = dump_conf, metadata = metadata, + list_groups = list_groups, } diff --git a/tests/producer.lua b/tests/producer.lua index 3a5e919..d937c2a 100644 --- a/tests/producer.lua +++ b/tests/producer.lua @@ -83,6 +83,10 @@ local function metadata(timeout_ms, topic) return producer:metadata({timeout_ms = timeout_ms, topic = topic}) end +local function list_groups(timeout_ms) + return producer:list_groups({timeout_ms = timeout_ms}) +end + local function close() local _, err = producer:close() if err ~= nil then @@ -100,4 +104,5 @@ return { close = close, dump_conf = dump_conf, metadata = metadata, + list_groups = list_groups, } diff --git a/tests/test_consumer.py b/tests/test_consumer.py index f357d60..5c51007 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -264,10 +264,18 @@ def test_consumer_metadata(): response = server.call("consumer.metadata", [0]) assert tuple(response) == (None, 'Local: Timed out') + response = server.call("consumer.list_groups", []) + assert response[0] is not None + response = server.call("consumer.list_groups", [0]) + assert tuple(response) == (None, 'Local: Timed out') + with create_consumer(server, "badhost:9090"): response = server.call("consumer.metadata", [0]) assert tuple(response) == (None, 'Local: Broker transport failure') + response = server.call("consumer.metadata", [0]) + assert tuple(response) == (None, 'Local: Broker transport failure') + def test_consumer_should_log_debug(): server = get_server() diff --git a/tests/test_producer.py b/tests/test_producer.py index 0c54666..335d00f 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -152,6 +152,11 @@ def test_producer_metadata(): assert 'port' in response[0]['brokers'][0] assert 'id' in response[0]['brokers'][0] + response = server.call("producer.list_groups", []) + assert response[0] is not None + response = server.call("producer.list_groups", [0]) + assert tuple(response) == (None, 'Local: Timed out') + response = server.call("producer.metadata", [0]) assert tuple(response) == (None, 'Local: Timed out') @@ -160,6 +165,8 @@ def test_producer_metadata(): server.call("producer.create", ["badhost:8080"]) response = server.call("producer.metadata", [200]) assert tuple(response) == (None, 'Local: Broker transport failure') + response = server.call("producer.list_groups", [200]) + assert response[0] is None server.call("producer.close", [])