Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Jun 26, 2024
1 parent 73d8614 commit ee935f5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
3 changes: 3 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_create.sql
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql

statement ok
create secret mysql_pwd with (
backend = 'meta'
) as '${MYSQL_PWD:}';
Expand Down Expand Up @@ -197,10 +198,12 @@ SELECT c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_flo
-128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00
NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL

statement ok
create secret pg_pwd with (
backend = 'meta'
) as '${PGPASSWORD:}';

statement ok
create secret pg_username with (
backend = 'meta'
) as '${PGUSER:$USER}';
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_sqlparser::ast::{
ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, Format, Query, Statement,
ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format,
Query, Statement,
};
use risingwave_sqlparser::parser::Parser;

Expand Down Expand Up @@ -78,6 +79,7 @@ pub struct SinkPlanContext {
pub async fn gen_sink_plan(
handler_args: HandlerArgs,
stmt: CreateSinkStatement,
explain_options: Option<ExplainOptions>,
) -> Result<SinkPlanContext> {
let session = handler_args.session.clone();
let session = session.as_ref();
Expand All @@ -98,7 +100,11 @@ pub async fn gen_sink_plan(

let partition_info = get_partition_compute_info(&resolved_with_options).await?;

let context = OptimizerContext::from_handler_args(handler_args.clone());
let context = if let Some(explain_options) = explain_options {
OptimizerContext::new(handler_args.clone(), explain_options)
} else {
OptimizerContext::from_handler_args(handler_args.clone())
};

// Used for debezium's table name
let sink_from_table_name;
Expand Down Expand Up @@ -419,7 +425,7 @@ pub async fn handle_create_sink(
sink_plan: plan,
sink_catalog: sink,
target_table_catalog,
} = gen_sink_plan(handle_args, stmt).await?;
} = gen_sink_plan(handle_args, stmt, None).await?;

let has_order_by = !query.order_by.is_empty();
if has_order_by {
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ async fn do_handle_explain(
(Ok(plan), context)
}
Statement::CreateSink { stmt } => {
// let context = OptimizerContext::new(handler_args, explain_options);
let plan = gen_sink_plan(handler_args, stmt)
let plan = gen_sink_plan(handler_args, stmt, Some(explain_options))
.await
.map(|plan| plan.sink_plan)?;
let context = plan.ctx();
Expand Down

0 comments on commit ee935f5

Please sign in to comment.