Skip to content

Commit

Permalink
fix clear
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Nov 26, 2024
1 parent 5287275 commit f0751e2
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 19 deletions.
2 changes: 2 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub trait TryFromBTreeMap: Sized + UnknownFields {
/// Used to initialize the source properties from the raw untyped `WITH` options.
fn try_from_btreemap(
Expand Down
4 changes: 1 addition & 3 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub mod pulsar;

use std::future::IntoFuture;

pub use base::{UPSTREAM_SOURCE_KEY, *};
pub use base::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR, *};
pub(crate) use common::*;
use google_cloud_pubsub::subscription::Subscription;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
Expand All @@ -55,8 +55,6 @@ pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
const PREFIXES: &[&str] = &[
"schema.registry",
Expand Down
7 changes: 2 additions & 5 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_connector::source::{
ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, WEBHOOK_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
Expand Down Expand Up @@ -1137,9 +1137,6 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
// support source stream job
Format::Plain => vec![Encode::Json],
),
WEBHOOK_CONNECTOR => hashmap!(
Format::Native => vec![Encode::Native],
),
))
});

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,8 @@ pub fn check_create_table_with_source(
if cdc_table_info.is_some() {
return Ok(format_encode);
}
let defined_source = with_options.contains_key(UPSTREAM_SOURCE_KEY);
let defined_source = with_options.is_source_connector();

if !include_column_options.is_empty() && !defined_source {
return Err(ErrorCode::InvalidInputSyntax(
"INCLUDE should be used with a connector".to_owned(),
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_sqlparser::ast::{
use super::OverwriteOptions;
use crate::catalog::ConnectionId;
use crate::error::{ErrorCode, Result as RwResult, RwError};
use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
use crate::session::SessionImpl;
use crate::Binder;

Expand Down Expand Up @@ -156,6 +157,11 @@ impl WithOptions {
)))
}
}

pub fn is_source_connector(&self) -> bool {
self.inner.contains_key(UPSTREAM_SOURCE_KEY)
&& self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
}
}

/// Get the secret id from the name.
Expand Down
5 changes: 5 additions & 0 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ impl Parser<'_> {
}
}
Ok(expected.into())
} else if connector.contains("webhook") {
parser_err!(
"Source with webhook connector is not supported. \
Please use the `CREATE TABLE ... WITH ...` statement instead.",
);
} else {
Ok(parse_format_encode(self)?)
}
Expand Down
16 changes: 6 additions & 10 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::tokenizer::*;
use crate::{impl_parse_to, parser_v2};

pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
pub(crate) const WEBHOOK_CONNECTOR: &str = "webhook";

#[derive(Debug, Clone, PartialEq)]
pub enum ParserError {
Expand Down Expand Up @@ -2551,21 +2552,16 @@ impl Parser<'_> {
let include_options = self.parse_include_options()?;

// PostgreSQL supports `WITH ( options )`, before `AS`
let mut with_options = self.parse_with_properties()?;
let with_options = self.parse_with_properties()?;

let option = with_options
.iter()
.find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY);
let connector = option.map(|opt| opt.value.to_string());
let contain_webhook = if let Some(connector) = &connector
&& connector.contains("webhook")
{
// for webhook source, we clear all the options and use `webhook_info` to store the webhook info
with_options.clear();
true
} else {
false
};
let contain_webhook =
connector.is_some() && connector.as_ref().unwrap().contains(WEBHOOK_CONNECTOR);

// webhook connector does not require row format
let format_encode = if let Some(connector) = connector
&& !contain_webhook
{
Expand Down

0 comments on commit f0751e2

Please sign in to comment.