From 143d5692150fba293afb70845ba6ea7967bb35b3 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 13 Apr 2024 03:28:13 +0800 Subject: [PATCH] Squashed commit of the following: commit b3c8e4e4f2ae1607697ec42828c8ac3f1d3e7fb8 Author: xxchan Date: Sat Apr 13 02:39:13 2024 +0800 fix Signed-off-by: xxchan commit a575c3a3e1b2e3ef76008b768d5e03f842942d9c Author: xxchan Date: Sat Apr 13 02:38:15 2024 +0800 fix Signed-off-by: xxchan commit df9f9053f1a21baf893ba05652da476af5b4cd3e Author: xxchan Date: Sat Apr 13 02:26:00 2024 +0800 update Signed-off-by: xxchan commit b8859ca6d575017f43324c956014e20a922529a9 Author: xxchan Date: Sat Apr 13 02:25:14 2024 +0800 fix Signed-off-by: xxchan commit f001e31d2be245b17ecc2f6a4f6100b1a267fd0f Author: xxchan Date: Sat Apr 13 02:23:06 2024 +0800 revert risedev Signed-off-by: xxchan commit 33aa17883e19db0619329b44c439149842e87935 Merge: 530eb8678e c98dfd5bda Author: xxchan Date: Sat Apr 13 02:22:13 2024 +0800 Merge branch 'xxchan/latin-tyrannosaurus' into xxchan/source-test commit 530eb8678e56b64fa7e3c1d2889568aeb3da4633 Merge: 0b6f74c0b8 cf221e3bfd Author: xxchan Date: Sat Apr 13 02:21:42 2024 +0800 Merge branch 'xxchan/latin-tyrannosaurus' into xxchan/source-test Signed-off-by: xxchan commit 0b6f74c0b8611ce651bdc81a12e01da95934e3ba Author: xxchan Date: Sat Apr 13 02:11:40 2024 +0800 sleep more? Signed-off-by: xxchan commit 47c2c61c16e45e33d3012e45f87c1734ab06ed23 Author: xxchan Date: Fri Apr 12 22:30:09 2024 +0800 sleep more Signed-off-by: xxchan commit 1853adf016441409edee8d7eed8756ead2217749 Author: xxchan Date: Fri Apr 12 21:59:31 2024 +0800 fix Signed-off-by: xxchan commit 8711cf5aaea7f617bdddd61255de489ee0f529be Merge: f1c0185070 4f53f8927a Author: xxchan Date: Fri Apr 12 17:52:38 2024 +0800 Merge remote-tracking branch 'origin/main' into xxchan/source-test Signed-off-by: xxchan commit f1c0185070c129eb0564e967bbbf98eeda312dfe Author: xxchan Date: Fri Apr 12 17:52:12 2024 +0800 fix Signed-off-by: xxchan commit e9e2dc45c9f6def717e42147228abdf03ef96dc3 Author: xxchan Date: Fri Apr 12 16:49:20 2024 +0800 install rpk Signed-off-by: xxchan commit e2f2a8e6923948f620f8ad5a7d6b2b91fd964054 Author: xxchan Date: Fri Apr 12 15:16:44 2024 +0800 f Signed-off-by: xxchan commit bae6495a20a746df0ad2cfc87314ab219069b734 Merge: 58de398dd1 4ac029cde9 Author: xxchan Date: Fri Apr 12 15:11:54 2024 +0800 Merge remote-tracking branch 'origin/main' into xxchan/source-test commit 58de398dd1418a643654b13e6092af758dd50480 Author: xxchan Date: Fri Apr 12 15:11:49 2024 +0800 fix Signed-off-by: xxchan commit 9ba97282c95faea0b611148164e22af04cd7e0e8 Author: xxchan Date: Thu Apr 11 17:45:24 2024 +0800 fix Signed-off-by: xxchan commit d8a2489b3ffc00ce2e390cf94a6cc107081deaa4 Author: xxchan Date: Thu Apr 11 17:18:38 2024 +0800 fix Signed-off-by: xxchan commit c4b4b162d9a5d6eb1aa0f551a7b2de886773dec9 Author: xxchan Date: Thu Apr 11 17:16:25 2024 +0800 fix Signed-off-by: xxchan commit 6dddbf3aeeac2fb1862be488ab8c19a7e7d67c74 Author: xxchan Date: Thu Apr 11 17:15:00 2024 +0800 rename new to inline Signed-off-by: xxchan commit cf50f5e3d524482c5a2a392667e4ec3663463d72 Author: xxchan Date: Thu Apr 11 17:04:54 2024 +0800 bump Signed-off-by: xxchan commit 9ca9d553c3c42cd6a681cb93d0c8687ce7344942 Author: xxchan Date: Thu Apr 11 16:00:07 2024 +0800 support user-managed kafka in risedev Signed-off-by: xxchan commit d4d405d050aa185c8f0733113fb5c10eaf47df68 Author: xxchan Date: Thu Apr 11 15:27:20 2024 +0800 update Signed-off-by: xxchan commit 6ed6cfcd03c5a2baa73b739541ec6d51f5f68686 Author: xxchan Date: Thu Apr 11 10:21:25 2024 +0800 update commit f7a3dd594d8e3c30862fc2da68b4c06d58d171b2 Merge: 8b6c48203b 254ad0cc74 Author: xxchan Date: Thu Apr 11 09:45:50 2024 +0800 Merge remote-tracking branch 'origin/main' into xxchan/source-test commit 8b6c48203b8e72be123e921e9b866d5e46801282 Author: xxchan Date: Tue Apr 9 14:12:37 2024 +0800 update commit f2806032819028bc82c5b6315cb9b12952494392 Author: xxchan Date: Fri Apr 5 23:14:30 2024 +0800 add new source tests commit 0e3ace190bf54a72481b67b669fc22d3bbd2807a Author: xxchan Date: Fri Apr 5 23:13:05 2024 +0800 revert unrelated change commit a3f34095cef20ed68e58cf9a5f6526eeeee0a3e3 Author: xxchan Date: Fri Apr 5 17:58:48 2024 +0800 fix commit 0a2ded00a1d3df433a7893884b8cee64a1c67b3a Author: xxchan Date: Fri Apr 5 17:27:45 2024 +0800 fix commit 8daa64d9c717fdee8f76a6c0ee2601b7c1daf494 Author: xxchan Date: Fri Apr 5 17:19:38 2024 +0800 debug commit 8628c1fbae02be8b76f2d2a822b764465d49e706 Author: xxchan Date: Fri Apr 5 17:15:32 2024 +0800 fix commit c8bde2038e928599b0e72daede6eed02cab77035 Author: xxchan Date: Fri Apr 5 17:00:09 2024 +0800 ci: install risedev to ci image Signed-off-by: xxchan --- Makefile.toml | 1 + ci/Dockerfile | 23 ++-- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +- ci/scripts/e2e-source-test.sh | 3 + e2e_test/source/README.md | 4 + e2e_test/source_inline/README.md | 16 +++ e2e_test/source_inline/commands.toml | 108 ++++++++++++++++ .../source_inline/kafka/add_partition.slt | 74 +++++++++++ .../source_inline/kafka/consumer_group.slt | 119 ++++++++++++++++++ risedev.yml | 8 +- scripts/source/prepare_ci_kafka.sh | 6 +- src/risedevtool/src/risedev_env.rs | 6 + src/risedevtool/src/service_config.rs | 2 + .../src/task/ensure_stop_service.rs | 4 + src/risedevtool/src/task/kafka_service.rs | 11 +- .../src/task/task_kafka_ready_check.rs | 8 +- 17 files changed, 384 insertions(+), 21 deletions(-) create mode 100644 e2e_test/source_inline/README.md create mode 100644 e2e_test/source_inline/commands.toml create mode 100644 e2e_test/source_inline/kafka/add_partition.slt create mode 100644 e2e_test/source_inline/kafka/consumer_group.slt diff --git a/Makefile.toml b/Makefile.toml index 604b7b2b4e44b..e9085a870804e 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -15,6 +15,7 @@ extend = [ { path = "src/storage/backup/integration_tests/Makefile.toml" }, { path = "src/java_binding/make-java-binding.toml" }, { path = "src/stream/tests/integration_tests/integration_test.toml" }, + { path = "e2e_test/source_inline/commands.toml" }, ] env_files = ["./risedev-components.user.env"] diff --git a/ci/Dockerfile b/ci/Dockerfile index 78c48f0c648d6..1955538b599d5 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -34,8 +34,10 @@ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-mo ENV PATH /root/.cargo/bin/:$PATH -RUN rustup show -RUN rustup default `rustup show active-toolchain | awk '{print $1}'` +RUN rustup show && \ + rustup default `rustup show active-toolchain | awk '{print $1}'` && \ + rustup component add rustfmt llvm-tools-preview clippy && \ + rustup target add wasm32-wasi RUN curl -sSL "https://github.com/bufbuild/buf/releases/download/v1.29.0/buf-$(uname -s)-$(uname -m).tar.gz" | \ tar -xvzf - -C /usr/local --strip-components 1 @@ -45,10 +47,17 @@ RUN pip3 install pyarrow pytest # Install poetry RUN curl -sSL https://install.python-poetry.org | python3 - - -# add required rustup components -RUN rustup component add rustfmt llvm-tools-preview clippy -RUN rustup target add wasm32-wasi +# Install rpk +RUN if [ "$(uname -m)" = "amd64" ] || [ "$(uname -m)" = "x86_64" ]; then \ + curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip && \ + unzip rpk-linux-amd64.zip -d ~/.local/bin/ && \ + rm rpk-linux-amd64.zip; \ + else \ + curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-arm64.zip && \ + unzip rpk-linux-arm64.zip -d ~/.local/bin/ && \ + rm rpk-linux-arm64.zip; \ + fi +ENV PATH /root/.local/bin:$PATH ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse @@ -56,7 +65,7 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \ cargo-make@0.37.9 \ - sqllogictest-bin@0.19.1 \ + sqllogictest-bin@0.20.0 \ sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 1a23144a8e8fe..532de3cb18313 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240405_1 +export BUILD_ENV_VERSION=v20240412 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 3b99cf1082df8..283ee336113e2 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 depends_on: - mysql - db @@ -103,12 +103,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -119,7 +119,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 depends_on: db: condition: service_healthy diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 8e1ea0fa2b4b2..8f21cf67cd8a3 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -168,6 +168,9 @@ echo "--- e2e, kafka alter source again" ./scripts/source/prepare_data_after_alter.sh 3 risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' +echo "--- e2e, inline test" +risedev slt './e2e_test/source_inline/**/*.slt' + echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' diff --git a/e2e_test/source/README.md b/e2e_test/source/README.md index b6e9dfa30816f..4152ab3dc9737 100644 --- a/e2e_test/source/README.md +++ b/e2e_test/source/README.md @@ -1,3 +1,7 @@ +> [!NOTE] +> +> Please write new tests according to the style in `e2e_test/source_inline`. + Test in this directory needs some prior setup. See also `ci/scripts/e2e-source-test.sh`, and `scripts/source` diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md new file mode 100644 index 0000000000000..01918d30d5264 --- /dev/null +++ b/e2e_test/source_inline/README.md @@ -0,0 +1,16 @@ +# "Inline" style source e2e tests + +Compared with prior source tests (`e2e_test/source`), tests in this directory are expected to be easy to run locally and easy to write. + +To run locally, use `risdev d` to start services (including external systems like Kafka and Postgres, or specify `user-managed` to use your own service). +Then use `risedev slt` to run the tests, which will load the environment variables (ports, etc.) +according to the services started by `risdev d`. + +```sh +risedev slt 'e2e_test/source-inline/**/*.slt' +``` + +To write tests, please ensure each file is self-contained and does not depend on external scripts to setup the environment. +Use `system` command to setup instead. + +Refer to https://github.com/risingwavelabs/risingwave/issues/12451#issuecomment-2051861048 for more details. diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml new file mode 100644 index 0000000000000..aab89f1fc612e --- /dev/null +++ b/e2e_test/source_inline/commands.toml @@ -0,0 +1,108 @@ +# This file contains commands used by the tests. + +[tasks.source-test-hook] +private = true +dependencies = ["check-risedev-env-file"] +env_files = ["${PREFIX_CONFIG}/risedev-env"] + +# Note about the Kafka CLI tooling: +# - Built-in Kafka console tools: +# Java based. +# Style example: kafka-topics.sh --bootstrap-server localhost:9092 --topic t --create +# Some limitations: cannot disable logging easily, cannot consume to end and then exit. +# - kcat: +# C based (rdkafka) +# Some limitations: cannot do admin operations, only consume/produce. +# - rpk: +# Golang based. +# Style example: RPK_BROKERS=localhost:9092 rpk topic create t +[tasks.kafka-hook] +private = true +description = "Check if Kafka is started by RiseDev" +dependencies = ["source-test-hook"] +script = ''' +#!/usr/bin/env sh +set -e + +if [ ! -d "${PREFIX_BIN}/kafka" ]; then + echo "Kafka is not installed in ${PREFIX_BIN}/kafka. Did you enable Kafka using $(tput setaf 4)\`./risedev configure\`$(tput sgr0)?" + exit 1 +fi + +# TODO: we may support risedev-env.override so that we can connect to a Kafka not started by risedev-dev. +if [ -z "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then + echo "RISEDEV_KAFKA_BOOTSTRAP_SERVERS is not set in risedev-env file. Did you start Kafka using $(tput setaf 4)\`./risedev d\`$(tput sgr0)?" + exit 1 +fi +''' + +[tasks.clean-kafka] +category = "RiseDev - Test - Source Test - Kafka" +description = "Delete all kafka topics." +dependencies = ["kafka-hook"] +command = "rpk" +args = ["topic", "delete", "-r", "*"] + +[tasks.kafka-topics] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-produce] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-consume] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-consumer-groups] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +# rpk tools +[tasks.rpk] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +# check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options +script = """ +#!/usr/bin/env sh +set -e + +if [ -z "$(which rpk)" ]; then + echo "rpk is not installed. Install it via https://docs.redpanda.com/current/get-started/rpk-install/" + exit 1 +fi + +rpk "$@" +""" + +[tasks.redpanda-console] +category = "RiseDev - Test - Source Test - Kafka" +description = "Start Redpanda console (Kafka GUI) at localhost:8080." +dependencies = ["kafka-hook"] +script = ''' +#!/usr/bin/env sh +set -e +echo "$(tput setaf 2)Start Redpanda console at http://localhost:8080$(tput sgr0)" +docker run --network host -e KAFKA_BROKERS=$RPK_BROKERS docker.redpanda.com/redpandadata/console:latest +''' diff --git a/e2e_test/source_inline/kafka/add_partition.slt b/e2e_test/source_inline/kafka/add_partition.slt new file mode 100644 index 0000000000000..9399cf732b973 --- /dev/null +++ b/e2e_test/source_inline/kafka/add_partition.slt @@ -0,0 +1,74 @@ +# Note: control substitution on will force us to use "\\n" instead of "\n" in commands +control substitution on + +system ok +rpk topic create test_add_partition -p 3 + +system ok +cat </dev/null 2>&1 && pwd)" cd "$SCRIPT_PATH/.." || exit 1 -KAFKA_BIN="$SCRIPT_PATH/../../.risingwave/bin/kafka/bin" - echo "$SCRIPT_PATH" if [ "$1" == "compress" ]; then @@ -46,10 +44,10 @@ for filename in $kafka_data_files; do # always ok echo "Drop topic $topic" - "$KAFKA_BIN"/kafka-topics.sh --bootstrap-server message_queue:29092 --topic "$topic" --delete || true + risedev kafka-topics --topic "$topic" --delete || true echo "Recreate topic $topic with partition $partition" - "$KAFKA_BIN"/kafka-topics.sh --bootstrap-server message_queue:29092 --topic "$topic" --create --partitions "$partition") & + risedev kafka-topics --topic "$topic" --create --partitions "$partition") & done wait diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 077f1ce51f82f..1efdf1470998d 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -71,6 +71,12 @@ pub fn generate_risedev_env(services: &Vec) -> String { let port = &c.port; writeln!(env, "RISEDEV_RW_FRONTEND_PORT=\"{port}\"",).unwrap(); } + ServiceConfig::Kafka(c) => { + let brokers = format!("{}:{}", c.address, c.port); + writeln!(env, r#"RISEDEV_KAFKA_BOOTSTRAP_SERVERS="{brokers}""#,).unwrap(); + writeln!(env, r#"RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='{brokers}'""#).unwrap(); + writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); + } _ => {} } } diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 97996bbeb5101..83e90147b1b68 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -277,6 +277,8 @@ pub struct KafkaConfig { pub provide_zookeeper: Option>, pub persist_data: bool, pub broker_id: u32, + + pub user_managed: bool, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] diff --git a/src/risedevtool/src/task/ensure_stop_service.rs b/src/risedevtool/src/task/ensure_stop_service.rs index 24de2d32e0bb2..62c2aa7de7c3b 100644 --- a/src/risedevtool/src/task/ensure_stop_service.rs +++ b/src/risedevtool/src/task/ensure_stop_service.rs @@ -31,6 +31,10 @@ impl Task for EnsureStopService { ctx.service(self); for (port, service) in &self.ports { + // Do not require stopping kafka services + if service.starts_with("kafka") { + continue; + } let address = format!("127.0.0.1:{}", port); ctx.pb.set_message(format!( diff --git a/src/risedevtool/src/task/kafka_service.rs b/src/risedevtool/src/task/kafka_service.rs index 9bbdd3ac5efaf..df0eb4a0fa313 100644 --- a/src/risedevtool/src/task/kafka_service.rs +++ b/src/risedevtool/src/task/kafka_service.rs @@ -74,7 +74,16 @@ impl Task for KafkaService { cmd.arg(config_path); - ctx.run_command(ctx.tmux_run(cmd)?)?; + if !self.config.user_managed { + ctx.run_command(ctx.tmux_run(cmd)?)?; + } else { + ctx.pb.set_message("user managed"); + writeln!( + &mut ctx.log, + "Please start your Kafka at {}:{}\n\n", + self.config.listen_address, self.config.port + )?; + } ctx.pb.set_message("started"); diff --git a/src/risedevtool/src/task/task_kafka_ready_check.rs b/src/risedevtool/src/task/task_kafka_ready_check.rs index ef8823956206c..79838bf8eca66 100644 --- a/src/risedevtool/src/task/task_kafka_ready_check.rs +++ b/src/risedevtool/src/task/task_kafka_ready_check.rs @@ -33,8 +33,12 @@ impl KafkaReadyCheckTask { impl Task for KafkaReadyCheckTask { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { - ctx.pb.set_message("waiting for online..."); - + if self.config.user_managed { + ctx.pb + .set_message("waiting for user-managed service online..."); + } else { + ctx.pb.set_message("waiting for online..."); + } let mut config = ClientConfig::new(); config.set( "bootstrap.servers",