Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add Kafka consumer-group, add-partition tests (inline style) #16244

Merged
merged 30 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extend = [
{ path = "src/storage/backup/integration_tests/Makefile.toml" },
{ path = "src/java_binding/make-java-binding.toml" },
{ path = "src/stream/tests/integration_tests/integration_test.toml" },
{ path = "e2e_test/source_inline/commands.toml" },
]

env_files = ["./risedev-components.user.env"]
Expand Down
23 changes: 16 additions & 7 deletions ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-mo

ENV PATH /root/.cargo/bin/:$PATH

RUN rustup show
RUN rustup default `rustup show active-toolchain | awk '{print $1}'`
RUN rustup show && \
rustup default `rustup show active-toolchain | awk '{print $1}'` && \
rustup component add rustfmt llvm-tools-preview clippy && \
rustup target add wasm32-wasi

RUN curl -sSL "https://github.com/bufbuild/buf/releases/download/v1.29.0/buf-$(uname -s)-$(uname -m).tar.gz" | \
tar -xvzf - -C /usr/local --strip-components 1
Expand All @@ -45,18 +47,25 @@ RUN pip3 install pyarrow pytest

# Install poetry
RUN curl -sSL https://install.python-poetry.org | python3 -

# add required rustup components
RUN rustup component add rustfmt llvm-tools-preview clippy
RUN rustup target add wasm32-wasi
# Install rpk
RUN if [ "$(uname -m)" = "amd64" ] || [ "$(uname -m)" = "x86_64" ]; then \
curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip && \
unzip rpk-linux-amd64.zip -d ~/.local/bin/ && \
rm rpk-linux-amd64.zip; \
else \
curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-arm64.zip && \
unzip rpk-linux-arm64.zip -d ~/.local/bin/ && \
rm rpk-linux-arm64.zip; \
fi
ENV PATH /root/.local/bin:$PATH

ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse

# install build tools
RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash
RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \
[email protected] \
sqllogictest-bin@0.19.1 \
sqllogictest-bin@0.20.0 \
[email protected] \
&& cargo cache -a \
&& rm -rf "/root/.cargo/registry/index" \
Expand Down
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
export BUILD_ENV_VERSION=v20240405_1
export BUILD_ENV_VERSION=v20240412

export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412
depends_on:
- mysql
- db
Expand All @@ -84,7 +84,7 @@ services:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412
depends_on:
- mysql
- db
Expand All @@ -103,12 +103,12 @@ services:


rw-build-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
Expand All @@ -119,7 +119,7 @@ services:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412
depends_on:
db:
condition: service_healthy
Expand Down
8 changes: 7 additions & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ while getopts 'p:' opt; do
done
shift $((OPTIND -1))

apt install -y nodejs npm
npm i -g npx

download_and_prepare_rw "$profile" source

echo "--- Download connector node package"
Expand Down Expand Up @@ -152,7 +155,7 @@ risedev ci-kill

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-pubsub
risedev ci-start ci-pubsub-kafka
./scripts/source/prepare_ci_kafka.sh
cargo run --bin prepare_ci_pubsub
risedev slt './e2e_test/source/basic/*.slt'
Expand All @@ -168,6 +171,9 @@ echo "--- e2e, kafka alter source again"
./scripts/source/prepare_data_after_alter.sh 3
risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt'

echo "--- e2e, inline test"
risedev slt './e2e_test/source_inline/**/*.slt'

echo "--- Run CH-benCHmark"
risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
risedev slt './e2e_test/ch_benchmark/streaming/*.slt'
4 changes: 4 additions & 0 deletions e2e_test/source/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
> [!NOTE]
>
> Please write new tests according to the style in `e2e_test/source_inline`.

Test in this directory needs some prior setup.

See also `ci/scripts/e2e-source-test.sh`, and `scripts/source`
Expand Down
16 changes: 16 additions & 0 deletions e2e_test/source_inline/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# "Inline" style source e2e tests

Compared with prior source tests (`e2e_test/source`), tests in this directory are expected to be easy to run locally and easy to write.

To run locally, use `risdev d` to start services (including external systems like Kafka and Postgres, or specify `user-managed` to use your own service).
xxchan marked this conversation as resolved.
Show resolved Hide resolved
Then use `risedev slt` to run the tests, which will load the environment variables (ports, etc.)
according to the services started by `risdev d`.
xxchan marked this conversation as resolved.
Show resolved Hide resolved

```sh
risedev slt 'e2e_test/source-inline/**/*.slt'
xxchan marked this conversation as resolved.
Show resolved Hide resolved
```

To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment.
Use `system` command to setup instead.

Refer to https://github.com/risingwavelabs/risingwave/issues/12451#issuecomment-2051861048 for more details.
108 changes: 108 additions & 0 deletions e2e_test/source_inline/commands.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# This file contains commands used by the tests.
Copy link
Member Author

@xxchan xxchan Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commands added here are actually not used in the slt file (although in theory they can). They are now mainly helper commands to run manually.

e.g., risedev rpk is similar to risedev psql. It sets RPK_BROKERS env var. So it's helpful for local testing.


[tasks.source-test-hook]
private = true
dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]

