Skip to content

Commit

Permalink
feature(kafka-localstack): introducing docker-compose based kafka setup
Browse files Browse the repository at this point in the history
Since we want to be able to run scylla kafka connectors with scylla clusters
created by SCT, we are introducing here the first kafka backend, which
will be used for local development (with SCT docker backend)

* include a way to configure the connector as needed (also multiple ones)
* get it installed from hub or by url

**Note**: this doesn't yet include any code that can read out of kafka
  • Loading branch information
fruch committed Feb 12, 2024
1 parent ff98c60 commit 7d8333d
Show file tree
Hide file tree
Showing 15 changed files with 628 additions and 199 deletions.
3 changes: 3 additions & 0 deletions defaults/test_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,6 @@ validate_large_collections: false
scylla_d_overrides_files: []

run_commit_log_check_thread: true

kafka_backend: none
kafka_connectors: []
3 changes: 2 additions & 1 deletion docker/env/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
wget \
psmisc \
procps \
docker-ce-cli && \
docker-ce-cli \
docker-compose-plugin && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
RUN curl -fsSLo /usr/local/bin/kubectl https://dl.k8s.io/release/v$KUBECTL_VERSION/bin/linux/amd64/kubectl && \
Expand Down
2 changes: 1 addition & 1 deletion docker/env/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.58-jinja2-3.1.3
1.59-docker-compose
2 changes: 2 additions & 0 deletions longevity_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ def test_custom_time(self):
self.run_pre_create_keyspace()
self.run_pre_create_schema()

self.kafka_configure()

if scan_operation_params := self._get_scan_operation_params():
for scan_param in scan_operation_params:
self.log.info("Starting fullscan operation thread with the following params: %s", scan_param)
Expand Down
388 changes: 194 additions & 194 deletions requirements.txt

Large diffs are not rendered by default.

Empty file added sdcm/kafka/__init__.py
Empty file.
184 changes: 184 additions & 0 deletions sdcm/kafka/kafka_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2023 ScyllaDB
import logging
from pathlib import Path
from functools import cached_property

import requests

from sdcm import cluster
from sdcm.wait import wait_for
from sdcm.remote import LOCALRUNNER
from sdcm.utils.git import clone_repo
from sdcm.utils.common import get_sct_root_path
from sdcm.utils.remote_logger import CommandClusterLoggerBase
from sdcm.kafka.kafka_config import SctKafkaConfiguration

# TODO: write/think more about the consumers

logger = logging.getLogger(__name__)


class KafkaLogger(CommandClusterLoggerBase):  # pylint: disable=too-few-public-methods
    """Dump the docker compose logs of the kafka stack into a log file.

    Only the shell command is defined here; running it is left to
    ``CommandClusterLoggerBase``.
    """

    # pylint: disable=invalid-overridden-method
    @cached_property
    def _logger_cmd(self) -> str:
        """Return the shell command appending the compose logs to the target log file."""
        compose_prefix = self._cluster.compose_context
        log_file = self._target_log_file
        # `>>` appends, so output already collected in the file is kept
        return f"{compose_prefix} logs --no-color --tail=1000 >>{log_file}"


class LocalKafkaCluster(cluster.BaseCluster):
def __init__(self, remoter=LOCALRUNNER):
super().__init__(cluster_prefix="kafka", add_nodes=False)
self.remoter = remoter
self.docker_compose_path = (
Path(get_sct_root_path()) / "kafka-stack-docker-compose"
)
self._journal_thread: KafkaLogger | None = None
self.init_repository()

def init_repository(self):
# TODO: make the url configurable
# TODO: get the version after install, and send out to Argus
repo_url = "[email protected]:fruch/kafka-stack-docker-compose.git"
branch = 'master'
clone_repo(
remoter=self.remoter,
repo_url=repo_url,
branch=branch,
destination_dir_name=str(self.docker_compose_path),
clone_as_root=False,
)
self.remoter.run(f'mkdir -p {self.docker_compose_path / "connectors"}')

@property
def compose_context(self):
return f"cd {self.docker_compose_path}; docker compose -f full-stack.yml"

@property
def kafka_connect_url(self):
return "http://localhost:8083"

def compose(self, cmd):
self.remoter.run(f"{self.compose_context} {cmd}")

