From f0751e2e884b2f26af8853f4119872ff09ed72c1 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Tue, 26 Nov 2024 16:37:20 -0500 Subject: [PATCH] fix clear --- src/connector/src/source/base.rs | 2 ++ src/connector/src/source/mod.rs | 4 +--- src/frontend/src/handler/create_source.rs | 7 ++----- src/frontend/src/handler/create_table.rs | 3 ++- src/frontend/src/utils/with_options.rs | 6 ++++++ src/sqlparser/src/ast/statement.rs | 5 +++++ src/sqlparser/src/parser.rs | 16 ++++++---------- 7 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index e031a85a34d62..fc804ff4acdb6 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -58,6 +58,8 @@ const SPLIT_TYPE_FIELD: &str = "split_type"; const SPLIT_INFO_FIELD: &str = "split_info"; pub const UPSTREAM_SOURCE_KEY: &str = "connector"; +pub const WEBHOOK_CONNECTOR: &str = "webhook"; + pub trait TryFromBTreeMap: Sized + UnknownFields { /// Used to initialize the source properties from the raw untyped `WITH` options. fn try_from_btreemap( diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index d9f1d039e7ed7..21d0c7c3937d6 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -28,7 +28,7 @@ pub mod pulsar; use std::future::IntoFuture; -pub use base::{UPSTREAM_SOURCE_KEY, *}; +pub use base::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR, *}; pub(crate) use common::*; use google_cloud_pubsub::subscription::Subscription; pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR; @@ -55,8 +55,6 @@ pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; pub use crate::source::pulsar::PULSAR_CONNECTOR; -pub const WEBHOOK_CONNECTOR: &str = "webhook"; - pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool { const PREFIXES: &[&str] = &[ "schema.registry", diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 635be76689351..2957818702990 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -52,12 +52,12 @@ use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; -pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_connector::source::{ ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, - POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, WEBHOOK_CONNECTOR, + POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; +pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR}; use risingwave_connector::WithPropertiesExt; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; @@ -1137,9 +1137,6 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), - WEBHOOK_CONNECTOR => hashmap!( - Format::Native => vec![Encode::Native], - ), )) }); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index a517d1186b3bd..097a3a96ffec5 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1337,7 +1337,8 @@ pub fn check_create_table_with_source( if cdc_table_info.is_some() { return Ok(format_encode); } - let defined_source = with_options.contains_key(UPSTREAM_SOURCE_KEY); + let defined_source = with_options.is_source_connector(); + if !include_column_options.is_empty() && !defined_source { return Err(ErrorCode::InvalidInputSyntax( "INCLUDE should be used with a connector".to_owned(), diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 9d61021dab4fe..cc8d6747aedd0 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -30,6 +30,7 @@ use risingwave_sqlparser::ast::{ use super::OverwriteOptions; use crate::catalog::ConnectionId; use crate::error::{ErrorCode, Result as RwResult, RwError}; +use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR}; use crate::session::SessionImpl; use crate::Binder; @@ -156,6 +157,11 @@ impl WithOptions { ))) } } + + pub fn is_source_connector(&self) -> bool { + self.inner.contains_key(UPSTREAM_SOURCE_KEY) + && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR + } } /// Get the secret id from the name. diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 72680161defea..d817c8c2759ae 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -297,6 +297,11 @@ impl Parser<'_> { } } Ok(expected.into()) + } else if connector.contains("webhook") { + parser_err!( + "Source with webhook connector is not supported. \ + Please use the `CREATE TABLE ... WITH ...` statement instead.", + ); } else { Ok(parse_format_encode(self)?) } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 8320deaebdf8f..09f6393eff19d 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -38,6 +38,7 @@ use crate::tokenizer::*; use crate::{impl_parse_to, parser_v2}; pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; +pub(crate) const WEBHOOK_CONNECTOR: &str = "webhook"; #[derive(Debug, Clone, PartialEq)] pub enum ParserError { @@ -2551,21 +2552,16 @@ impl Parser<'_> { let include_options = self.parse_include_options()?; // PostgreSQL supports `WITH ( options )`, before `AS` - let mut with_options = self.parse_with_properties()?; + let with_options = self.parse_with_properties()?; let option = with_options .iter() .find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY); let connector = option.map(|opt| opt.value.to_string()); - let contain_webhook = if let Some(connector) = &connector - && connector.contains("webhook") - { - // for webhook source, we clear all the options and use `webhook_info` to store the webhook info - with_options.clear(); - true - } else { - false - }; + let contain_webhook = + connector.is_some() && connector.as_ref().unwrap().contains(WEBHOOK_CONNECTOR); + + // webhook connector does not require row format let format_encode = if let Some(connector) = connector && !contain_webhook {