fix(kafka-source-connector): switch to confluent_kafka_python library
It was decided to switch from kafka-python to the confluent-kafka-python library
as the Kafka client in the sdcm.kafka.kafka_consumer module.
confluent-kafka-python is more feature-rich (it supports Kafka message schema
serialization/deserialization out of the box) and is actively maintained.

(cherry picked from commit 7cbe4d5)
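
For reference, a minimal sketch of the schema support mentioned above, using confluent-kafka-python's bundled Schema Registry client. The broker address, registry URL, topic, and group id are assumptions for illustration; this is not code from the commit.

    # Sketch only: consume one Avro-encoded message and deserialize it with the
    # writer schema fetched automatically from a Schema Registry.
    # The registry URL, broker address, and topic below are assumed values.
    from confluent_kafka import Consumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer
    from confluent_kafka.serialization import SerializationContext, MessageField

    schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})  # assumed URL
    avro_deserializer = AvroDeserializer(schema_registry)

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',  # assumed broker address
        'group.id': 'schema-demo',              # hypothetical group id
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['example-topic'])       # hypothetical topic

    msg = consumer.poll(timeout=10.0)
    if msg is not None and msg.error() is None:
        record = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(record)
    consumer.close()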
dimakr authored and soyacz committed Oct 31, 2024
1 parent e9e2ed8 commit a9f2b1f
Showing 4 changed files with 25 additions and 49 deletions.
1 change: 0 additions & 1 deletion requirements.in
@@ -52,6 +52,5 @@ hdrhistogram==0.9.2
 deepdiff==6.2.3
 PyGithub==2.1.1
 gimme-aws-creds==2.8.0
-kafka-python==2.0.2
 confluent-kafka==2.5.3
 fastavro==1.9.7
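
A quick post-install sanity check for the new pinned dependency (illustrative only; not part of the commit):

    # Verify the confluent-kafka client and its bundled librdkafka are importable.
    import confluent_kafka

    print(confluent_kafka.version())     # Python client version tuple
    print(confluent_kafka.libversion())  # underlying librdkafka version tuple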
4 changes: 0 additions & 4 deletions requirements.txt
@@ -935,10 +935,6 @@ jwcrypto==1.5.6 \
     --hash=sha256:150d2b0ebbdb8f40b77f543fb44ffd2baeff48788be71f67f03566692fd55789 \
     --hash=sha256:771a87762a0c081ae6166958a954f80848820b2ab066937dc8b8379d65b1b039
     # via okta
-kafka-python==2.0.2 \
-    --hash=sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3 \
-    --hash=sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e
-    # via -r requirements.in
 keyring==25.4.1 \
     --hash=sha256:5426f817cf7f6f007ba5ec722b1bcad95a75b27d780343772ad76b17cb47b0bf \
     --hash=sha256:b07ebc55f3e8ed86ac81dd31ef14e81ace9dd9c3d4b5d77a6e9a2016d0d71a1b
46 changes: 25 additions & 21 deletions sdcm/kafka/kafka_consumer.py
@@ -14,10 +14,11 @@
 import base64
 import json
 import logging
+import time

 from threading import Event, Thread

-import kafka
+from confluent_kafka import Consumer

 from sdcm.sct_config import SCTConfiguration
 from sdcm.kafka.kafka_config import SctKafkaConfiguration
@@ -46,18 +47,19 @@ def __init__(self, tester, params: SCTConfiguration, kafka_addresses: list | None
         self.read_number_of_key = int(kwargs.get('read_number_of_key', 0))

         connector_config: SctKafkaConfiguration = params.get("kafka_connectors")[connector_index]
+        consumer_config = {
+            'bootstrap.servers': ','.join(self.kafka_addresses),
+            'group.id': self.group_id,
+            'auto.offset.reset': 'earliest',
+            'enable.auto.commit': True,
+            'auto.commit.interval.ms': 1000,
+        }
+        self.consumer = Consumer(consumer_config)

         # TODO: handle setup of multiple tables
         topic = f'{connector_config.config.scylla_name}.{connector_config.config.scylla_table_names}'
         self.wait_for_topic(topic, timeout=60)
-        self.consumer = kafka.KafkaConsumer(
-            topic,
-            auto_offset_reset='earliest',
-            enable_auto_commit=True,
-            auto_commit_interval_ms=1000,
-            group_id=self.group_id,
-            bootstrap_servers=self.kafka_addresses,
-        )
+        self.consumer.subscribe([topic])

         super().__init__(daemon=True)

@@ -70,8 +72,7 @@ def kafka_addresses(self):
         return None

     def get_topics(self):
-        admin_client = kafka.KafkaAdminClient(bootstrap_servers=self.kafka_addresses)
-        topics = admin_client.list_topics()
+        topics = list(self.consumer.list_topics(timeout=10).topics.keys())
         LOGGER.debug(topics)
         return topics
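
The separate admin client is replaced here by Consumer.list_topics(), which returns a ClusterMetadata object whose .topics attribute maps topic names to TopicMetadata. A standalone sketch, assuming a local broker:

    # Sketch only (not committed code): enumerate topics via the consumer's
    # metadata request, with an assumed broker address and group id.
    from confluent_kafka import Consumer

    consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'probe'})
    metadata = consumer.list_topics(timeout=10)
    for name, topic_metadata in metadata.topics.items():
        print(name, len(topic_metadata.partitions))  # topic name and partition count
    consumer.close()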

@@ -84,16 +85,19 @@ def check_topic_exists():

     def run(self):
         while not self.termination_event.is_set():
-            records = self.consumer.poll(timeout_ms=1000)
-            for _, consumer_records in records.items():
-                for msg in consumer_records:
-                    data = json.loads(msg.value).get('payload', {}).get('after', {})
-                    key = base64.b64decode(data.get('key')).decode()
-                    self.keys.add(key)
-
-                    if len(self.keys) >= self.read_number_of_key:
-                        LOGGER.info("reach `read_number_of_key` stopping reader thread")
-                        self.stop()
+            msgs = self.consumer.consume(num_messages=self.read_number_of_key, timeout=1.0)
+            if not msgs:
+                time.sleep(0.5)
+                continue
+            for msg in msgs:
+                data = json.loads(msg.value()).get('payload', {}).get('after', {})
+                key = base64.b64decode(data.get('key')).decode()
+                self.keys.add(key)
+
+                if len(self.keys) >= self.read_number_of_key:
+                    LOGGER.info("reached `read_number_of_key`, stopping reader thread")
+                    self.stop()
+                    break

     def stop(self):
         self.termination_event.set()
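Pulled out of the diff, the new consume pattern looks like this as a self-contained sketch. The broker address, topic, and key budget are assumptions, and unlike the committed loop it also checks Message.error(), which confluent-kafka reports per message rather than raising:

    # Sketch only: batch-consume Debezium-style CDC messages and collect decoded keys.
    import base64
    import json

    from confluent_kafka import Consumer

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',  # assumed broker address
        'group.id': 'sct-example',              # hypothetical group id
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        'auto.commit.interval.ms': 1000,
    })
    consumer.subscribe(['example.cdc_topic'])   # hypothetical topic name

    keys = set()
    read_number_of_key = 100                    # stand-in for the class attribute
    try:
        while len(keys) < read_number_of_key:
            msgs = consumer.consume(num_messages=read_number_of_key, timeout=1.0)
            for msg in msgs:
                if msg.error():                 # skip per-message transport errors
                    continue
                data = json.loads(msg.value()).get('payload', {}).get('after', {})
                if data.get('key'):
                    keys.add(base64.b64decode(data['key']).decode())
    finally:
        consumer.close()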
23 changes: 0 additions & 23 deletions sdcm/utils/common.py
@@ -2653,28 +2653,6 @@ def get_placement_groups(region):
     LOGGER.info("Found total of {} instances.".format(total_items))

     return placement_groups
-<<<<<<< HEAD
-=======
-
-
-def skip_optional_stage(stage_names: str | list[str]) -> bool:
-    """
-    Checks if the given test stage(s) is skipped for execution
-    :param stage_names: str or list, name of the test stage(s)
-    :return: bool
-    """
-    # making import here, to work around circular import issue
-    from sdcm.cluster import TestConfig
-    stage_names = stage_names if isinstance(stage_names, list) else [stage_names]
-    skip_test_stages = TestConfig().tester_obj().skip_test_stages
-    skipped_stages = [stage for stage in stage_names if skip_test_stages[stage]]
-
-    if skipped_stages:
-        skipped_stages_str = ', '.join(skipped_stages)
-        LOGGER.warning("'%s' test stage(s) is disabled.", skipped_stages_str)
-        return True
-    return False
-


 def parse_python_thread_command(cmd: str) -> dict:
@@ -2717,4 +2695,3 @@ def parse_python_thread_command(cmd: str) -> dict:
             tokens_iter = iter([next_token] + list(tokens_iter))

     return options
->>>>>>> 56a1d5e39 (feature(kafka-conn-sink): add tests for kafka sink connector)