def start(self):
self.compose("up -d")
self.start_journal_thread()

def stop(self):
self._journal_thread.stop(timeout=120)
self.compose("down")

def install_connector(self, connector_version: str):
if connector_version.startswith("hub:"):
self.install_connector_from_hub(connector_version.replace("hub:", ""))
else:
self.install_connector_from_url(connector_version)

def install_connector_from_hub(
self, connector_version: str = "scylladb/scylla-cdc-source-connector:latest"
):
self.compose(
f"exec kafka-connect confluent-hub install --no-prompt {connector_version}"
)
self.compose("restart kafka-connect")

def install_connector_from_url(self, connector_url: str):
if connector_url.startswith("http"):
if connector_url.endswith('.jar'):
self.remoter.run(
f'curl -L --create-dirs -O --output-dir {self.docker_compose_path / "connectors"} {connector_url} '
)
if connector_url.endswith('.zip'):
self.remoter.run(
f'curl -L -o /tmp/connector.zip {connector_url} && '
f'unzip /tmp/connector.zip -d {self.docker_compose_path / "connectors"} && rm /tmp/connector.zip'
)
if connector_url.startswith("file://"):
connector_local_path = connector_url.replace("file://", "")
if connector_url.endswith('.jar'):
self.remoter.run(
f'cp {connector_local_path} {self.docker_compose_path / "connectors"}'
)
if connector_url.endswith('.zip'):
self.remoter.run(
f'unzip {connector_local_path} -d {self.docker_compose_path / "connectors"}'
)
self.compose("restart kafka-connect")

# TODO: find release based on 'curl https://api.github.com/repos/scylladb/scylla-cdc-source-connector/releases'

def create_connector(
self,
db_cluster: cluster.BaseScyllaCluster,
connector_config: SctKafkaConfiguration,
):
# TODO: extend the number of tasks
# TODO: handle user/password
# TODO: handle client encryption SSL

connector_data = connector_config.dict(by_alias=True, exclude_none=True)
match connector_config.config.connector_class:
case "io.connect.scylladb.ScyllaDbSinkConnector":
scylla_addresses = ",".join(
[node.cql_address for node in db_cluster.nodes]
)
connector_data["config"]["scylladb.contact.points"] = scylla_addresses
case "com.scylladb.cdc.debezium.connector.ScyllaConnector":
scylla_addresses = ",".join(
[f"{node.cql_address}:{node.CQL_PORT}" for node in db_cluster.nodes]
)
connector_data["config"][
"scylla.cluster.ip.addresses"
] = scylla_addresses

self.install_connector(connector_config.version)

def kafka_connect_api_available():
res = requests.head(url=self.kafka_connect_url)
res.raise_for_status()
return True

wait_for(
func=kafka_connect_api_available,
step=2,
text="waiting for kafka-connect api",
timeout=120,
throw_exc=True,
)
logger.debug(connector_data)
res = requests.post(
url=f"{self.kafka_connect_url}/connectors", json=connector_data
)
logger.debug(res)
logger.debug(res.text)
res.raise_for_status()

@property
def kafka_log(self) -> Path:
return Path(self.logdir) / "kafka.log"

def start_journal_thread(self) -> None:
self._journal_thread = KafkaLogger(self, str(self.kafka_log))
self._journal_thread.start()

def add_nodes(
self,
count,
ec2_user_data="",
dc_idx=0,
rack=0,
enable_auto_bootstrap=False,
instance_type=None,
): # pylint: disable=too-many-arguments
raise NotImplementedError
37 changes: 37 additions & 0 deletions sdcm/kafka/kafka_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Optional

from pydantic import Field, BaseModel, Extra # pylint: disable=no-name-in-module

# pylint: disable=too-few-public-methods


