From 5a85b55a1946ff44d1f112a131b01ec038f87d6a Mon Sep 17 00:00:00 2001 From: Pierre Nowak Date: Fri, 13 Dec 2024 07:54:37 +0100 Subject: [PATCH] feat: include subject in NATS source (#19708) Co-authored-by: tabversion Co-authored-by: tabVersion --- e2e_test/source_inline/nats/operation.py | 6 ++- e2e_test/source_inline/nats/test.slt.serial | 52 ++++++++++++++++++- proto/plan_common.proto | 3 ++ .../src/parser/additional_columns.rs | 12 ++++- src/connector/src/parser/chunk_builder.rs | 8 ++- src/connector/src/parser/utils.rs | 7 +++ src/connector/src/source/base.rs | 2 + .../src/source/nats/source/message.rs | 18 ++++++- src/prost/build.rs | 1 + 9 files changed, 102 insertions(+), 7 deletions(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 5f0250c3404aa..b33a6fc1d2c15 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -7,7 +7,7 @@ NATS_SERVER = "nats://nats-server:4222" -async def create_stream(stream_name: str, subject: str): +async def create_stream(stream_name: str, subjects: str): # Create a NATS client nc = NATS() @@ -15,11 +15,13 @@ async def create_stream(stream_name: str, subject: str): # Connect to the NATS server await nc.connect(servers=[NATS_SERVER]) + # split subjects by comma + subjects = subjects.split(",") # Enable JetStream js = nc.jetstream() stream_config = StreamConfig( name=stream_name, - subjects=[subject], + subjects=subjects, retention="limits", # Retention policy (limits, interest, or workqueue) max_msgs=1000, # Maximum number of messages to retain max_bytes=10 * 1024 * 1024, # Maximum size of messages in bytes diff --git a/e2e_test/source_inline/nats/test.slt.serial b/e2e_test/source_inline/nats/test.slt.serial index 3f4227a0d05e5..772ef9cfabc4b 100644 --- a/e2e_test/source_inline/nats/test.slt.serial +++ b/e2e_test/source_inline/nats/test.slt.serial @@ -80,9 +80,59 @@ t system ok python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2 +statement ok +drop table t_nats; + +# test begin: include subject + +system ok +python3 e2e_test/source_inline/nats/operation.py create_stream "teststreamsubject" "testsubject1,testsubject2" + +system ok +python3 e2e_test/source_inline/nats/operation.py produce_stream "teststreamsubject" "testsubject1" + +system ok +python3 e2e_test/source_inline/nats/operation.py produce_stream "teststreamsubject" "testsubject2" + +statement ok +create table t_nats ( i int ) +include partition +include subject +include offset +with ( + connector = 'nats', + server_url='nats-server:4222', + subject='testsubject1,testsubject2', + connect_mode='plain', + consumer.durable_name = 'demo_subject', + consumer.ack_policy = 'all', + stream='teststreamsubject', + consumer.max_ack_pending = '100000') +format plain encode json; + +statement ok +select * from t_nats; + +sleep 3s + +statement ok +flush; + +query I rowsort +select distinct "_rw_nats_subject" from t_nats; +---- +testsubject1 +testsubject2 + +query T +select count(distinct "_rw_nats_offset") >= 200 from t_nats; +---- +t statement ok drop table t_nats; +# test end + statement ok -set streaming_use_shared_source to true; \ No newline at end of file +set streaming_use_shared_source to true; diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 4a93a84a9265b..b8b4989e78b8a 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -268,6 +268,8 @@ message AdditionalColumnPayload {} // this type means we read all headers as a whole message AdditionalColumnHeaders {} +message AdditionalSubject {} + message AdditionalColumn { oneof column_type { AdditionalColumnKey key = 1; @@ -282,6 +284,7 @@ message AdditionalColumn { AdditionalTableName table_name = 10; AdditionalCollectionName collection_name = 11; AdditionalColumnPayload payload = 12; + AdditionalSubject subject = 13; } } diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 87ffee01b96e4..b1b6bcb8e693d 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -23,7 +23,7 @@ use risingwave_pb::plan_common::{ AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp, - AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName, + AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject, AdditionalTableName, }; use crate::error::ConnectorResult; @@ -61,7 +61,7 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, // Assuming subject is a string + AdditionalColumn { + column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})), + }, + ), _ => unreachable!(), }; diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index 21e81cda40d86..574a1a5bacacb 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -26,7 +26,7 @@ use thiserror_ext::AsReport; use super::MessageMeta; use crate::parser::utils::{ extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta, - extract_timestamp_from_meta, + extract_subject_from_meta, extract_timestamp_from_meta, }; use crate::source::{SourceColumnDesc, SourceColumnType, SourceMeta}; @@ -231,6 +231,12 @@ impl SourceStreamChunkRowWriter<'_> { // collection name for `mongodb-cdc` should be parsed from the message payload parse_field(desc) } + (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for( + self.row_meta + .as_ref() + .and_then(|ele| extract_subject_from_meta(ele.meta)) + .unwrap_or(None), + )), (_, &Some(AdditionalColumnType::Partition(_))) => { // the meta info does not involve spec connector Ok(A::output_for( diff --git a/src/connector/src/parser/utils.rs b/src/connector/src/parser/utils.rs index 7ab716a81f118..7430e0e3157ac 100644 --- a/src/connector/src/parser/utils.rs +++ b/src/connector/src/parser/utils.rs @@ -157,3 +157,10 @@ pub fn extract_header_inner_from_meta<'a>( _ => None, } } + +pub fn extract_subject_from_meta(meta: &SourceMeta) -> Option> { + match meta { + SourceMeta::Nats(nats_meta) => Some(nats_meta.extract_subject()), + _ => None, + } +} diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index f259d7953b288..3525152a35c27 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -41,6 +41,7 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::kinesis::KinesisMeta; use super::monitor::SourceMetrics; +use super::nats::source::NatsMeta; use super::nexmark::source::message::NexmarkMeta; use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::error::ConnectorResult as Result; @@ -652,6 +653,7 @@ pub enum SourceMeta { GooglePubsub(GooglePubsubMeta), Datagen(DatagenMeta), DebeziumCdc(DebeziumCdcMeta), + Nats(NatsMeta), // For the source that doesn't have meta data. Empty, } diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index 973d2b0fc1985..1cee8a6918d1a 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -13,16 +13,29 @@ // limitations under the License. use async_nats::jetstream::Message; +use risingwave_common::types::{DatumRef, ScalarRefImpl}; use crate::source::base::SourceMessage; use crate::source::{SourceMeta, SplitId}; +#[derive(Debug, Clone)] +pub struct NatsMeta { + pub subject: String, +} + +impl NatsMeta { + pub fn extract_subject(&self) -> DatumRef<'_> { + Some(ScalarRefImpl::Utf8(self.subject.as_str())) + } +} + #[derive(Clone, Debug)] pub struct NatsMessage { pub split_id: SplitId, pub sequence_number: String, pub payload: Vec, pub reply_subject: Option, + pub subject: String, } impl From for SourceMessage { @@ -36,7 +49,9 @@ impl From for SourceMessage { // use reply_subject as offset for ack use, we just check the persisted state for whether this is the first run offset: message.reply_subject.unwrap_or_default(), split_id: message.split_id, - meta: SourceMeta::Empty, + meta: SourceMeta::Nats(NatsMeta { + subject: message.subject.clone(), + }), } } } @@ -51,6 +66,7 @@ impl NatsMessage { .message .reply .map(|subject| subject.as_str().to_owned()), + subject: message.message.subject.as_str().to_owned(), } } } diff --git a/src/prost/build.rs b/src/prost/build.rs index 9073c23b22698..e906c47efbd55 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -163,6 +163,7 @@ fn main() -> Result<(), Box> { .type_attribute("plan_common.AdditionalDatabaseName", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AdditionalSchemaName", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AdditionalTableName", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.AdditionalSubject", "#[derive(Eq, Hash)]") .type_attribute( "plan_common.AdditionalCollectionName", "#[derive(Eq, Hash)]",