Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mysql-cdc): make `server.id` in the WITH options optional #13031

Merged
merged 13 commits into from
Oct 26, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ echo "waiting for connector node to start"
wait_for_connector_node_start

echo "--- inline cdc test"
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt'

echo "--- mysql & postgres cdc validate test"
Expand Down
133 changes: 133 additions & 0 deletions e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# create and drop CDC mysql tables concurrently

control substitution on

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4;

# Seed the upstream MySQL instance: a fresh database with five source tables,
# one row each (distinct timestamps so each table's snapshot is identifiable).
system ok
mysql --protocol=tcp -u root -e "
DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1;
USE testdb1;
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');
CREATE TABLE tt2 (v1 int primary key, v2 timestamp);
INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00');
CREATE TABLE tt3 (v1 int primary key, v2 timestamp);
INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00');
CREATE TABLE tt4 (v1 int primary key, v2 timestamp);
INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00');
CREATE TABLE tt5 (v1 int primary key, v2 timestamp);
INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00');"

# Create five mysql-cdc tables concurrently. Note that `server.id` is
# intentionally omitted from every WITH clause — this exercises the PR's
# feature of auto-generating a server id when the user does not provide one.
# Connection parameters fall back to defaults via ${VAR:default} substitution.
statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt1',
);

statement ok
create table tt2 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt2',
);

statement ok
create table tt3 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt3',
);

statement ok
create table tt4 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt4',
);

statement ok
create table tt5 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt5',
);

# Give the CDC jobs a moment to complete their initial snapshots.
sleep 3s

# Each table should have ingested exactly its one snapshot row; upstream
# MySQL `timestamp` values surface here as UTC timestamptz.
query IT
select * from tt1;
----
1 2023-10-23 10:00:00+00:00

query IT
select * from tt2;
----
2 2023-10-23 11:00:00+00:00

query IT
select * from tt3;
----
3 2023-10-23 12:00:00+00:00

query IT
select * from tt4;
----
4 2023-10-23 13:00:00+00:00

query IT
select * from tt5;
----
5 2023-10-23 14:00:00+00:00

# Clean up the CDC tables (exercises concurrent drop as well).
statement ok
drop table tt1;

statement ok
drop table tt2;

statement ok
drop table tt3;

statement ok
drop table tt4;

statement ok
drop table tt5;
5 changes: 4 additions & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,13 @@ macro_rules! impl_cdc_source_type {
$(
$cdc_source_type,
)*
Unspecified,
}

impl From<PbSourceType> for CdcSourceType {
fn from(value: PbSourceType) -> Self {
match value {
PbSourceType::Unspecified => unreachable!(),
PbSourceType::Unspecified => CdcSourceType::Unspecified,
$(
PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
)*
Expand All @@ -253,8 +254,10 @@ macro_rules! impl_cdc_source_type {
$(
CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
)*
CdcSourceType::Unspecified => PbSourceType::Unspecified,
}
}
}

}
}
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
const UPSTREAM_SOURCE_KEY: &str = "connector";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub trait TryFromHashmap: Sized {
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self>;
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {

for_all_classified_sources!(impl_cdc_source_type);

/// Maps a connector name from the `WITH` clause (e.g. `"mysql-cdc"`) to its
/// [`CdcSourceType`]. Unknown connector names fall back to
/// `CdcSourceType::Unspecified` rather than panicking, so callers can match on
/// the result safely.
impl From<&str> for CdcSourceType {
    // The explicit `<'a>` lifetime in the original was needless; lifetime
    // elision in impl headers produces the identical impl.
    fn from(name: &str) -> Self {
        match name {
            MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
            POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
            CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
            _ => CdcSourceType::Unspecified,
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
parser_config,
source_ctx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod monitor;
pub mod nats;
pub mod nexmark;
pub mod pulsar;
pub use base::*;
pub use base::{UPSTREAM_SOURCE_KEY, *};
pub(crate) use common::*;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-trait = "0.1"
either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.11"
rand = "0.8"
regex = "1"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
Expand Down
28 changes: 28 additions & 0 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use rand::Rng;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_fragment;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::CdcSourceType;
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_pb::catalog::connection::private_link_service::{
PbPrivateLinkProvider, PrivateLinkProvider,
};
Expand Down Expand Up @@ -428,6 +431,16 @@ impl DdlService for DdlServiceImpl {
// Generate source id.
let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; // TODO: Use source category
fill_table_source(source, source_id, &mut mview, table_id, &mut fragment_graph);

// Modify properties for cdc sources if needed
if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) {
if matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
) {
fill_cdc_mysql_server_id(&mut fragment_graph);
}
}
}

let mut stream_job = StreamingJob::Table(source, mview);
Expand Down Expand Up @@ -827,3 +840,18 @@ fn fill_table_source(
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}

// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
fn fill_cdc_mysql_server_id(fragment_graph: &mut PbStreamFragmentGraph) {
let rand_server_id = rand::thread_rng().gen_range(1..4294967295u32);
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
let props = &mut source_node.source_inner.as_mut().unwrap().properties;
props.insert("server.id".to_string(), rand_server_id.to_string());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this overwrite the user-specified server.id if they set it already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After consideration, I think we should still allow user to set the ID by their own, so I will revert here back to generate a server.id when it is not provided.
Btw I think it doesn't make sense to check the uniqueness of server.id in the validation, since we cannot lock the server.id to avoid other connector to use it at the time we create a connector. IMO we should provide some ways to improve observability for this run time errors.

tracing::debug!("generated `server.id` for mysql-cdc: {}", rand_server_id);
}
});
}
}
2 changes: 1 addition & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ConnectorSource {
let to_reader_splits = splits.into_iter().map(|split| vec![split]);

try_join_all(to_reader_splits.into_iter().map(|splits| {
tracing::debug!("spawning connector split reader for split {:?}", splits);
tracing::debug!(?splits, ?prop, "spawning connector split reader");
let props = prop.clone();
let data_gen_columns = data_gen_columns.clone();
let parser_config = parser_config.clone();
Expand Down
Loading