Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mysql-cdc): make server.id in WITH option become optional #13031

Merged
merged 13 commits into from
Oct 26, 2023
9 changes: 1 addition & 8 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ echo "waiting for connector node to start"
wait_for_connector_node_start

echo "--- inline cdc test"
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt'

echo "--- mysql & postgres cdc validate test"
Expand All @@ -95,21 +96,13 @@ sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt'

# kill cluster and the connector node
cargo make kill
pkill -f connector-node
echo "cluster killed "

# insert new rows
mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc_insert.sql
psql < ./e2e_test/source/cdc/postgres_cdc_insert.sql
echo "inserted new rows into mysql and postgres"

# start cluster w/o clean-data
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
touch .risingwave/log/connector-node.log
./connector-node/start-service.sh -p $node_port >> .risingwave/log/connector-node.log 2>&1 &
echo "(recovery) waiting for connector node to start"
wait_for_connector_node_start
StrikeW marked this conversation as resolved.
Show resolved Hide resolved

cargo make dev ci-1cn-1fe-with-recovery
echo "wait for cluster recovery finish"
sleep 20
Expand Down
133 changes: 133 additions & 0 deletions e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Create and drop several MySQL CDC tables concurrently; none of the WITH clauses below sets `server.id`.

control substitution on

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4;

system ok
mysql --protocol=tcp -u root -e "
DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1;
USE testdb1;
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');
CREATE TABLE tt2 (v1 int primary key, v2 timestamp);
INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00');
CREATE TABLE tt3 (v1 int primary key, v2 timestamp);
INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00');
CREATE TABLE tt4 (v1 int primary key, v2 timestamp);
INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00');
CREATE TABLE tt5 (v1 int primary key, v2 timestamp);
INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00');"

statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt1',
);

statement ok
create table tt2 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt2',
);

statement ok
create table tt3 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt3',
);

statement ok
create table tt4 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt4',
);

statement ok
create table tt5 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt5',
);

sleep 3s

query IT
select * from tt1;
----
1 2023-10-23 10:00:00+00:00

query IT
select * from tt2;
----
2 2023-10-23 11:00:00+00:00

query IT
select * from tt3;
----
3 2023-10-23 12:00:00+00:00

query IT
select * from tt4;
----
4 2023-10-23 13:00:00+00:00

query IT
select * from tt5;
----
5 2023-10-23 14:00:00+00:00

statement ok
drop table tt1;

statement ok
drop table tt2;

statement ok
drop table tt3;

statement ok
drop table tt4;

statement ok
drop table tt5;
5 changes: 4 additions & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,13 @@ macro_rules! impl_cdc_source_type {
$(
$cdc_source_type,
)*
Unspecified,
}

impl From<PbSourceType> for CdcSourceType {
fn from(value: PbSourceType) -> Self {
match value {
PbSourceType::Unspecified => unreachable!(),
PbSourceType::Unspecified => CdcSourceType::Unspecified,
$(
PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
)*
Expand All @@ -253,8 +254,10 @@ macro_rules! impl_cdc_source_type {
$(
CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
)*
CdcSourceType::Unspecified => PbSourceType::Unspecified,
}
}
}

}
}
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
const UPSTREAM_SOURCE_KEY: &str = "connector";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub trait TryFromHashmap: Sized {
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self>;
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {

for_all_classified_sources!(impl_cdc_source_type);

impl<'a> From<&'a str> for CdcSourceType {
fn from(name: &'a str) -> Self {
match name {
MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
_ => CdcSourceType::Unspecified,
}
}
}

#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
parser_config,
source_ctx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod monitor;
pub mod nats;
pub mod nexmark;
pub mod pulsar;
pub use base::*;
pub use base::{UPSTREAM_SOURCE_KEY, *};
pub(crate) use common::*;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
Expand Down
36 changes: 36 additions & 0 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use anyhow::anyhow;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_fragment;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::CdcSourceType;
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_pb::catalog::connection::private_link_service::{
PbPrivateLinkProvider, PrivateLinkProvider,
};
Expand Down Expand Up @@ -428,6 +430,21 @@ impl DdlService for DdlServiceImpl {
// Generate source id.
let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; // TODO: Use source category
fill_table_source(source, source_id, &mut mview, table_id, &mut fragment_graph);

// Modify properties for cdc sources if needed
if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) {
if matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
) {
let server_id = self
.env
.id_gen_manager()
.generate::<{ IdCategory::MySqlCdc }>()
.await?;
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
fill_cdc_mysql_server_id(server_id, &mut fragment_graph);
}
}
}

let mut stream_job = StreamingJob::Table(source, mview);
Expand Down Expand Up @@ -798,3 +815,22 @@ fn fill_table_source(
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}

/// Fills in `server.id` on every source node of `fragment_graph` that does not
/// already have one.
///
/// A `server.id` already present in a source node's properties is left untouched;
/// only a missing entry is set to the string form of `server_id`.
///
/// # Panics
///
/// Panics if a visited source node has no `source_inner`.
fn fill_cdc_mysql_server_id(server_id: u64, fragment_graph: &mut PbStreamFragmentGraph) {
    for fragment in fragment_graph.fragments.values_mut() {
        visit_fragment(fragment, |node_body| {
            if let NodeBody::Source(source_node) = node_body {
                let props = &mut source_node.source_inner.as_mut().unwrap().properties;

                // One `entry` lookup both detects an existing value and fills in a
                // missing one. The closure (value formatting and the log line) runs
                // only when the key is absent.
                props.entry("server.id".to_string()).or_insert_with(|| {
                    tracing::debug!("server.id no set, generate one {}", server_id);
                    server_id.to_string()
                });
            }
        });
    }
}
6 changes: 6 additions & 0 deletions src/meta/src/manager/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub mod IdCategory {
pub const CompactionGroup: IdCategoryType = 15;
pub const Function: IdCategoryType = 16;
pub const Connection: IdCategoryType = 17;
pub const MySqlCdc: IdCategoryType = 18;
}

pub type IdGeneratorManagerRef = Arc<IdGeneratorManager>;
Expand All @@ -159,6 +160,7 @@ pub struct IdGeneratorManager {
parallel_unit: Arc<StoredIdGenerator>,
compaction_group: Arc<StoredIdGenerator>,
connection: Arc<StoredIdGenerator>,
mysql_cdc: Arc<StoredIdGenerator>,
}

impl IdGeneratorManager {
Expand Down Expand Up @@ -215,6 +217,9 @@ impl IdGeneratorManager {
connection: Arc::new(
StoredIdGenerator::new(meta_store.clone(), "connection", None).await,
),
mysql_cdc: Arc::new(
StoredIdGenerator::new(meta_store.clone(), "mysql_cdc", Some(20210401)).await,
),
}
}

Expand All @@ -236,6 +241,7 @@ impl IdGeneratorManager {
IdCategory::HummockCompactionTask => &self.hummock_compaction_task,
IdCategory::CompactionGroup => &self.compaction_group,
IdCategory::Connection => &self.connection,
IdCategory::MySqlCdc => &self.mysql_cdc,
_ => unreachable!(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ConnectorSource {
let to_reader_splits = splits.into_iter().map(|split| vec![split]);

try_join_all(to_reader_splits.into_iter().map(|splits| {
tracing::debug!("spawning connector split reader for split {:?}", splits);
tracing::debug!(?splits, ?prop, "spawning connector split reader");
let props = prop.clone();
let data_gen_columns = data_gen_columns.clone();
let parser_config = parser_config.clone();
Expand Down
Loading