class ConnectorConfiguration(BaseModel):
    """The `config` section of a kafka connector definition.

    Attribute names are python friendly, the aliases are the dotted names the
    connectors use. Options not listed here are accepted as well
    (`extra = Extra.allow`).

    Every optional field has an explicit `None` default, so it stays optional
    regardless of how the installed pydantic version treats a bare
    `Optional[...]` annotation.
    """

    # general options
    connector_class: str = Field(alias="connector.class")
    topics: Optional[str] = None

    # scylla-cdc-source-connector options
    # see https://github.com/scylladb/scylla-cdc-source-connector?tab=readme-ov-file#configuration
    # and https://github.com/scylladb/scylla-cdc-source-connector?tab=readme-ov-file#advanced-administration
    scylla_name: Optional[str] = Field(default=None, alias="scylla.name")
    scylla_table_names: Optional[str] = Field(default=None, alias="scylla.table.names")
    scylla_user: Optional[str] = Field(default=None, alias="scylla.user")
    scylla_password: Optional[str] = Field(default=None, alias="scylla.password")

    # kafka-connect-scylladb
    # see https://github.com/scylladb/kafka-connect-scylladb/blob/master/documentation/CONFIG.md
    scylladb_contact_points: Optional[str] = Field(default=None, alias="scylladb.contact.points")
    scylladb_keyspace: Optional[str] = Field(default=None, alias="scylladb.keyspace")
    scylladb_user: Optional[str] = Field(default=None, alias="scylladb.user")
    scylladb_password: Optional[str] = Field(default=None, alias="scylladb.password")

    class Config:
        extra = Extra.allow


class SctKafkaConfiguration(BaseModel):
    """One kafka connector entry: what to install, its name and its configuration."""

    # url to specific release or hub version ex. 'hub:scylladb/scylla-cdc-source-connector:1.1.2'
    # (required; excluded when the model is exported)
    version: str = Field(..., exclude=True)

    # connector name, each one should be named differently
    name: str

    config: ConnectorConfiguration
23 changes: 22 additions & 1 deletion sdcm/sct_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import getpass
import pathlib
import tempfile
from copy import deepcopy
from typing import List, Union, Set

from distutils.util import strtobool
Expand Down Expand Up @@ -56,6 +57,7 @@
from sdcm.sct_events.base import add_severity_limit_rules, print_critical_events
from sdcm.utils.gce_utils import get_gce_image_tags
from sdcm.remote import LOCALRUNNER, shell_script_cmd
from sdcm.kafka.kafka_config import SctKafkaConfiguration


def _str(value: str) -> str:
Expand Down Expand Up @@ -1523,6 +1525,12 @@ class SCTConfiguration(dict):
dict(name="run_commit_log_check_thread", env="SCT_RUN_COMMIT_LOG_CHECK_THREAD", type=boolean,
help="""Run commit log check thread if commitlog_use_hard_size_limit is True"""),

dict(name="kafka_backend", env="SCT_KAFKA_BACKEND", type=str,
     # the help text must describe this option: it ends up in the generated configuration docs
     help="Kafka backend to set up for running kafka connectors during the test",
     choices=("none", "localstack", "vm", "msk")),

dict(name="kafka_connectors", env="SCT_KAFKA_CONNECTORS", type=str_or_list_or_eval,
help="configuration for setup up kafka connectors"),
]

