From 4b5efa2785d0b430669b2e14a53963ae9deee589 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 18 Apr 2024 13:40:05 +0800 Subject: [PATCH] test: add Kafka consumer-group, add-partition tests (inline style) (#16244) Signed-off-by: xxchan --- Cargo.lock | 4 +- Makefile.toml | 3 +- ci/Dockerfile | 21 +++- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +- ci/scripts/e2e-source-test.sh | 5 +- e2e_test/source/README.md | 4 + e2e_test/source_inline/README.md | 30 +++++ e2e_test/source_inline/commands.toml | 107 ++++++++++++++++++ .../source_inline/kafka/add_partition.slt | 74 ++++++++++++ .../source_inline/kafka/consumer_group.mjs | 88 ++++++++++++++ .../source_inline/kafka/consumer_group.slt | 101 +++++++++++++++++ risedev.yml | 8 +- scripts/source/prepare_ci_kafka.sh | 6 +- src/risedevtool/src/bin/risedev-dev.rs | 24 +--- src/risedevtool/src/risedev_env.rs | 6 + src/risedevtool/src/service_config.rs | 46 ++++++++ .../src/task/ensure_stop_service.rs | 13 ++- src/risedevtool/src/task/kafka_service.rs | 11 +- .../src/task/task_kafka_ready_check.rs | 8 +- 20 files changed, 526 insertions(+), 45 deletions(-) create mode 100644 e2e_test/source_inline/README.md create mode 100644 e2e_test/source_inline/commands.toml create mode 100644 e2e_test/source_inline/kafka/add_partition.slt create mode 100755 e2e_test/source_inline/kafka/consumer_group.mjs create mode 100644 e2e_test/source_inline/kafka/consumer_group.slt diff --git a/Cargo.lock b/Cargo.lock index 16350db6b46e4..25b9f0324f7e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12079,9 +12079,9 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8518892e5e36bfa90163e53c4e4f36a388e0afa1cd6a3de0614253b3c9029c7" +checksum = "7e7c6a33098cd55e4fead1bd1f85c1d2064f02bafdb9fe004ca39fd94aee36e6" dependencies = [ "async-trait", "educe 0.4.23", diff --git a/Makefile.toml b/Makefile.toml index 604b7b2b4e44b..f95ed99e5ec43 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -15,6 +15,7 @@ extend = [ { path = "src/storage/backup/integration_tests/Makefile.toml" }, { path = "src/java_binding/make-java-binding.toml" }, { path = "src/stream/tests/integration_tests/integration_test.toml" }, + { path = "e2e_test/source_inline/commands.toml" }, ] env_files = ["./risedev-components.user.env"] @@ -1292,7 +1293,7 @@ echo "All processes has exited." [tasks.slt] env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev" } category = "RiseDev - Test - SQLLogicTest" -install_crate = { version = "0.20.0", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-risedev-env-file"] diff --git a/ci/Dockerfile b/ci/Dockerfile index 5902b612c071c..cffa1a026be3e 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -18,6 +18,15 @@ RUN apt-get update -yy && \ && rm -rf /var/lib/{apt,dpkg,cache,log}/ ENV PYO3_PYTHON=python3.12 +# Install nvm and zx +ENV NVM_DIR /root/.nvm +ENV NODE_VERSION 20.11.1 +RUN curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash \ + && . $NVM_DIR/nvm.sh \ + && nvm install $NODE_VERSION +ENV PATH $NVM_DIR/versions/node/v$NODE_VERSION/bin:$PATH +RUN npm install -g zx + SHELL ["/bin/bash", "-c"] RUN mkdir -p /risingwave @@ -43,6 +52,16 @@ RUN pip3 install --break-system-packages pyarrow pytest # Install poetry RUN curl -sSL https://install.python-poetry.org | python3 - +# Install rpk +RUN if [ "$(uname -m)" = "amd64" ] || [ "$(uname -m)" = "x86_64" ]; then \ + curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip && \ + unzip rpk-linux-amd64.zip -d ~/.local/bin/ && \ + rm rpk-linux-amd64.zip; \ + else \ + curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-arm64.zip && \ + unzip rpk-linux-arm64.zip -d ~/.local/bin/ && \ + rm rpk-linux-arm64.zip; \ + fi ENV PATH /root/.local/bin:$PATH ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse @@ -51,7 +70,7 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \ cargo-make@0.37.9 \ - sqllogictest-bin@0.19.1 \ + sqllogictest-bin@0.20.1 \ sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 074f60aa97e06..1ec12359d896c 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240413 +export BUILD_ENV_VERSION=v20240414_x export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 1d67188b1c4fb..c754dcc174ed1 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x depends_on: - mysql - db @@ -103,12 +103,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -119,7 +119,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x depends_on: db: condition: service_healthy diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 851fca303ab19..8a683f56b8550 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -152,7 +152,7 @@ risedev ci-kill echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -risedev ci-start ci-pubsub +risedev ci-start ci-pubsub-kafka ./scripts/source/prepare_ci_kafka.sh cargo run --bin prepare_ci_pubsub risedev slt './e2e_test/source/basic/*.slt' @@ -168,6 +168,9 @@ echo "--- e2e, kafka alter source again" ./scripts/source/prepare_data_after_alter.sh 3 risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' +echo "--- e2e, inline test" +risedev slt './e2e_test/source_inline/**/*.slt' + echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' diff --git a/e2e_test/source/README.md b/e2e_test/source/README.md index b6e9dfa30816f..4152ab3dc9737 100644 --- a/e2e_test/source/README.md +++ b/e2e_test/source/README.md @@ -1,3 +1,7 @@ +> [!NOTE] +> +> Please write new tests according to the style in `e2e_test/source_inline`. + Test in this directory needs some prior setup. See also `ci/scripts/e2e-source-test.sh`, and `scripts/source` diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md new file mode 100644 index 0000000000000..3a9070639b8cb --- /dev/null +++ b/e2e_test/source_inline/README.md @@ -0,0 +1,30 @@ +# "Inline" style source e2e tests + +Compared with prior source tests ( `e2e_test/source` ), tests in this directory are expected to be easy to run locally and easy to write. + +Refer to https://github.com/risingwavelabs/risingwave/issues/12451#issuecomment-2051861048 for more details. + +## Install Dependencies + +Some additional tools are needed to run the `system` commands in tests. + +- `rpk`: Redpanda (Kafka) CLI toolbox. https://docs.redpanda.com/current/get-started/rpk-install/ +- `zx`: A tool for writing better scripts. `npm install -g zx` + +## Run tests + +To run locally, use `risedev d` to start services (including external systems like Kafka and Postgres, or specify `user-managed` to use your own service). +Then use `risedev slt` to run the tests, which will load the environment variables (ports, etc.) +according to the services started by `risedev d` . + +```sh +risedev slt 'e2e_test/source_inline/**/*.slt' +``` + +## Write tests + +To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment. + +Use `system` command to setup instead. +For simple cases, you can directly write a bash command; +For more complex cases, you can write a test script (with any language like bash, python, zx), and invoke it in the `system` command. diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml new file mode 100644 index 0000000000000..8af865099ac7c --- /dev/null +++ b/e2e_test/source_inline/commands.toml @@ -0,0 +1,107 @@ +# This file contains commands used by the tests. + +[tasks.source-test-hook] +private = true +dependencies = ["check-risedev-env-file"] +env_files = ["${PREFIX_CONFIG}/risedev-env"] + +# Note about the Kafka CLI tooling: +# - Built-in Kafka console tools: +# Java based. +# Style example: kafka-topics.sh --bootstrap-server localhost:9092 --topic t --create +# Some limitations: cannot disable logging easily, cannot consume to end and then exit. +# - kcat: +# C based (rdkafka) +# Some limitations: cannot do admin operations, only consume/produce. +# - rpk: +# Golang based. +# Style example: RPK_BROKERS=localhost:9092 rpk topic create t +[tasks.kafka-hook] +private = true +description = "Check if Kafka is started by RiseDev" +dependencies = ["source-test-hook"] +script = ''' +#!/usr/bin/env sh +set -e + +if [ ! -d "${PREFIX_BIN}/kafka" ]; then + echo "Kafka is not installed in ${PREFIX_BIN}/kafka. Did you enable Kafka using $(tput setaf 4)\`./risedev configure\`$(tput sgr0)?" + exit 1 +fi + +if [ -z "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then + echo "RISEDEV_KAFKA_BOOTSTRAP_SERVERS is not set in risedev-env file. Did you start Kafka using $(tput setaf 4)\`./risedev d\`$(tput sgr0)?" + exit 1 +fi +''' + +[tasks.clean-kafka] +category = "RiseDev - Test - Source Test - Kafka" +description = "Delete all kafka topics." +dependencies = ["kafka-hook"] +command = "rpk" +args = ["topic", "delete", "-r", "*"] + +[tasks.kafka-topics] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-produce] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-consume] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-consumer-groups] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +# rpk tools +[tasks.rpk] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +# check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options +script = """ +#!/usr/bin/env sh +set -e + +if [ -z "$(which rpk)" ]; then + echo "rpk is not installed. Install it via https://docs.redpanda.com/current/get-started/rpk-install/" + exit 1 +fi + +rpk "$@" +""" + +[tasks.redpanda-console] +category = "RiseDev - Test - Source Test - Kafka" +description = "Start Redpanda console (Kafka GUI) at localhost:8080." +dependencies = ["kafka-hook"] +script = ''' +#!/usr/bin/env sh +set -e +echo "$(tput setaf 2)Start Redpanda console at http://localhost:8080$(tput sgr0)" +docker run --network host -e KAFKA_BROKERS=$RPK_BROKERS docker.redpanda.com/redpandadata/console:latest +''' diff --git a/e2e_test/source_inline/kafka/add_partition.slt b/e2e_test/source_inline/kafka/add_partition.slt new file mode 100644 index 0000000000000..9399cf732b973 --- /dev/null +++ b/e2e_test/source_inline/kafka/add_partition.slt @@ -0,0 +1,74 @@ +# Note: control substitution on will force us to use "\\n" instead of "\n" in commands +control substitution on + +system ok +rpk topic create test_add_partition -p 3 + +system ok +cat < { + const [_broker_id, group_name] = line.split(/\s+/); + return group_name; + }) + .filter((group_name) => { + return group_name.startsWith(`rw-consumer-${fragment_id}`); + }); +} + +async function describe_consumer_group(group_name) { + const res = await $`rpk group describe -s ${group_name}`; + // GROUP rw-consumer-1-1 + // COORDINATOR 0 + // STATE Empty + // BALANCER + // MEMBERS 0 + // TOTAL-LAG 2 + const obj = {}; + for (const line of res.toString().trim().split("\n")) { + const [key, value] = line.split(/\s+/); + obj[key] = value; + } + return obj; +} + +async function list_consumer_group_members(fragment_id) { + const groups = await list_consumer_groups(fragment_id); + return Promise.all( + groups.map(async (group_name) => { + return (await describe_consumer_group(group_name))["MEMBERS"] + }) + ); +} + +async function list_consumer_group_lags(fragment_id) { + const groups = await list_consumer_groups(fragment_id); + return Promise.all( + groups.map(async (group_name) => { + return (await describe_consumer_group(group_name))["TOTAL-LAG"] + }) + ); +} + +const fragment_id = await get_fragment_id_of_mv(mv); +if (command == "list-groups") { + echo`${(await list_consumer_groups(fragment_id))}`; +} else if (command == "list-members") { + echo`${await list_consumer_group_members(fragment_id)}`; +} else if (command == "list-lags") { + echo`${await list_consumer_group_lags(fragment_id)}`; +} else { + throw new Error(`Invalid command: ${command}`); +} diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt new file mode 100644 index 0000000000000..ed97dec558f35 --- /dev/null +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -0,0 +1,101 @@ +# Note: control substitution on will force us to use "\\n" instead of "\n" in commands +control substitution on + +# Note either `./risedev rpk` or `rpk` is ok here. +# risedev-env contains env var RPK_BROKERS, which is read by rpk +system ok +rpk topic create test_consumer_group -p 3 + +system ok +cat </dev/null 2>&1 && pwd)" cd "$SCRIPT_PATH/.." || exit 1 -KAFKA_BIN="$SCRIPT_PATH/../../.risingwave/bin/kafka/bin" - echo "$SCRIPT_PATH" if [ "$1" == "compress" ]; then @@ -39,10 +37,10 @@ for filename in $kafka_data_files; do # always ok echo "Drop topic $topic" - "$KAFKA_BIN"/kafka-topics.sh --bootstrap-server message_queue:29092 --topic "$topic" --delete || true + risedev kafka-topics --topic "$topic" --delete || true echo "Recreate topic $topic with partition $partition" - "$KAFKA_BIN"/kafka-topics.sh --bootstrap-server message_queue:29092 --topic "$topic" --create --partitions "$partition") & + risedev kafka-topics --topic "$topic" --create --partitions "$partition") & done wait diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 1e4df4b43a80e..d5a14cbf8c1bd 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -98,28 +98,8 @@ fn task_main( let mut ports = vec![]; for service in services { - let listen_info = match service { - ServiceConfig::Minio(c) => Some((c.port, c.id.clone())), - ServiceConfig::Etcd(c) => Some((c.port, c.id.clone())), - ServiceConfig::Sqlite(_) => None, - ServiceConfig::Prometheus(c) => Some((c.port, c.id.clone())), - ServiceConfig::ComputeNode(c) => Some((c.port, c.id.clone())), - ServiceConfig::MetaNode(c) => Some((c.port, c.id.clone())), - ServiceConfig::Frontend(c) => Some((c.port, c.id.clone())), - ServiceConfig::Compactor(c) => Some((c.port, c.id.clone())), - ServiceConfig::Grafana(c) => Some((c.port, c.id.clone())), - ServiceConfig::Tempo(c) => Some((c.port, c.id.clone())), - ServiceConfig::Kafka(c) => Some((c.port, c.id.clone())), - ServiceConfig::Pubsub(c) => Some((c.port, c.id.clone())), - ServiceConfig::Redis(c) => Some((c.port, c.id.clone())), - ServiceConfig::ZooKeeper(c) => Some((c.port, c.id.clone())), - ServiceConfig::AwsS3(_) => None, - ServiceConfig::Opendal(_) => None, - ServiceConfig::RedPanda(_) => None, - }; - - if let Some(x) = listen_info { - ports.push(x); + if let Some(port) = service.port() { + ports.push((port, service.id().to_string(), service.user_managed())); } } diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 077f1ce51f82f..1efdf1470998d 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -71,6 +71,12 @@ pub fn generate_risedev_env(services: &Vec) -> String { let port = &c.port; writeln!(env, "RISEDEV_RW_FRONTEND_PORT=\"{port}\"",).unwrap(); } + ServiceConfig::Kafka(c) => { + let brokers = format!("{}:{}", c.address, c.port); + writeln!(env, r#"RISEDEV_KAFKA_BOOTSTRAP_SERVERS="{brokers}""#,).unwrap(); + writeln!(env, r#"RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='{brokers}'""#).unwrap(); + writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); + } _ => {} } } diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 97996bbeb5101..e5f149b8d10cd 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -277,6 +277,8 @@ pub struct KafkaConfig { pub provide_zookeeper: Option>, pub persist_data: bool, pub broker_id: u32, + + pub user_managed: bool, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] @@ -378,6 +380,50 @@ impl ServiceConfig { Self::Opendal(c) => &c.id, } } + + pub fn port(&self) -> Option { + match self { + Self::ComputeNode(c) => Some(c.port), + Self::MetaNode(c) => Some(c.port), + Self::Frontend(c) => Some(c.port), + Self::Compactor(c) => Some(c.port), + Self::Minio(c) => Some(c.port), + Self::Etcd(c) => Some(c.port), + Self::Sqlite(_) => None, + Self::Prometheus(c) => Some(c.port), + Self::Grafana(c) => Some(c.port), + Self::Tempo(c) => Some(c.port), + Self::AwsS3(_) => None, + Self::ZooKeeper(c) => Some(c.port), + Self::Kafka(c) => Some(c.port), + Self::Pubsub(c) => Some(c.port), + Self::Redis(c) => Some(c.port), + Self::RedPanda(_c) => None, + Self::Opendal(_) => None, + } + } + + pub fn user_managed(&self) -> bool { + match self { + Self::ComputeNode(c) => c.user_managed, + Self::MetaNode(c) => c.user_managed, + Self::Frontend(c) => c.user_managed, + Self::Compactor(c) => c.user_managed, + Self::Minio(_c) => false, + Self::Etcd(_c) => false, + Self::Sqlite(_c) => false, + Self::Prometheus(_c) => false, + Self::Grafana(_c) => false, + Self::Tempo(_c) => false, + Self::AwsS3(_c) => false, + Self::ZooKeeper(_c) => false, + Self::Kafka(c) => c.user_managed, + Self::Pubsub(_c) => false, + Self::Redis(_c) => false, + Self::RedPanda(_c) => false, + Self::Opendal(_c) => false, + } + } } mod string { diff --git a/src/risedevtool/src/task/ensure_stop_service.rs b/src/risedevtool/src/task/ensure_stop_service.rs index 24de2d32e0bb2..519804d11688c 100644 --- a/src/risedevtool/src/task/ensure_stop_service.rs +++ b/src/risedevtool/src/task/ensure_stop_service.rs @@ -17,11 +17,12 @@ use anyhow::Result; use super::{ExecuteContext, Task}; pub struct EnsureStopService { - ports: Vec<(u16, String)>, + /// `(port, id, user_managed)` + ports: Vec<(u16, String, bool)>, } impl EnsureStopService { - pub fn new(ports: Vec<(u16, String)>) -> Result { + pub fn new(ports: Vec<(u16, String, bool)>) -> Result { Ok(Self { ports }) } } @@ -30,12 +31,16 @@ impl Task for EnsureStopService { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { ctx.service(self); - for (port, service) in &self.ports { + for (port, service_id, user_managed) in &self.ports { + // Do not require stopping user-managed services + if *user_managed { + continue; + } let address = format!("127.0.0.1:{}", port); ctx.pb.set_message(format!( "waiting for port close - {} (will be used by {})", - address, service + address, service_id )); ctx.wait_tcp_close(&address)?; } diff --git a/src/risedevtool/src/task/kafka_service.rs b/src/risedevtool/src/task/kafka_service.rs index 9bbdd3ac5efaf..df0eb4a0fa313 100644 --- a/src/risedevtool/src/task/kafka_service.rs +++ b/src/risedevtool/src/task/kafka_service.rs @@ -74,7 +74,16 @@ impl Task for KafkaService { cmd.arg(config_path); - ctx.run_command(ctx.tmux_run(cmd)?)?; + if !self.config.user_managed { + ctx.run_command(ctx.tmux_run(cmd)?)?; + } else { + ctx.pb.set_message("user managed"); + writeln!( + &mut ctx.log, + "Please start your Kafka at {}:{}\n\n", + self.config.listen_address, self.config.port + )?; + } ctx.pb.set_message("started"); diff --git a/src/risedevtool/src/task/task_kafka_ready_check.rs b/src/risedevtool/src/task/task_kafka_ready_check.rs index ef8823956206c..79838bf8eca66 100644 --- a/src/risedevtool/src/task/task_kafka_ready_check.rs +++ b/src/risedevtool/src/task/task_kafka_ready_check.rs @@ -33,8 +33,12 @@ impl KafkaReadyCheckTask { impl Task for KafkaReadyCheckTask { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { - ctx.pb.set_message("waiting for online..."); - + if self.config.user_managed { + ctx.pb + .set_message("waiting for user-managed service online..."); + } else { + ctx.pb.set_message("waiting for online..."); + } let mut config = ClientConfig::new(); config.set( "bootstrap.servers",