Skip to content

Commit

Permalink
add method to list_groups
Browse files Browse the repository at this point in the history
See https://github.com/edenhill/librdkafka/blob/cfc0731617ddd6858058f31f564772202f209e72/src/rdkafka.h#L4825

Syntax:
   consumer:list_groups({group = ..., timeout = ...})
   producer:list_groups({group = ..., timeout = ...})

Part of #59
  • Loading branch information
olegrok committed Jan 26, 2022
1 parent df863cb commit 83ee6e8
Show file tree
Hide file tree
Showing 12 changed files with 207 additions and 0 deletions.
109 changes: 109 additions & 0 deletions kafka/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,112 @@ lua_librdkafka_metadata(struct lua_State *L, rd_kafka_t *rk, rd_kafka_topic_t *o
rd_kafka_metadata_destroy(metadatap);
return 1;
}

static ssize_t
wait_librdkafka_list_groups(va_list args) {
rd_kafka_t *rk = va_arg(args, rd_kafka_t *);
const char *group = va_arg(args, const char *);
const struct rd_kafka_group_list **grplistp = va_arg(args, const struct rd_kafka_group_list **);
int timeout_ms = va_arg(args, int);
return rd_kafka_list_groups(rk, group, grplistp, timeout_ms);
}

int
lua_librdkafka_list_groups(struct lua_State *L, rd_kafka_t *rk, const char *group, int timeout_ms) {
const struct rd_kafka_group_list *grplistp;
rd_kafka_resp_err_t err = coio_call(wait_librdkafka_list_groups, rk, group, &grplistp, timeout_ms);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
lua_pushnil(L);
lua_pushstring(L, rd_kafka_err2str(err));
return 2;
}

lua_createtable(L, grplistp->group_cnt, 0);
for (int i = 0; i < grplistp->group_cnt; i++) {
lua_pushnumber(L, i + 1);
lua_createtable(L, 0, 8);

lua_pushliteral(L, "broker");
lua_createtable(L, 0, 3);

lua_pushliteral(L, "id");
lua_pushnumber(L, grplistp->groups[i].broker.id);
lua_settable(L, -3);

lua_pushliteral(L, "port");
lua_pushnumber(L, grplistp->groups[i].broker.port);
lua_settable(L, -3);

lua_pushliteral(L, "host");
lua_pushstring(L, grplistp->groups[i].broker.host);
lua_settable(L, -3);

lua_settable(L, -3);

lua_pushstring(L, "group");
lua_pushstring(L, grplistp->groups[i].group);
lua_settable(L, -3);

if (grplistp->groups[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
lua_pushliteral(L, "error_code");
lua_pushnumber(L, grplistp->groups[i].err);
lua_settable(L, -3);

lua_pushliteral(L, "error");
lua_pushstring(L, rd_kafka_err2str(grplistp->groups[i].err));
lua_settable(L, -3);
}

lua_pushliteral(L, "state");
lua_pushstring(L, grplistp->groups[i].state);
lua_settable(L, -3);

lua_pushliteral(L, "protocol_type");
lua_pushstring(L, grplistp->groups[i].protocol_type);
lua_settable(L, -3);

lua_pushliteral(L, "protocol");
lua_pushstring(L, grplistp->groups[i].protocol);
lua_settable(L, -3);

lua_pushliteral(L, "members");
lua_createtable(L, grplistp->groups[i].member_cnt, 0);
for (int j = 0; j < grplistp->groups[i].member_cnt; j++) {
lua_pushnumber(L, j + 1);
lua_createtable(L, 0, 8);

lua_pushliteral(L, "member_id");
lua_pushstring(L, grplistp->groups[i].members[j].member_id);
lua_settable(L, -3);

lua_pushliteral(L, "client_id");
lua_pushstring(L, grplistp->groups[i].members[j].client_id);
lua_settable(L, -3);

lua_pushliteral(L, "client_host");
lua_pushstring(L, grplistp->groups[i].members[j].client_host);
lua_settable(L, -3);

lua_pushliteral(L, "member_metadata");
lua_pushlstring(L,
grplistp->groups[i].members[j].member_metadata,
grplistp->groups[i].members[j].member_metadata_size);
lua_settable(L, -3);

lua_pushliteral(L, "member_assignment");
lua_pushlstring(L,
grplistp->groups[i].members[j].member_assignment,
grplistp->groups[i].members[j].member_assignment_size);
lua_settable(L, -3);

lua_settable(L, -3);
}
lua_settable(L, -3);

lua_settable(L, -3);
}

rd_kafka_group_list_destroy(grplistp);
return 1;
}
3 changes: 3 additions & 0 deletions kafka/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ int lua_librdkafka_dump_conf(struct lua_State *L, rd_kafka_t *rk);
int
lua_librdkafka_metadata(struct lua_State *L, rd_kafka_t *rk, rd_kafka_topic_t *only_rkt, int timeout_ms);

int
lua_librdkafka_list_groups(struct lua_State *L, rd_kafka_t *rk, const char *group, int timeout_ms);

/**
* Push native lua error with code -3
*/
Expand Down
14 changes: 14 additions & 0 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,17 @@ lua_consumer_metadata(struct lua_State *L) {
}
return 0;
}

int
lua_consumer_list_groups(struct lua_State *L) {
consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label);
if (consumer_p == NULL || *consumer_p == NULL)
return 0;

if ((*consumer_p)->rd_consumer != NULL) {
const char *group = lua_tostring(L, 2);
int timeout_ms = lua_tointeger(L, 3);
return lua_librdkafka_list_groups(L, (*consumer_p)->rd_consumer, group, timeout_ms);
}
return 0;
}
2 changes: 2 additions & 0 deletions kafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,6 @@ int lua_consumer_dump_conf(struct lua_State *L);