required_params = ['cluster_backend', 'test_duration', 'n_db_nodes', 'n_loaders', 'use_preinstalled_scylla',
Expand Down Expand Up @@ -1909,6 +1917,11 @@ def __init__(self):
if len(nics) > 1 and len(self.region_names) >= 2:
raise ValueError("Multiple network interfaces aren't supported for multi region use cases")

# 18: validate kafka configuration
if kafka_connectors := self.get('kafka_connectors'):
self['kafka_connectors'] = [SctKafkaConfiguration(**connector)
for connector in kafka_connectors]

def log_config(self):
self.log.info(self.dump_config())

Expand Down Expand Up @@ -2415,7 +2428,15 @@ def dump_config(self):
:return: str
"""
return anyconfig.dumps(self, ac_parser="yaml")
out = deepcopy(self)

# handle pydantic object, and convert them back to dicts
# TODO: automate the process if we gonna keep using them more, or replace the whole configuration with pydantic/dataclasses
if kafka_connectors := self.get('kafka_connectors'):
out['kafka_connectors'] = [connector.dict(by_alias=True, exclude_none=True)
for connector in kafka_connectors]

return anyconfig.dumps(out, ac_parser="yaml")

def dump_help_config_markdown(self):
"""
Expand Down
21 changes: 21 additions & 0 deletions sdcm/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from sdcm.cluster_k8s import mini_k8s, gke, eks
from sdcm.cluster_k8s.eks import MonitorSetEKS
from sdcm.cql_stress_cassandra_stress_thread import CqlStressCassandraStressThread
from sdcm.kafka.kafka_cluster import LocalKafkaCluster
from sdcm.provision.azure.provisioner import AzureProvisioner
from sdcm.provision.network_configuration import ssh_connection_ip_type
from sdcm.provision.provisioner import provisioner_factory
Expand Down Expand Up @@ -840,6 +841,12 @@ def prepare_kms_host(self) -> None:
self.params["append_scylla_yaml"] = yaml.safe_dump(append_scylla_yaml)
return None

def kafka_configure(self):
    """Create every configured kafka connector, if a kafka cluster was set up.

    Does nothing when there is no kafka cluster or no connector is configured.
    """
    if not self.kafka_cluster:
        return
    # `kafka_connectors` may be unset (None); treat that like an empty list
    for connector_config in self.params.get('kafka_connectors') or []:
        self.kafka_cluster.create_connector(db_cluster=self.db_cluster,
                                            connector_config=connector_config)

@teardown_on_exception
@log_run_info
def setUp(self): # pylint: disable=too-many-branches,too-many-statements
Expand All @@ -856,6 +863,7 @@ def setUp(self): # pylint: disable=too-many-branches,too-many-statements
self.loaders = None
self.monitors = None
self.siren_manager = None
self.kafka_cluster = None
self.k8s_clusters = []
self.connections = []
make_threads_be_daemonic_by_default()
Expand Down Expand Up @@ -1554,6 +1562,14 @@ def get_cluster_k8s_local_kind_cluster(self):
else:
self.monitors = NoMonitorSet()

def get_cluster_kafka(self):
    """Create and start the kafka cluster selected by the `kafka_backend` parameter.

    :raises NotImplementedError: for a backend other than 'localstack'
    """
    kafka_backend = self.params.get('kafka_backend')
    # 'none' is a regular (truthy) string and one of the valid choices of the
    # option, so it has to be handled as "kafka disabled", same as an unset value
    if not kafka_backend or kafka_backend == 'none':
        return

    if kafka_backend != 'localstack':
        raise NotImplementedError(f"{kafka_backend=} not implemented")

    self.kafka_cluster = LocalKafkaCluster()
    self.kafka_cluster.start()

@staticmethod
def _add_and_wait_for_cluster_nodes_in_parallel(clusters):
def _add_and_wait_for_cluster_nodes(cluster):
Expand Down Expand Up @@ -1779,6 +1795,8 @@ def init_resources(self, loader_info=None, db_info=None,
if cluster_backend is None:
cluster_backend = 'aws'

self.get_cluster_kafka()

if cluster_backend in ('aws', 'aws-siren'):
self.get_cluster_aws(loader_info=loader_info, db_info=db_info,
monitor_info=monitor_info)
Expand Down Expand Up @@ -2795,6 +2813,9 @@ def tearDown(self):
k8s_cluster.gather_k8s_logs_by_operator()
k8s_cluster.gather_k8s_logs()

if self.kafka_cluster:
with silence(parent=self, name='stopping kafka'):
self.kafka_cluster.stop()
if self.params.get('collect_logs'):
self.collect_logs()
self.clean_resources()
Expand Down
5 changes: 3 additions & 2 deletions sdcm/utils/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ def get_git_status_info() -> GitStatus:
return git_status


def clone_repo(remoter, repo_url: str, destination_dir_name: str = "", clone_as_root=True):
def clone_repo(remoter, repo_url: str, destination_dir_name: str = "", clone_as_root=True, branch=None):
# pylint: disable=broad-except
try:
LOGGER.debug("Cloning from %s...", repo_url)
rm_cmd = f"rm -rf ./{repo_url.split('/')[-1].split('.')[0]}"
remoter.sudo(rm_cmd, ignore_status=False)
clone_cmd = f"git clone {repo_url} {destination_dir_name}"
branch = f'--branch {branch}' if branch else ''
clone_cmd = f"git clone {branch} {repo_url} {destination_dir_name}"
if clone_as_root:
remoter.sudo(clone_cmd)
else:
Expand Down
Loading

0 comments on commit 7d8333d

Please sign in to comment.