diff --git a/defaults/test_default.yaml b/defaults/test_default.yaml index 5d32541e748..93bf8eac9d5 100644 --- a/defaults/test_default.yaml +++ b/defaults/test_default.yaml @@ -240,3 +240,6 @@ validate_large_collections: false scylla_d_overrides_files: [] run_commit_log_check_thread: true + +kafka_backend: none +kafka_connectors: [] diff --git a/docker/env/Dockerfile b/docker/env/Dockerfile index 1095a709ec1..cbd88504658 100644 --- a/docker/env/Dockerfile +++ b/docker/env/Dockerfile @@ -49,7 +49,8 @@ RUN DEBIAN_FRONTEND=noninteractive apt-get update && \ wget \ psmisc \ procps \ - docker-ce-cli && \ + docker-ce-cli \ + docker-compose-plugin && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* RUN curl -fsSLo /usr/local/bin/kubectl https://dl.k8s.io/release/v$KUBECTL_VERSION/bin/linux/amd64/kubectl && \ diff --git a/docker/env/version b/docker/env/version index 7383843b898..2af465cfd2f 100644 --- a/docker/env/version +++ b/docker/env/version @@ -1 +1 @@ -1.59-pygithub +1.60-docker-compose diff --git a/longevity_test.py b/longevity_test.py index 06b7887ec5e..71e0e06f1e7 100644 --- a/longevity_test.py +++ b/longevity_test.py @@ -132,6 +132,8 @@ def test_custom_time(self): self.run_pre_create_keyspace() self.run_pre_create_schema() + self.kafka_configure() + if scan_operation_params := self._get_scan_operation_params(): for scan_param in scan_operation_params: self.log.info("Starting fullscan operation thread with the following params: %s", scan_param) diff --git a/requirements.txt b/requirements.txt index 54206989740..6eaf5500b36 100644 --- a/requirements.txt +++ b/requirements.txt @@ -132,9 +132,9 @@ botocore==1.31.4 \ # awscli # boto3 # s3transfer -botocore-stubs==1.34.34 \ - --hash=sha256:34f4d446f9eed92bee62a0fc303b712d112a84aa356e3dccea2863ac79d9ef14 \ - --hash=sha256:38baf7068641edf57a215970caf273aad2e959ce0c5b93b39e9fa37ba49c30db +botocore-stubs==1.34.40 \ + --hash=sha256:50870a5ac8b638dac575aa26d92506dda8cc38e179f93ce2ffe48197ce054dbf \ + --hash=sha256:d6322bfb9aa882006fe7f8fc3d8a933a66c5627dcf571e65d65a5233c6c172be # via boto3-stubs build==1.0.3 \ --hash=sha256:538aab1b64f9828977f84bc63ae570b060a8ed1be419e7870b8b4fc5e6ea553b \ @@ -410,9 +410,9 @@ geomet==0.2.1.post1 \ --hash=sha256:91d754f7c298cbfcabd3befdb69c641c27fe75e808b27aa55028605761d17e95 \ --hash=sha256:a41a1e336b381416d6cbed7f1745c848e91defaa4d4c1bdc1312732e46ffad2b # via scylla-driver -google-api-core[grpc]==2.16.2 \ - --hash=sha256:032d37b45d1d6bdaf68fb11ff621e2593263a239fa9246e2e94325f9c47876d2 \ - --hash=sha256:449ca0e3f14c179b4165b664256066c7861610f70b6ffe54bb01a04e9b466929 +google-api-core[grpc]==2.17.0 \ + --hash=sha256:08ed79ed8e93e329de5e3e7452746b734e6bf8438d8d64dd3319d21d3164890c \ + --hash=sha256:de7ef0450faec7c75e0aea313f29ac870fdc44cfaec9d6499a9a17305980ef66 # via # google-api-python-client # google-cloud-compute @@ -604,9 +604,9 @@ humanreadable==0.4.0 \ --hash=sha256:2879a146f0602512addfcfba227956a3f1d23b99e9f938ff91b2085a170519ba \ --hash=sha256:5b70257a8e88856f9b64a1f0a6fb7535c9002818465e298ca27d745b25e5675d # via tcconfig -identify==2.5.33 \ - --hash=sha256:161558f9fe4559e1557e1bff323e8631f6a0e4837f7497767c1782832f16b62d \ - --hash=sha256:d40ce5fcd762817627670da8a7d8d8e65f24342d14539c59488dc603bf662e34 +identify==2.5.34 \ + --hash=sha256:a4316013779e433d08b96e5eabb7f641e6c7942e4ab5d4c509ebd2e7a8994aed \ + --hash=sha256:ee17bc9d499899bc9eaec1ac7bf2dc9eedd480db9d88b96d123d3b64a9d34f5d # via pre-commit idna==3.6 \ --hash=sha256:9ecdbbd083b06798ae1e86adcbfe8ab1479cf864e4ee30fe4e46a003d12491ca \ @@ -1123,8 
+1123,9 @@ pyproject-hooks==1.0.0 \ --hash=sha256:283c11acd6b928d2f6a7c73fa0d01cb2bdc5f07c57a2eeb6e83d5e56b97976f8 \ --hash=sha256:f271b298b97f5955d53fb12b72c1fb1948c22c1a6b70b315c54cedaca0264ef5 # via build -pyroute2==0.7.11 \ - --hash=sha256:95852e702149b3d6abc8484d3291c38c45660168e8db76e5566a60ef0e133d5b +pyroute2==0.7.12 \ + --hash=sha256:54d226fc3ff2732f49bac9b26853c50c9d05be05a4d9daf09c7cf6d77301eff3 \ + --hash=sha256:9df8d0fcb5fb0a724603bcfdef76ffbd287f00f69e9fb660c20a06962b24691a # via tcconfig pytest==7.2.0 \ --hash=sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71 \ @@ -1568,9 +1569,9 @@ pip==24.0 \ --hash=sha256:ba0d021a166865d2265246961bec0152ff124de910c5cc39f1156ce3fa7c69dc \ --hash=sha256:ea9bd1a847e8c5774a5777bb398c19e80bcd4e2aa16a4b301b718fe6f593aba2 # via pip-tools -setuptools==69.0.3 \ - --hash=sha256:385eb4edd9c9d5c17540511303e39a147ce2fc04bc55289c322b9e5904fe2c05 \ - --hash=sha256:be1af57fc409f93647f2e8e4573a142ed38724b8cdd389706a867bb4efcf1e78 +setuptools==69.1.0 \ + --hash=sha256:850894c4195f09c4ed30dba56213bf7c3f21d86ed6bdaafb5df5972593bfc401 \ + --hash=sha256:c054629b81b946d63a9c6e732bc8b2513a7c3ea645f11d0139a2191d735c60c6 # via # anyconfig # astroid diff --git a/sdcm/kafka/__init__.py b/sdcm/kafka/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdcm/kafka/kafka_cluster.py b/sdcm/kafka/kafka_cluster.py new file mode 100644 index 00000000000..434f7426ac7 --- /dev/null +++ b/sdcm/kafka/kafka_cluster.py @@ -0,0 +1,184 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. 
+# +# Copyright (c) 2023 ScyllaDB +import logging +from pathlib import Path +from functools import cached_property + +import requests + +from sdcm import cluster +from sdcm.wait import wait_for +from sdcm.remote import LOCALRUNNER +from sdcm.utils.git import clone_repo +from sdcm.utils.common import get_sct_root_path +from sdcm.utils.remote_logger import CommandClusterLoggerBase +from sdcm.kafka.kafka_config import SctKafkaConfiguration + +# TODO: write/think more about the consumers + +logger = logging.getLogger(__name__) + + +class KafkaLogger(CommandClusterLoggerBase): # pylint: disable=too-few-public-methods + # pylint: disable=invalid-overridden-method + @cached_property + def _logger_cmd(self) -> str: + return f"{self._cluster.compose_context} logs --no-color --tail=1000 >>{self._target_log_file}" + + +class LocalKafkaCluster(cluster.BaseCluster): + def __init__(self, remoter=LOCALRUNNER): + super().__init__(cluster_prefix="kafka", add_nodes=False) + self.remoter = remoter + self.docker_compose_path = ( + Path(get_sct_root_path()) / "kafka-stack-docker-compose" + ) + self._journal_thread: KafkaLogger | None = None + self.init_repository() + + def init_repository(self): + # TODO: make the url configurable + # TODO: get the version after install, and send out to Argus + repo_url = "https://github.com/fruch/kafka-stack-docker-compose.git" + branch = 'master' + clone_repo( + remoter=self.remoter, + repo_url=repo_url, + branch=branch, + destination_dir_name=str(self.docker_compose_path), + clone_as_root=False, + ) + self.remoter.run(f'mkdir -p {self.docker_compose_path / "connectors"}') + + @property + def compose_context(self): + return f"cd {self.docker_compose_path}; docker compose -f full-stack.yml" + + @property + def kafka_connect_url(self): + return "http://localhost:8083" + + def compose(self, cmd): + self.remoter.run(f"{self.compose_context} {cmd}") + + def start(self): + self.compose("up -d") + self.start_journal_thread() + + def stop(self): + self._journal_thread.stop(timeout=120) + self.compose("down") + + def install_connector(self, connector_version: str): + if connector_version.startswith("hub:"): + self.install_connector_from_hub(connector_version.replace("hub:", "")) + else: + self.install_connector_from_url(connector_version) + + def install_connector_from_hub( + self, connector_version: str = "scylladb/scylla-cdc-source-connector:latest" + ): + self.compose( + f"exec kafka-connect confluent-hub install --no-prompt {connector_version}" + ) + self.compose("restart kafka-connect") + + def install_connector_from_url(self, connector_url: str): + if connector_url.startswith("http"): + if connector_url.endswith('.jar'): + self.remoter.run( + f'curl -L --create-dirs -O --output-dir {self.docker_compose_path / "connectors"} {connector_url} ' + ) + if connector_url.endswith('.zip'): + self.remoter.run( + f'curl -L -o /tmp/connector.zip {connector_url} && ' + f'unzip /tmp/connector.zip -d {self.docker_compose_path / "connectors"} && rm /tmp/connector.zip' + ) + if connector_url.startswith("file://"): + connector_local_path = connector_url.replace("file://", "") + if connector_url.endswith('.jar'): + self.remoter.run( + f'cp {connector_local_path} {self.docker_compose_path / "connectors"}' + ) + if connector_url.endswith('.zip'): + self.remoter.run( + f'unzip {connector_local_path} -d {self.docker_compose_path / "connectors"}' + ) + self.compose("restart kafka-connect") + + # TODO: find release based on 'curl 
https://api.github.com/repos/scylladb/scylla-cdc-source-connector/releases' + + def create_connector( + self, + db_cluster: cluster.BaseScyllaCluster, + connector_config: SctKafkaConfiguration, + ): + # TODO: extend the number of tasks + # TODO: handle user/password + # TODO: handle client encryption SSL + + connector_data = connector_config.dict(by_alias=True, exclude_none=True) + match connector_config.config.connector_class: + case "io.connect.scylladb.ScyllaDbSinkConnector": + scylla_addresses = ",".join( + [node.cql_address for node in db_cluster.nodes] + ) + connector_data["config"]["scylladb.contact.points"] = scylla_addresses + case "com.scylladb.cdc.debezium.connector.ScyllaConnector": + scylla_addresses = ",".join( + [f"{node.cql_address}:{node.CQL_PORT}" for node in db_cluster.nodes] + ) + connector_data["config"][ + "scylla.cluster.ip.addresses" + ] = scylla_addresses + + self.install_connector(connector_config.version) + + def kafka_connect_api_available(): + res = requests.head(url=self.kafka_connect_url) + res.raise_for_status() + return True + + wait_for( + func=kafka_connect_api_available, + step=2, + text="waiting for kafka-connect api", + timeout=120, + throw_exc=True, + ) + logger.debug(connector_data) + res = requests.post( + url=f"{self.kafka_connect_url}/connectors", json=connector_data + ) + logger.debug(res) + logger.debug(res.text) + res.raise_for_status() + + @property + def kafka_log(self) -> Path: + return Path(self.logdir) / "kafka.log" + + def start_journal_thread(self) -> None: + self._journal_thread = KafkaLogger(self, str(self.kafka_log)) + self._journal_thread.start() + + def add_nodes( + self, + count, + ec2_user_data="", + dc_idx=0, + rack=0, + enable_auto_bootstrap=False, + instance_type=None, + ): # pylint: disable=too-many-arguments + raise NotImplementedError diff --git a/sdcm/kafka/kafka_config.py b/sdcm/kafka/kafka_config.py new file mode 100644 index 00000000000..bda954835fe --- /dev/null +++ b/sdcm/kafka/kafka_config.py @@ -0,0 +1,37 @@ +from typing import Optional + +from pydantic import Field, BaseModel, Extra # pylint: disable=no-name-in-module + +# pylint: disable=too-few-public-methods + + +class ConnectorConfiguration(BaseModel): + # general options + connector_class: str = Field(alias="connector.class") + topics: Optional[str] + + # scylla-cdc-source-connector options + # see https://github.com/scylladb/scylla-cdc-source-connector?tab=readme-ov-file#configuration + # and https://github.com/scylladb/scylla-cdc-source-connector?tab=readme-ov-file#advanced-administration + scylla_name: Optional[str] = Field(alias="scylla.name") + scylla_table_names: Optional[str] = Field(alias="scylla.table.names") + scylla_user: Optional[str] = Field(alias="scylla.user") + scylla_password: Optional[str] = Field(alias="scylla.password") + + # kafka-connect-scylladb + # see https://github.com/scylladb/kafka-connect-scylladb/blob/master/documentation/CONFIG.md + scylladb_contact_points: Optional[str] = Field(alias="scylladb.contact.points") + scylladb_keyspace: Optional[str] = Field(alias="scylladb.keyspace") + scylladb_user: Optional[str] = Field(alias="scylladb.user") + scylladb_password: Optional[str] = Field(alias="scylladb.password") + + class Config: + extra = Extra.allow + + +class SctKafkaConfiguration(BaseModel): + version: str = Field( + exclude=True + ) # url to specific release or hub version ex. 
'hub:scylladb/scylla-cdc-source-connector:1.1.2'
+    name: str  # connector name; each connector should have a unique name
+    config: ConnectorConfiguration
diff --git a/sdcm/sct_config.py b/sdcm/sct_config.py
index a07987b231e..a9eb50a28fb 100644
--- a/sdcm/sct_config.py
+++ b/sdcm/sct_config.py
@@ -23,6 +23,7 @@
 import getpass
 import pathlib
 import tempfile
+from copy import deepcopy
 from typing import List, Union, Set
 from distutils.util import strtobool
@@ -56,6 +57,7 @@
 from sdcm.sct_events.base import add_severity_limit_rules, print_critical_events
 from sdcm.utils.gce_utils import get_gce_image_tags
 from sdcm.remote import LOCALRUNNER, shell_script_cmd
+from sdcm.kafka.kafka_config import SctKafkaConfiguration


 def _str(value: str) -> str:
@@ -1523,6 +1525,12 @@ class SCTConfiguration(dict):
         dict(name="run_commit_log_check_thread", env="SCT_RUN_COMMIT_LOG_CHECK_THREAD", type=boolean,
              help="""Run commit log check thread if commitlog_use_hard_size_limit is True"""),

+        dict(name="kafka_backend", env="SCT_KAFKA_BACKEND", type=str,
+             help="Kafka backend to deploy (none, localstack, vm or msk)",
+             choices=("none", "localstack", "vm", "msk")),
+
+        dict(name="kafka_connectors", env="SCT_KAFKA_CONNECTORS", type=str_or_list_or_eval,
+             help="configuration for setting up kafka connectors"),
     ]

     required_params = ['cluster_backend', 'test_duration', 'n_db_nodes', 'n_loaders', 'use_preinstalled_scylla',
@@ -1909,6 +1917,11 @@ def __init__(self):
             if len(nics) > 1 and len(self.region_names) >= 2:
                 raise ValueError("Multiple network interfaces aren't supported for multi region use cases")

+        # 18: validate kafka configuration
+        if kafka_connectors := self.get('kafka_connectors'):
+            self['kafka_connectors'] = [SctKafkaConfiguration(**connector)
+                                        for connector in kafka_connectors]
+
     def log_config(self):
         self.log.info(self.dump_config())

@@ -2415,7 +2428,15 @@ def dump_config(self):

         :return: str
         """
-        return anyconfig.dumps(self, ac_parser="yaml")
+        out = deepcopy(self)
+
+        # handle pydantic objects and convert them back to dicts
+        # TODO: automate the process if we are going to keep using them more, or replace the whole configuration with pydantic/dataclasses
+        if kafka_connectors := self.get('kafka_connectors'):
+            out['kafka_connectors'] = [connector.dict(by_alias=True, exclude_none=True)
+                                       for connector in kafka_connectors]
+
+        return anyconfig.dumps(out, ac_parser="yaml")

     def dump_help_config_markdown(self):
         """
diff --git a/sdcm/tester.py b/sdcm/tester.py
index 7cd0b4afa18..ffcc02e6447 100644
--- a/sdcm/tester.py
+++ b/sdcm/tester.py
@@ -56,6 +56,7 @@
 from sdcm.cluster_k8s import mini_k8s, gke, eks
 from sdcm.cluster_k8s.eks import MonitorSetEKS
 from sdcm.cql_stress_cassandra_stress_thread import CqlStressCassandraStressThread
+from sdcm.kafka.kafka_cluster import LocalKafkaCluster
 from sdcm.provision.azure.provisioner import AzureProvisioner
 from sdcm.provision.network_configuration import ssh_connection_ip_type
 from sdcm.provision.provisioner import provisioner_factory
@@ -840,6 +841,12 @@ def prepare_kms_host(self) -> None:
             self.params["append_scylla_yaml"] = yaml.safe_dump(append_scylla_yaml)
         return None

+    def kafka_configure(self):
+        if self.kafka_cluster:
+            for connector_config in self.params.get('kafka_connectors'):
+                self.kafka_cluster.create_connector(db_cluster=self.db_cluster,
+                                                    connector_config=connector_config)
+
     @teardown_on_exception
     @log_run_info
     def setUp(self):  # pylint: disable=too-many-branches,too-many-statements
@@ -856,6 +863,7 @@ def setUp(self):  # pylint:
disable=too-many-branches,too-many-statements self.loaders = None self.monitors = None self.siren_manager = None + self.kafka_cluster = None self.k8s_clusters = [] self.connections = [] make_threads_be_daemonic_by_default() @@ -1554,6 +1562,14 @@ def get_cluster_k8s_local_kind_cluster(self): else: self.monitors = NoMonitorSet() + def get_cluster_kafka(self): + if kafka_backend := self.params.get('kafka_backend'): + if kafka_backend == 'localstack': + self.kafka_cluster = LocalKafkaCluster() + self.kafka_cluster.start() + else: + raise NotImplementedError(f"{kafka_backend=} not implemented") + @staticmethod def _add_and_wait_for_cluster_nodes_in_parallel(clusters): def _add_and_wait_for_cluster_nodes(cluster): @@ -1779,6 +1795,8 @@ def init_resources(self, loader_info=None, db_info=None, if cluster_backend is None: cluster_backend = 'aws' + self.get_cluster_kafka() + if cluster_backend in ('aws', 'aws-siren'): self.get_cluster_aws(loader_info=loader_info, db_info=db_info, monitor_info=monitor_info) @@ -2795,6 +2813,9 @@ def tearDown(self): k8s_cluster.gather_k8s_logs_by_operator() k8s_cluster.gather_k8s_logs() + if self.kafka_cluster: + with silence(parent=self, name='stopping kafka'): + self.kafka_cluster.stop() if self.params.get('collect_logs'): self.collect_logs() self.clean_resources() diff --git a/sdcm/utils/git.py b/sdcm/utils/git.py index 21608894c52..82b24baa3ee 100644 --- a/sdcm/utils/git.py +++ b/sdcm/utils/git.py @@ -89,13 +89,14 @@ def get_git_status_info() -> GitStatus: return git_status -def clone_repo(remoter, repo_url: str, destination_dir_name: str = "", clone_as_root=True): +def clone_repo(remoter, repo_url: str, destination_dir_name: str = "", clone_as_root=True, branch=None): # pylint: disable=broad-except try: LOGGER.debug("Cloning from %s...", repo_url) rm_cmd = f"rm -rf ./{repo_url.split('/')[-1].split('.')[0]}" remoter.sudo(rm_cmd, ignore_status=False) - clone_cmd = f"git clone {repo_url} {destination_dir_name}" + branch = f'--branch {branch}' if branch else '' + clone_cmd = f"git clone {branch} {repo_url} {destination_dir_name}" if clone_as_root: remoter.sudo(clone_cmd) else: diff --git a/test-cases/kafka/longevity-kafka-cdc-docker.yaml b/test-cases/kafka/longevity-kafka-cdc-docker.yaml new file mode 100644 index 00000000000..1cfdbb9769e --- /dev/null +++ b/test-cases/kafka/longevity-kafka-cdc-docker.yaml @@ -0,0 +1,35 @@ +test_duration: 60 +pre_create_keyspace: [ + "CREATE KEYSPACE IF NOT EXISTS keyspace1 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1 };", + "CREATE TABLE IF NOT EXISTS keyspace1.standard1 (key blob PRIMARY KEY, \"C0\" blob, \"C1\" blob, \"C2\" blob, \"C3\" blob, \"C4\" blob) WITH cdc = {'enabled': true, 'preimage': false, 'postimage': true, 'ttl': 600}" +] + +stress_cmd: ["cassandra-stress write cl=ONE duration=1m -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=1) compaction(strategy=SizeTieredCompactionStrategy)' -mode cql3 native -rate threads=10 -pop seq=1..10000000 -log interval=5", + ] + +n_loaders: 1 +n_monitor_nodes: 1 +n_db_nodes: 1 + +instance_type_runner: c7.2xlarge + +nemesis_class_name: 'NoOpMonkey' +nemesis_interval: 1 +nemesis_filter_seeds: false + +user_prefix: 'kafka' +use_mgmt: false + +monitor_swap_size: 0 + +docker_network: 'kafka-stack-docker-compose_default' + +kafka_backend: 'localstack' + +kafka_connectors: + - version: 'hub:scylladb/scylla-cdc-source-connector:1.1.2' + name: "QuickstartConnector" + config: + "connector.class": 
"com.scylladb.cdc.debezium.connector.ScyllaConnector" + "scylla.name": "SCTConnectorNamespace" + "scylla.table.names": 'keyspace1.standard1' diff --git a/unit_tests/test_data/kafka_connectors/kafka-connect-scylladb.yaml b/unit_tests/test_data/kafka_connectors/kafka-connect-scylladb.yaml new file mode 100644 index 00000000000..bd4f828163c --- /dev/null +++ b/unit_tests/test_data/kafka_connectors/kafka-connect-scylladb.yaml @@ -0,0 +1,16 @@ +kafka_backend: localstack + +kafka_connectors: + - version: 'https://github.com/scylladb/kafka-connect-scylladb/releases/download/1.1.1/scylladb-kafka-connect-scylladb-1.1.1.zip' + name: "scylladb-sink-connector-url" + config: + "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector" + "scylladb.keyspace": 'keyspace1' + topics: "topic1" + + - version: 'hub:scylladb/kafka-connect-scylladb:1.1.1' + name: "scylladb-sink-connector" + config: + "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector" + "scylladb.keyspace": 'keyspace1' + topics: "topic1" diff --git a/unit_tests/test_data/kafka_connectors/scylla-cdc-source-connector.yaml b/unit_tests/test_data/kafka_connectors/scylla-cdc-source-connector.yaml new file mode 100644 index 00000000000..14f7fbeabe0 --- /dev/null +++ b/unit_tests/test_data/kafka_connectors/scylla-cdc-source-connector.yaml @@ -0,0 +1,26 @@ +kafka_backend: localstack + +kafka_connectors: + - version: 'https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.1.2/scylla-cdc-source-connector-1.1.2-jar-with-dependencies.jar' + name: "QuickstartConnector-url" + config: + "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector" + "scylla.name": "SCTConnectorNamespace" + "scylla.table.names": 'keyspace1.standard1' + "key.converter": 'org.apache.kafka.connect.json.JsonConverter' + "value.converter": 'org.apache.kafka.connect.json.JsonConverter' + "key.converter.schemas.enable": true + "value.converter.schemas.enable": true + "auto.create.topics.enable": true + + - version: 'hub:scylladb/scylla-cdc-source-connector:1.1.2' + name: "QuickstartConnector" + config: + "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector" + "scylla.name": "SCTConnectorNamespace" + "scylla.table.names": 'keyspace1.standard1' + "key.converter": 'org.apache.kafka.connect.json.JsonConverter' + "value.converter": 'org.apache.kafka.connect.json.JsonConverter' + "key.converter.schemas.enable": true + "value.converter.schemas.enable": true + "auto.create.topics.enable": true diff --git a/unit_tests/test_kafka.py b/unit_tests/test_kafka.py new file mode 100644 index 00000000000..998a1c3c734 --- /dev/null +++ b/unit_tests/test_kafka.py @@ -0,0 +1,82 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. 
+#
+# Copyright (c) 2023 ScyllaDB
import os
import pytest

from sdcm.kafka.kafka_cluster import LocalKafkaCluster

pytestmark = [
    pytest.mark.integration,
]


@pytest.fixture(name="kafka_cluster", scope="session")
def fixture_kafka_cluster(tmp_path_factory):
    os.environ["_SCT_TEST_LOGDIR"] = str(tmp_path_factory.mktemp("logs"))
    kafka = LocalKafkaCluster()

    kafka.start()

    yield kafka

    kafka.stop()


@pytest.mark.docker_scylla_args(docker_network="kafka-stack-docker-compose_default")
@pytest.mark.sct_config(
    files="unit_tests/test_data/kafka_connectors/scylla-cdc-source-connector.yaml"
)
def test_01_kafka_cdc_source_connector(docker_scylla, kafka_cluster, params):
    """
    set up kafka with scylla-cdc-source-connector against a docker-based scylla node
    - from confluent-hub
    - from GitHub release url
    """
    docker_scylla.run_cqlsh(
        "CREATE KEYSPACE IF NOT EXISTS keyspace1 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1 };"
    )
    docker_scylla.run_cqlsh(
        'CREATE TABLE IF NOT EXISTS keyspace1.standard1 (key blob PRIMARY KEY, '
        '"C0" blob, "C1" blob, "C2" blob, "C3" blob, "C4" blob) WITH '
        "cdc = {'enabled': true, 'preimage': false, 'postimage': true, 'ttl': 600}"
    )

    docker_scylla.parent_cluster.nodes = [docker_scylla]
    for connector_config in params.get("kafka_connectors"):
        kafka_cluster.create_connector(
            db_cluster=docker_scylla.parent_cluster, connector_config=connector_config
        )


@pytest.mark.docker_scylla_args(docker_network="kafka-stack-docker-compose_default")
@pytest.mark.sct_config(
    files="unit_tests/test_data/kafka_connectors/kafka-connect-scylladb.yaml"
)
def test_02_kafka_scylla_sink_connector(docker_scylla, kafka_cluster, params):
    """
    set up kafka with kafka-connect-scylladb against a docker-based scylla node
    - from confluent-hub
    - from GitHub release url
    """

    # docker_scylla.run_cqlsh(
    #     "CREATE KEYSPACE IF NOT EXISTS keyspace1 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1 };")
    # docker_scylla.run_cqlsh(
    #     "CREATE TABLE IF NOT EXISTS keyspace1.default1 (key blob PRIMARY KEY, C0 blob, C1 blob, C2 blob, C3 blob, C4 blob) WITH "
    #     "cdc = {'enabled': true, 'preimage': false, 'postimage': true, 'ttl': 600}")

    docker_scylla.parent_cluster.nodes = [docker_scylla]
    for connector_config in params.get("kafka_connectors"):
        kafka_cluster.create_connector(
            db_cluster=docker_scylla.parent_cluster, connector_config=connector_config
        )
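
For reference, a minimal sketch (not part of the diff) of how a `kafka_connectors` entry from the YAML files above is expected to round-trip through the new `SctKafkaConfiguration` model into the JSON payload that `LocalKafkaCluster.create_connector()` POSTs to the Kafka Connect REST API at http://localhost:8083/connectors. It assumes pydantic v1 semantics (fields populated by their `Field(alias=...)` names, as the `Extra` import in sdcm/kafka/kafka_config.py implies); the printed shape is an expectation, not captured output:

    # hypothetical usage sketch, mirroring sdcm/sct_config.py and sdcm/kafka/kafka_cluster.py
    from sdcm.kafka.kafka_config import SctKafkaConfiguration

    entry = {
        # "version" is consumed by install_connector() and excluded from the REST payload
        "version": "hub:scylladb/scylla-cdc-source-connector:1.1.2",
        "name": "QuickstartConnector",
        "config": {
            "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
            "scylla.name": "SCTConnectorNamespace",
            "scylla.table.names": "keyspace1.standard1",
        },
    }
    connector = SctKafkaConfiguration(**entry)

    # Field(exclude=True) drops "version", by_alias restores the dotted option names,
    # and exclude_none drops the unset Optional fields, so this should match the body
    # sent to /connectors (before create_connector() injects the Scylla addresses):
    print(connector.dict(by_alias=True, exclude_none=True))
    # expected:
    # {'name': 'QuickstartConnector',
    #  'config': {'connector.class': 'com.scylladb.cdc.debezium.connector.ScyllaConnector',
    #             'scylla.name': 'SCTConnectorNamespace',
    #             'scylla.table.names': 'keyspace1.standard1'}}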