# Note about the Kafka CLI tooling:
# - Built-in Kafka console tools:
# Java based.
# Style example: kafka-topics.sh --bootstrap-server localhost:9092 --topic t --create
# Some limitations: cannot disable logging easily, cannot consume to end and then exit.
# - kcat:
# C based (rdkafka)
# Some limitations: cannot do admin operations, only consume/produce.
# - rpk:
# Golang based.
# Style example: RPK_BROKERS=localhost:9092 rpk topic create t
[tasks.kafka-hook]
private = true
description = "Check if Kafka is started by RiseDev"
dependencies = ["source-test-hook"]
script = '''
#!/usr/bin/env sh
set -e

if [ ! -d "${PREFIX_BIN}/kafka" ]; then
echo "Kafka is not installed in ${PREFIX_BIN}/kafka. Did you enable Kafka using $(tput setaf 4)\`./risedev configure\`$(tput sgr0)?"
exit 1
fi
xxchan marked this conversation as resolved.
Show resolved Hide resolved

# TODO: we may support risedev-env.override so that we can connect to a Kafka not started by risedev-dev.
if [ -z "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "RISEDEV_KAFKA_BOOTSTRAP_SERVERS is not set in risedev-env file. Did you start Kafka using $(tput setaf 4)\`./risedev d\`$(tput sgr0)?"
exit 1
fi
'''

[tasks.clean-kafka]
category = "RiseDev - Test - Source Test - Kafka"
description = "Delete all kafka topics."
dependencies = ["kafka-hook"]
command = "rpk"
args = ["topic", "delete", "-r", "*"]

[tasks.kafka-topics]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
script = """
#!/usr/bin/env sh
set -e
${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@"
"""

[tasks.kafka-produce]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
script = """
#!/usr/bin/env sh
set -e
${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@"
"""

[tasks.kafka-consume]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
script = """
#!/usr/bin/env sh
set -e
${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@"
"""

[tasks.kafka-consumer-groups]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
script = """
#!/usr/bin/env sh
set -e
${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@"
"""

# rpk tools
[tasks.rpk]
category = "RiseDev - Test - Source Test - Kafka"
dependencies = ["kafka-hook"]
# check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options
script = """
#!/usr/bin/env sh
set -e

if [ -z "$(which rpk)" ]; then
echo "rpk is not installed. Install it via https://docs.redpanda.com/current/get-started/rpk-install/"
exit 1
fi

rpk "$@"
"""

[tasks.redpanda-console]
category = "RiseDev - Test - Source Test - Kafka"
description = "Start Redpanda console (Kafka GUI) at localhost:8080."
dependencies = ["kafka-hook"]
script = '''
#!/usr/bin/env sh
set -e
echo "$(tput setaf 2)Start Redpanda console at http://localhost:8080$(tput sgr0)"
docker run --network host -e KAFKA_BROKERS=$RPK_BROKERS docker.redpanda.com/redpandadata/console:latest
'''
74 changes: 74 additions & 0 deletions e2e_test/source_inline/kafka/add_partition.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Note: control substitution on will force us to use "\\n" instead of "\n" in commands
control substitution on

system ok
rpk topic create test_add_partition -p 3
Copy link
Member Author

@xxchan xxchan Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case is for #15994 (incomplete yet, still need some more cases)


system ok
cat <<EOF | rpk topic produce test_add_partition -f "%p %v\\n" -p 0
0 {"x":"a"}
1 {"x":"b"}
2 {"x":"c"}
EOF

statement ok
CREATE SOURCE s(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_add_partition',
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE MATERIALIZED VIEW mv AS SELECT * from s;

query ?
SELECT * FROM s order by x;
----
a
b
c

sleep 2s

query ?
SELECT * FROM mv order by x;
----
a
b
c

system ok
rpk topic add-partitions test_add_partition --num 1

system ok
cat <<EOF | rpk topic produce test_add_partition -f "%p %v\\n" -p 0
3 {"x":"d"}
EOF


query ?
SELECT * FROM s order by x;
----
a
b
c
d

# It needs some time for the split change to be reflected in MV
sleep 40s

query ?
SELECT * FROM mv order by x;
----
a
b
c
d


statement ok
DROP SOURCE s CASCADE;

system ok
rpk topic delete test_add_partition
54 changes: 54 additions & 0 deletions e2e_test/source_inline/kafka/consumer_group.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env zx

// zx: A tool for writing better scripts
// https://google.github.io/zx/

const { mv: mv, topic: topic, _: _command } = minimist(process.argv.slice(3), {
string: ['mv', 'topic'],
_: ['list-members', 'list-lags'],
})
const command = _command[0];

async function get_fragment_id_of_mv(mv_name) {
const id = (
await $`
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev \
--csv -t -c "select fragment_id from rw_materialized_views JOIN rw_fragments on rw_materialized_views.id = rw_fragments.table_id where name='${mv_name}';"
`
)
.toString()
.trim();
if (id == "") {
throw new Error(`Materialized view ${mv_name} not found`);
}
return id;
}

async function list_consumer_groups(fragment_id) {
const res =
await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}"`;
return res;
}

async function list_consumer_group_members(fragment_id) {
const res =
await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}" | xargs -n1 -I {} sh -c "rpk group describe -s {} | grep "MEMBERS""`;
return res;
}

async function list_consumer_group_lags(fragment_id, topic_name) {
const res =
await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}" | xargs -n1 -I {} sh -c "rpk group describe -t {} | grep "${topic_name}""`;
return res;
}

const fragment_id = await get_fragment_id_of_mv(mv);
if (command == "list-groups") {
echo`${await list_consumer_groups(fragment_id)}`;
} else if (command == "list-members") {
echo`${await list_consumer_group_members(fragment_id)}`;
} else if (command == "list-lags") {
echo`${await list_consumer_group_lags(fragment_id, topic)}`;
} else {
throw new Error(`Invalid command: ${command}`);
}
Loading
Loading