int lua_consumer_metadata(struct lua_State *L);

int lua_consumer_list_groups(struct lua_State *L);

#endif //TNT_KAFKA_CONSUMER_H
36 changes: 36 additions & 0 deletions kafka/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,24 @@ function Consumer:metadata(options)
return self._consumer:metadata(timeout_ms)
end

function Consumer:list_groups(options)
if self._consumer == nil then
return
end

local timeout_ms = DEFAULT_TIMEOUT_MS
if options ~= nil and options.timeout_ms ~= nil then
timeout_ms = options.timeout_ms
end

local group
if options ~= nil and options.timeout_ms ~= nil then
group = options.group
end

return self._consumer:list_groups(group, timeout_ms)
end

local Producer = {}

Producer.__index = Producer
Expand Down Expand Up @@ -392,6 +410,24 @@ function Producer:metadata(options)
return self._producer:metadata(topic, timeout_ms)
end

function Producer:list_groups(options)
if self._producer == nil then
return
end

local timeout_ms = DEFAULT_TIMEOUT_MS
if options ~= nil and options.timeout_ms ~= nil then
timeout_ms = options.timeout_ms
end

local group
if options ~= nil and options.timeout_ms ~= nil then
group = options.group
end

return self._producer:list_groups(group, timeout_ms)
end

function Producer:close()
if self._producer == nil then
return false
Expand Down
14 changes: 14 additions & 0 deletions kafka/producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,17 @@ lua_producer_metadata(struct lua_State *L) {
}
return 0;
}

int
lua_producer_list_groups(struct lua_State *L) {
producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
if (producer_p == NULL || *producer_p == NULL)
return 0;

if ((*producer_p)->rd_producer != NULL) {
const char *group = lua_tostring(L, 2);
int timeout_ms = lua_tointeger(L, 3);
return lua_librdkafka_list_groups(L, (*producer_p)->rd_producer, group, timeout_ms);
}
return 0;
}
2 changes: 2 additions & 0 deletions kafka/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,6 @@ int lua_producer_dump_conf(struct lua_State *L);

int lua_producer_metadata(struct lua_State *L);

int lua_producer_list_groups(struct lua_State *L);

#endif //TNT_KAFKA_PRODUCER_H
2 changes: 2 additions & 0 deletions kafka/tnt_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
{"store_offset", lua_consumer_store_offset},
{"dump_conf", lua_consumer_dump_conf},
{"metadata", lua_consumer_metadata},
{"list_groups", lua_consumer_list_groups},
{"close", lua_consumer_close},
{"destroy", lua_consumer_destroy},
{"__tostring", lua_consumer_tostring},
Expand Down Expand Up @@ -68,6 +69,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
{"poll_errors", lua_producer_poll_errors},
{"dump_conf", lua_producer_dump_conf},
{"metadata", lua_producer_metadata},
{"list_groups", lua_producer_list_groups},
{"close", lua_producer_close},
{"destroy", lua_producer_destroy},
{"__tostring", lua_producer_tostring},
Expand Down
5 changes: 5 additions & 0 deletions tests/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ local function metadata(timeout_ms)
return consumer:metadata({timeout_ms = timeout_ms})
end

local function list_groups(timeout_ms)
return consumer:list_groups({timeout_ms = timeout_ms})
end

local function close()
log.info("closing consumer")
local _, err = consumer:close()
Expand All @@ -167,4 +171,5 @@ return {
get_rebalances = get_rebalances,
dump_conf = dump_conf,
metadata = metadata,
list_groups = list_groups,
}
5 changes: 5 additions & 0 deletions tests/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ local function metadata(timeout_ms, topic)
return producer:metadata({timeout_ms = timeout_ms, topic = topic})
end

local function list_groups(timeout_ms)
return producer:list_groups({timeout_ms = timeout_ms})
end

local function close()
local _, err = producer:close()
if err ~= nil then
Expand All @@ -100,4 +104,5 @@ return {
close = close,
dump_conf = dump_conf,
metadata = metadata,
list_groups = list_groups,
}
8 changes: 8 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,18 @@ def test_consumer_metadata():
response = server.call("consumer.metadata", [0])
assert tuple(response) == (None, 'Local: Timed out')

response = server.call("consumer.list_groups", [])
assert response[0] is not None
response = server.call("consumer.list_groups", [0])
assert tuple(response) == (None, 'Local: Timed out')

with create_consumer(server, "badhost:9090"):
response = server.call("consumer.metadata", [0])
assert tuple(response) == (None, 'Local: Broker transport failure')

response = server.call("consumer.metadata", [0])
assert tuple(response) == (None, 'Local: Broker transport failure')


def test_consumer_should_log_debug():
server = get_server()
Expand Down
7 changes: 7 additions & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ def test_producer_metadata():
assert 'port' in response[0]['brokers'][0]
assert 'id' in response[0]['brokers'][0]

response = server.call("producer.list_groups", [])
assert response[0] is not None
response = server.call("producer.list_groups", [0])
assert tuple(response) == (None, 'Local: Timed out')

response = server.call("producer.metadata", [0])
assert tuple(response) == (None, 'Local: Timed out')

Expand All @@ -160,6 +165,8 @@ def test_producer_metadata():
server.call("producer.create", ["badhost:8080"])
response = server.call("producer.metadata", [200])
assert tuple(response) == (None, 'Local: Broker transport failure')
response = server.call("producer.list_groups", [200])
assert response[0] is None
server.call("producer.close", [])


Expand Down

0 comments on commit 83ee6e8

Please sign in to comment.