-
Notifications
You must be signed in to change notification settings - Fork 96
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature(kafka-localstack): introducing docker-compose base kafka setup
Since we want to be able to run scylla kafka connectors with scylla clusters create by SCT, we are introducing here the first of kafka backend that would be used for local development (with SCT docker backend) * inculde a way to configure the connector as needed (also multi ones) * get it intsall from hub or by url **Note**: this doesn't yet include any code that can read out of kafka
- Loading branch information
Showing
15 changed files
with
449 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
1.59-pygithub | ||
1.60-docker-compose |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
# This program is free software; you can redistribute it and/or modify | ||
# it under the terms of the GNU Affero General Public License as published by | ||
# the Free Software Foundation; either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. | ||
# | ||
# See LICENSE for more details. | ||
# | ||
# Copyright (c) 2023 ScyllaDB | ||
import logging | ||
from pathlib import Path | ||
from functools import cached_property | ||
|
||
import requests | ||
|
||
from sdcm import cluster | ||
from sdcm.wait import wait_for | ||
from sdcm.remote import LOCALRUNNER | ||
from sdcm.utils.git import clone_repo | ||
from sdcm.utils.common import get_sct_root_path | ||
from sdcm.utils.remote_logger import CommandClusterLoggerBase | ||
from sdcm.kafka.kafka_config import SctKafkaConfiguration | ||
|
||
# TODO: write/think more about the consumers | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class KafkaLogger(CommandClusterLoggerBase): # pylint: disable=too-few-public-methods | ||
# pylint: disable=invalid-overridden-method | ||
@cached_property | ||
def _logger_cmd(self) -> str: | ||
return f"{self._cluster.compose_context} logs --no-color --tail=1000 >>{self._target_log_file}" | ||
|
||
|
||
class LocalKafkaCluster(cluster.BaseCluster): | ||
def __init__(self, remoter=LOCALRUNNER): | ||
super().__init__(cluster_prefix="kafka", add_nodes=False) | ||
self.remoter = remoter | ||
self.docker_compose_path = ( | ||
Path(get_sct_root_path()) / "kafka-stack-docker-compose" | ||
) | ||
self._journal_thread: KafkaLogger | None = None | ||
self.init_repository() | ||
|
||
def init_repository(self): | ||
# TODO: make the url configurable | ||
# TODO: get the version after install, and send out to Argus | ||
repo_url = "https://github.com/fruch/kafka-stack-docker-compose.git" | ||
branch = 'master' | ||
clone_repo( | ||
remoter=self.remoter, | ||
repo_url=repo_url, | ||
branch=branch, | ||
destination_dir_name=str(self.docker_compose_path), | ||
clone_as_root=False, | ||
) | ||
self.remoter.run(f'mkdir -p {self.docker_compose_path / "connectors"}') | ||
|
||
@property | ||
def compose_context(self): | ||
return f"cd {self.docker_compose_path}; docker compose -f full-stack.yml" | ||
|
||
@property | ||
def kafka_connect_url(self): | ||
return "http://localhost:8083" | ||
|
||
def compose(self, cmd): | ||
self.remoter.run(f"{self.compose_context} {cmd}") | ||
|
||
def start(self): | ||
self.compose("up -d") | ||
self.start_journal_thread() | ||
|
||
def stop(self): | ||
self._journal_thread.stop(timeout=120) | ||
self.compose("down") | ||
|
||
def install_connector(self, connector_version: str): | ||
if connector_version.startswith("hub:"): | ||
self.install_connector_from_hub(connector_version.replace("hub:", "")) | ||
else: | ||
self.install_connector_from_url(connector_version) | ||
|
||
def install_connector_from_hub( | ||
self, connector_version: str = "scylladb/scylla-cdc-source-connector:latest" | ||
): | ||
self.compose( | ||
f"exec kafka-connect confluent-hub install --no-prompt {connector_version}" | ||
) | ||
self.compose("restart kafka-connect") | ||
|
||
def install_connector_from_url(self, connector_url: str): | ||
if connector_url.startswith("http"): | ||
if connector_url.endswith('.jar'): | ||
self.remoter.run( | ||
f'curl -L --create-dirs -O --output-dir {self.docker_compose_path / "connectors"} {connector_url} ' | ||
) | ||
if connector_url.endswith('.zip'): | ||
self.remoter.run( | ||
f'curl -L -o /tmp/connector.zip {connector_url} && ' | ||
f'unzip /tmp/connector.zip -d {self.docker_compose_path / "connectors"} && rm /tmp/connector.zip' | ||
) | ||
if connector_url.startswith("file://"): | ||
connector_local_path = connector_url.replace("file://", "") | ||
if connector_url.endswith('.jar'): | ||
self.remoter.run( | ||
f'cp {connector_local_path} {self.docker_compose_path / "connectors"}' | ||
) | ||
if connector_url.endswith('.zip'): | ||
self.remoter.run( | ||
f'unzip {connector_local_path} -d {self.docker_compose_path / "connectors"}' | ||
) | ||
self.compose("restart kafka-connect") | ||
|
||
# TODO: find release based on 'curl https://api.github.com/repos/scylladb/scylla-cdc-source-connector/releases' | ||
|
||
def create_connector( | ||
self, | ||
db_cluster: cluster.BaseScyllaCluster, | ||
connector_config: SctKafkaConfiguration, | ||
): | ||
# TODO: extend the number of tasks | ||
# TODO: handle user/password | ||
# TODO: handle client encryption SSL | ||
|
||
connector_data = connector_config.dict(by_alias=True, exclude_none=True) | ||
match connector_config.config.connector_class: | ||
case "io.connect.scylladb.ScyllaDbSinkConnector": | ||
scylla_addresses = ",".join( | ||
[node.cql_address for node in db_cluster.nodes] | ||
) | ||
connector_data["config"]["scylladb.contact.points"] = scylla_addresses | ||
case "com.scylladb.cdc.debezium.connector.ScyllaConnector": | ||
scylla_addresses = ",".join( | ||
[f"{node.cql_address}:{node.CQL_PORT}" for node in db_cluster.nodes] | ||
) | ||
connector_data["config"][ | ||
"scylla.cluster.ip.addresses" | ||
] = scylla_addresses | ||
|
||
self.install_connector(connector_config.version) | ||
|
||
def kafka_connect_api_available(): | ||
res = requests.head(url=self.kafka_connect_url) | ||
res.raise_for_status() | ||
return True | ||
|
||
wait_for( | ||
func=kafka_connect_api_available, | ||
step=2, | ||
text="waiting for kafka-connect api", | ||
timeout=120, | ||
throw_exc=True, | ||
) | ||
logger.debug(connector_data) | ||
res = requests.post( | ||
url=f"{self.kafka_connect_url}/connectors", json=connector_data | ||
) | ||
logger.debug(res) | ||
logger.debug(res.text) | ||
res.raise_for_status() | ||
|
||
@property | ||
def kafka_log(self) -> Path: | ||
return Path(self.logdir) / "kafka.log" | ||
|
||
def start_journal_thread(self) -> None: | ||
self._journal_thread = KafkaLogger(self, str(self.kafka_log)) | ||
self._journal_thread.start() | ||
|
||
def add_nodes( | ||
self, | ||
count, | ||
ec2_user_data="", | ||
dc_idx=0, | ||
rack=0, | ||
enable_auto_bootstrap=False, | ||
instance_type=None, | ||
): # pylint: disable=too-many-arguments | ||
raise NotImplementedError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from typing import Optional | ||
|
||
from pydantic import Field, BaseModel, Extra # pylint: disable=no-name-in-module | ||
|
||
# pylint: disable=too-few-public-methods | ||
|
||
|
||
class ConnectorConfiguration(BaseModel): | ||
# general options | ||
connector_class: str = Field(alias="connector.class") | ||
topics: Optional[str] | ||
|
||
# scylla-cdc-source-connector options | ||
# see https://github.com/scylladb/scylla-cdc-source-connector?tab=readme-ov-file#configuration | ||
# and https://github.com/scylladb/scylla-cdc-source-connector?tab=readme-ov-file#advanced-administration | ||
scylla_name: Optional[str] = Field(alias="scylla.name") | ||
scylla_table_names: Optional[str] = Field(alias="scylla.table.names") | ||
scylla_user: Optional[str] = Field(alias="scylla.user") | ||
scylla_password: Optional[str] = Field(alias="scylla.password") | ||
|
||
# kafka-connect-scylladb | ||
# see https://github.com/scylladb/kafka-connect-scylladb/blob/master/documentation/CONFIG.md | ||
scylladb_contact_points: Optional[str] = Field(alias="scylladb.contact.points") | ||
scylladb_keyspace: Optional[str] = Field(alias="scylladb.keyspace") | ||
scylladb_user: Optional[str] = Field(alias="scylladb.user") | ||
scylladb_password: Optional[str] = Field(alias="scylladb.password") | ||
|
||
class Config: | ||
extra = Extra.allow | ||
|
||
|
||
class SctKafkaConfiguration(BaseModel): | ||
version: str = Field( | ||
exclude=True | ||
) # url to specific release or hub version ex. 'hub:scylladb/scylla-cdc-source-connector:1.1.2' | ||
name: str # connector name, each one should be named differently | ||
config: ConnectorConfiguration |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.