feature(kafka): introduce kafka consumer thread
with this thread we'll be able to read the data written
by the connector, and validate that we are getting the information
we expect (number of rows as the first validation)
fruch committed May 27, 2024
1 parent b249638 commit d2bf776
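A minimal sketch of the intended validation flow, using the APIs added in this commit (the surrounding wiring is assumed; `params` stands for a loaded SCTConfiguration with `kafka_connectors` set):

    # hedged sketch -- `params` is assumed to be a loaded SCTConfiguration
    reader = KafkaCDCReaderThread(tester=None, params=params, read_number_of_key=1000)
    reader.start()
    reader.join()                    # run() stops itself once 1000 unique keys were read
    assert len(reader.keys) >= 1000  # first validation: number of rows seen via CDC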
Showing 9 changed files with 182 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docker/env/version
@@ -1 +1 @@
1.65-docker-compose
1.65-docker-compose-python-kafka
1 change: 1 addition & 0 deletions requirements.in
@@ -54,3 +54,4 @@ hdrhistogram==0.9.2
deepdiff==6.2.3
PyGithub==2.1.1
gimme-aws-creds==2.8.0
kafka-python==2.0.2
4 changes: 4 additions & 0 deletions requirements.txt
@@ -877,6 +877,10 @@ jmespath==1.0.1 \
# via
# boto3
# botocore
kafka-python==2.0.2 \
--hash=sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3 \
--hash=sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e
# via -r requirements.in
keyring==25.2.1 \
--hash=sha256:2458681cdefc0dbc0b7eb6cf75d0b98e59f9ad9b2d4edd319d18f68bdca95e50 \
--hash=sha256:daaffd42dbda25ddafb1ad5fec4024e5bbcfe424597ca1ca452b299861e49f1b
3 changes: 0 additions & 3 deletions sdcm/kafka/kafka_cluster.py
@@ -28,9 +28,6 @@
LOGGER = logging.getLogger(__name__)





class LocalKafkaCluster(cluster.BaseCluster):
def __init__(self, remoter=LOCALRUNNER):
super().__init__(cluster_prefix="kafka", add_nodes=False)
109 changes: 109 additions & 0 deletions sdcm/kafka/kafka_consumer.py
@@ -0,0 +1,109 @@
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2024 ScyllaDB

import base64
import json
import logging

from threading import Event, Thread

import kafka

from sdcm.sct_config import SCTConfiguration
from sdcm.kafka.kafka_config import SctKafkaConfiguration
from sdcm.utils.common import generate_random_string
from sdcm.wait import wait_for


LOGGER = logging.getLogger(__name__)


class KafkaCDCReaderThread(Thread): # pylint: disable=too-many-instance-attributes
"""
thread that listen on kafka topic, and list all the unique key
received, so we can validate how many unique key we got
"""

def __init__(self, tester, params: SCTConfiguration, kafka_addresses: list | None = None, # pylint: disable=too-many-arguments
                 connector_index: int = 0, group_id: str | None = None, duration: int | None = None, **kwargs):
self.keys = set()
self.termination_event = Event()
self.params = params
self.tester = tester
self.duration = duration
self._kafka_addresses = kafka_addresses
self.group_id = group_id or generate_random_string(16)
self.read_number_of_key = int(kwargs.get('read_number_of_key', 0))

connector_config: SctKafkaConfiguration = params.get("kafka_connectors")[connector_index]

# TODO: handle setup of multiple tables
topic = f'{connector_config.config.scylla_name}.{connector_config.config.scylla_table_names}'
self.wait_for_topic(topic, timeout=60)
self.consumer = kafka.KafkaConsumer(
topic,
auto_offset_reset='earliest',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
group_id=self.group_id,
bootstrap_servers=self.kafka_addresses,
)

super().__init__(daemon=True)

@property
def kafka_addresses(self):
if self.params.get('kafka_backend') == 'localstack':
return ['localhost']
elif self._kafka_addresses:
return self._kafka_addresses
return None

def get_topics(self):
admin_client = kafka.KafkaAdminClient(bootstrap_servers=self.kafka_addresses)
topics = admin_client.list_topics()
LOGGER.debug(topics)
return topics

def wait_for_topic(self, topic, timeout):
def check_topic_exists():
topics = self.get_topics()
return topic in topics

wait_for(check_topic_exists, text=f"waiting for topic={topic}", timeout=timeout)

def run(self):
while not self.termination_event.is_set():
records = self.consumer.poll(timeout_ms=1000)
for _, consumer_records in records.items():
for msg in consumer_records:
data = json.loads(msg.value).get('payload', {}).get('after', {})
key = base64.b64decode(data.get('key')).decode()
self.keys.add(key)
            LOGGER.debug('read %d unique keys so far', len(self.keys))

if len(self.keys) >= self.read_number_of_key:
LOGGER.info("reach `read_number_of_key` stopping reader thread")
self.stop()

def stop(self):
self.termination_event.set()
self.consumer.close()

    def kill(self):
        self.stop()

    def verify_results(self) -> tuple[list[dict | None], list[str | None]]:
self.join(self.duration)
return [], []
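For reference, a hedged sketch of the record shape run() expects, assuming the JsonConverter envelope configured in the test case below; the exact field layout is an assumption, not taken from this commit:

    import base64
    import json

    # hypothetical CDC record value with schemas.enable=true; a real envelope
    # also carries a top-level "schema" section next to "payload"
    value = json.dumps({"payload": {"after": {"key": base64.b64encode(b"key-1").decode()}}})

    data = json.loads(value).get("payload", {}).get("after", {})
    key = base64.b64decode(data.get("key")).decode()  # -> "key-1"

This mirrors the parsing in run(): each message value is decoded as JSON, payload.after is extracted, and the key column is base64-decoded before being added to the set of unique keys.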
20 changes: 20 additions & 0 deletions sdcm/tester.py
@@ -138,8 +138,10 @@
make_cs_range_histogram_summary_by_interval
from sdcm.utils.raft.common import validate_raft_on_nodes
from sdcm.commit_log_check_thread import CommitLogCheckThread
from sdcm.kafka.kafka_consumer import KafkaCDCReaderThread
from test_lib.compaction import CompactionStrategy


CLUSTER_CLOUD_IMPORT_ERROR = ""
try:
import cluster_cloud
@@ -162,6 +164,8 @@

TEST_LOG = logging.getLogger(__name__)

PYTHON_THREAD_LIST = (KafkaCDCReaderThread,)


def teardown_on_exception(method):
"""
@@ -1915,6 +1919,8 @@ def run_stress_thread(self, stress_cmd, duration=None, stress_num=1, keyspace_nu
return self.run_nosqlbench_thread(**params)
elif stress_cmd.startswith('table_compare'):
return self.run_table_compare_thread(**params)
elif stress_cmd.startswith('python_thread'):
return self.run_python_thread(**params)
else:
raise ValueError(f'Unsupported stress command: "{stress_cmd[:50]}..."')

@@ -2180,6 +2186,20 @@ def run_gemini(self, cmd, duration=None):
timeout=timeout,
params=self.params).run()

# pylint: disable=too-many-arguments
def run_python_thread(self, stress_cmd, duration=None, **_):
timeout = self.get_duration(duration)

options = dict(item.strip().split("=") for item in stress_cmd.replace('python_thread', '').strip().split(";"))
        klass_thread = next((t for t in PYTHON_THREAD_LIST if options.get('thread') == t.__name__), None)
        assert klass_thread, f"unknown python thread class: {options.get('thread')}"
thread = klass_thread(tester=self,
stress_cmd=stress_cmd,
timeout=timeout,
params=self.params, **options)
thread.start()
return thread

def kill_stress_thread(self):
if self.loaders: # the test can fail on provision step and loaders are still not provisioned
self.loaders.kill_stress_thread()
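For illustration, a small sketch (not part of the commit) of what the option parsing in run_python_thread yields for the stress command used in the test case below:

    stress_cmd = "python_thread thread=KafkaCDCReaderThread ; read_number_of_key=1000"
    options = dict(item.strip().split("=")
                   for item in stress_cmd.replace("python_thread", "").strip().split(";"))
    # options == {"thread": "KafkaCDCReaderThread", "read_number_of_key": "1000"}
    # values stay strings; KafkaCDCReaderThread casts read_number_of_key with int()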
1 change: 1 addition & 0 deletions sdcm/utils/remote_logger.py
@@ -594,6 +594,7 @@ def get_system_logging_thread(logs_transport, node, target_log_file): # pylint:
return SSHGeneralFileLogger(node, target_log_file)
return None


class DockerComposeLogger(CommandClusterLoggerBase): # pylint: disable=too-few-public-methods
# pylint: disable=invalid-overridden-method
@cached_property
9 changes: 7 additions & 2 deletions test-cases/kafka/longevity-kafka-cdc.yaml
@@ -4,8 +4,8 @@ pre_create_keyspace: [
"CREATE TABLE IF NOT EXISTS keyspace1.standard1 (key blob PRIMARY KEY, \"C0\" blob, \"C1\" blob, \"C2\" blob, \"C3\" blob, \"C4\" blob) WITH cdc = {'enabled': true, 'preimage': false, 'postimage': true, 'ttl': 600}"
]

stress_cmd: ["cassandra-stress write cl=ONE duration=1m -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=1) compaction(strategy=SizeTieredCompactionStrategy)' -mode cql3 native -rate threads=10 -pop seq=1..10000000 -log interval=5",
]
stress_cmd: ["cassandra-stress write cl=ONE n=1000 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=1) compaction(strategy=SizeTieredCompactionStrategy)' -mode cql3 native -rate threads=10 -pop seq=1..10000000 -log interval=5",
"python_thread thread=KafkaCDCReaderThread ; read_number_of_key=1000"]

n_loaders: 1
n_monitor_nodes: 1
@@ -32,3 +32,8 @@ kafka_connectors:
"connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector"
"scylla.name": "SCTConnectorNamespace"
"scylla.table.names": 'keyspace1.standard1'
"key.converter": 'org.apache.kafka.connect.json.JsonConverter'
"value.converter": 'org.apache.kafka.connect.json.JsonConverter'
"key.converter.schemas.enable": true
"value.converter.schemas.enable": true
"auto.create.topics.enable": true
46 changes: 39 additions & 7 deletions unit_tests/test_kafka.py
@@ -11,14 +11,21 @@
#
# Copyright (c) 2023 ScyllaDB
import os
import logging

import pytest

from sdcm.stress_thread import CassandraStressThread
from sdcm.kafka.kafka_cluster import LocalKafkaCluster
from sdcm.kafka.kafka_consumer import KafkaCDCReaderThread
from unit_tests.dummy_remote import LocalLoaderSetDummy

pytestmark = [
pytest.mark.integration,
]

LOGGER = logging.getLogger(__name__)


@pytest.fixture(name="kafka_cluster", scope="session")
def fixture_kafka_cluster(tmp_path_factory):
@@ -32,16 +39,21 @@ def fixture_kafka_cluster(tmp_path_factory):
kafka.stop()


@pytest.mark.docker_scylla_args(docker_network="kafka-stack-docker-compose_default")
@pytest.mark.docker_scylla_args(docker_network="kafka-stack-docker-compose_default",
image="scylladb/scylla-nightly:latest")
@pytest.mark.sct_config(
files="unit_tests/test_data/kafka_connectors/scylla-cdc-source-connector.yaml"
)
def test_01_kafka_cdc_source_connector(docker_scylla, kafka_cluster, params):
def test_01_kafka_cdc_source_connector(request, docker_scylla, kafka_cluster, params, events):
"""
setup kafka with scylla-cdc-source-connector with docker based scylla node
- from confluent-hub
- from GitHub release url
"""

# pylint: disable=unused-argument,unnecessary-lambda
params['kafka_backend'] = 'localstack'

docker_scylla.run_cqlsh(
"CREATE KEYSPACE IF NOT EXISTS keyspace1 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1 };"
)
@@ -50,12 +62,32 @@ def test_01_kafka_cdc_source_connector(docker_scylla, kafka_cluster, params):
'"C0" blob, "C1" blob, "C2" blob, "C3" blob, "C4" blob) WITH '
"cdc = {'enabled': true, 'preimage': false, 'postimage': true, 'ttl': 600}"
)

docker_scylla.parent_cluster.nodes = [docker_scylla]
for connector_config in params.get("kafka_connectors"):
kafka_cluster.create_connector(
db_cluster=docker_scylla.parent_cluster, connector_config=connector_config
)
connector_config = params.get("kafka_connectors")[0]
kafka_cluster.create_connector(
db_cluster=docker_scylla.parent_cluster, connector_config=connector_config
)

loader_set = LocalLoaderSetDummy()

    cmd = "cassandra-stress write cl=ONE n=500 -rate threads=10"

cs_thread = CassandraStressThread(
loader_set, cmd, node_list=[docker_scylla], timeout=120, params=params
)
request.addfinalizer(lambda: cs_thread.kill())

cs_thread.run()
LOGGER.info(cs_thread.get_results())

reader_thread = KafkaCDCReaderThread(tester=None, params=params, connector_index=0, read_number_of_key=500)
request.addfinalizer(lambda: reader_thread.stop())

reader_thread.start()

reader_thread.join()


@pytest.mark.docker_scylla_args(docker_network="kafka-stack-docker-compose_default")
