feat: include subject in NATS source (#19708)
Co-authored-by: tabversion <[email protected]>
Co-authored-by: tabVersion <[email protected]>
3 people authored Dec 13, 2024
1 parent a474f5e commit 5a85b55
Showing 9 changed files with 102 additions and 7 deletions.
6 changes: 4 additions & 2 deletions e2e_test/source_inline/nats/operation.py
@@ -7,19 +7,21 @@

NATS_SERVER = "nats://nats-server:4222"

async def create_stream(stream_name: str, subject: str):
async def create_stream(stream_name: str, subjects: str):
# Create a NATS client
nc = NATS()

try:
# Connect to the NATS server
await nc.connect(servers=[NATS_SERVER])

# split subjects by comma
subjects = subjects.split(",")
# Enable JetStream
js = nc.jetstream()
stream_config = StreamConfig(
name=stream_name,
subjects=[subject],
subjects=subjects,
retention="limits", # Retention policy (limits, interest, or workqueue)
max_msgs=1000, # Maximum number of messages to retain
max_bytes=10 * 1024 * 1024, # Maximum size of messages in bytes
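
For reference, a minimal Rust sketch of the same multi-subject stream setup using the async_nats crate (API usage and crate version assumed; this code is not part of the commit, and the names mirror the test below):

// Sketch: create one JetStream stream bound to several subjects, which is
// what the comma-splitting change above enables in the Python helper.
use async_nats::jetstream::{self, stream::Config};

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("nats://nats-server:4222").await?;
    let js = jetstream::new(client);
    js.create_stream(Config {
        name: "teststreamsubject".to_string(),
        // Messages keep their original subject even when one stream
        // carries several; the new INCLUDE subject column exposes it.
        subjects: vec!["testsubject1".to_string(), "testsubject2".to_string()],
        ..Default::default()
    })
    .await?;
    Ok(())
}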
52 changes: 51 additions & 1 deletion e2e_test/source_inline/nats/test.slt.serial
@@ -80,9 +80,59 @@ t
system ok
python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2

statement ok
drop table t_nats;

# test begin: include subject

system ok
python3 e2e_test/source_inline/nats/operation.py create_stream "teststreamsubject" "testsubject1,testsubject2"

system ok
python3 e2e_test/source_inline/nats/operation.py produce_stream "teststreamsubject" "testsubject1"

system ok
python3 e2e_test/source_inline/nats/operation.py produce_stream "teststreamsubject" "testsubject2"

statement ok
create table t_nats ( i int )
include partition
include subject
include offset
with (
connector = 'nats',
server_url='nats-server:4222',
subject='testsubject1,testsubject2',
connect_mode='plain',
consumer.durable_name = 'demo_subject',
consumer.ack_policy = 'all',
stream='teststreamsubject',
consumer.max_ack_pending = '100000')
format plain encode json;

statement ok
select * from t_nats;

sleep 3s

statement ok
flush;

query I rowsort
select distinct "_rw_nats_subject" from t_nats;
----
testsubject1
testsubject2

query T
select count(distinct "_rw_nats_offset") >= 200 from t_nats;
----
t

statement ok
drop table t_nats;

# test end

statement ok
set streaming_use_shared_source to true;
set streaming_use_shared_source to true;
3 changes: 3 additions & 0 deletions proto/plan_common.proto
@@ -268,6 +268,8 @@ message AdditionalColumnPayload {}
// this type means we read all headers as a whole
message AdditionalColumnHeaders {}

message AdditionalSubject {}

message AdditionalColumn {
oneof column_type {
AdditionalColumnKey key = 1;
@@ -282,6 +284,7 @@ AdditionalColumn {
AdditionalTableName table_name = 10;
AdditionalCollectionName collection_name = 11;
AdditionalColumnPayload payload = 12;
AdditionalSubject subject = 13;
}
}
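
Once prost regenerates the bindings, the new oneof variant is constructed like the existing ones. A hedged sketch, assuming prost's usual codegen conventions for the module path:

// `oneof column_type` becomes the Rust enum `additional_column::ColumnType`;
// `AdditionalSubject` is an empty marker message, like the variants above.
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::{AdditionalColumn, AdditionalSubject};

fn subject_column() -> AdditionalColumn {
    AdditionalColumn {
        column_type: Some(ColumnType::Subject(AdditionalSubject {})),
    }
}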

12 changes: 10 additions & 2 deletions src/connector/src/parser/additional_columns.rs
@@ -23,7 +23,7 @@ use risingwave_pb::plan_common::{
AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp,
AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName,
AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject, AdditionalTableName,
};

use crate::error::ConnectorResult;
@@ -61,7 +61,7 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
),
(
NATS_CONNECTOR,
HashSet::from(["partition", "offset", "payload"]),
HashSet::from(["partition", "offset", "payload", "subject"]),
),
(
OPENDAL_S3_CONNECTOR,
@@ -267,6 +267,14 @@ pub fn build_additional_column_desc(
)),
},
),
"subject" => ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar, // Assuming subject is a string
AdditionalColumn {
column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})),
},
),
_ => unreachable!(),
};

8 changes: 7 additions & 1 deletion src/connector/src/parser/chunk_builder.rs
@@ -26,7 +26,7 @@ use thiserror_ext::AsReport;
use super::MessageMeta;
use crate::parser::utils::{
extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
extract_timestamp_from_meta,
extract_subject_from_meta, extract_timestamp_from_meta,
};
use crate::source::{SourceColumnDesc, SourceColumnType, SourceMeta};

@@ -231,6 +231,12 @@ impl SourceStreamChunkRowWriter<'_> {
// collection name for `mongodb-cdc` should be parsed from the message payload
parse_field(desc)
}
(_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
self.row_meta
.as_ref()
.and_then(|ele| extract_subject_from_meta(ele.meta))
.unwrap_or(None),
)),
(_, &Some(AdditionalColumnType::Partition(_))) => {
// the meta info does not involve spec connector
Ok(A::output_for(
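
The nested Option in the new Subject arm is deliberate: extract_subject_from_meta returns None for non-NATS metadata, while the inner DatumRef is itself an Option, so "no row meta" and "not a NATS message" both collapse to SQL NULL. A sketch of the same shape, with the surrounding types assumed in scope:

// Outer Option: was there NATS meta at all? Inner Option (DatumRef):
// the SQL value, where None renders as NULL.
let subject_datum: DatumRef<'_> = row_meta
    .as_ref()
    .and_then(|row| extract_subject_from_meta(row.meta))
    .unwrap_or(None);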
7 changes: 7 additions & 0 deletions src/connector/src/parser/utils.rs
@@ -157,3 +157,10 @@ pub fn extract_header_inner_from_meta<'a>(
_ => None,
}
}

pub fn extract_subject_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
match meta {
SourceMeta::Nats(nats_meta) => Some(nats_meta.extract_subject()),
_ => None,
}
}
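
A hedged illustration of the helper's contract (connector and risingwave_common types assumed in scope): NATS metadata yields the subject as a Utf8 scalar, and any other source yields None:

use risingwave_common::types::ScalarRefImpl;

// For NATS meta the helper returns Some(Some(Utf8)); for every other
// source it returns None, which callers turn into a NULL datum.
let meta = SourceMeta::Nats(NatsMeta { subject: "testsubject1".to_owned() });
assert_eq!(
    extract_subject_from_meta(&meta),
    Some(Some(ScalarRefImpl::Utf8("testsubject1")))
);
assert_eq!(extract_subject_from_meta(&SourceMeta::Empty), None);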
2 changes: 2 additions & 0 deletions src/connector/src/source/base.rs
@@ -41,6 +41,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::error::ConnectorResult as Result;
@@ -652,6 +653,7 @@ pub enum SourceMeta {
GooglePubsub(GooglePubsubMeta),
Datagen(DatagenMeta),
DebeziumCdc(DebeziumCdcMeta),
Nats(NatsMeta),
// For the source that doesn't have meta data.
Empty,
}
18 changes: 17 additions & 1 deletion src/connector/src/source/nats/source/message.rs
@@ -13,16 +13,29 @@
// limitations under the License.

use async_nats::jetstream::Message;
use risingwave_common::types::{DatumRef, ScalarRefImpl};

use crate::source::base::SourceMessage;
use crate::source::{SourceMeta, SplitId};

#[derive(Debug, Clone)]
pub struct NatsMeta {
pub subject: String,
}

impl NatsMeta {
pub fn extract_subject(&self) -> DatumRef<'_> {
Some(ScalarRefImpl::Utf8(self.subject.as_str()))
}
}

#[derive(Clone, Debug)]
pub struct NatsMessage {
pub split_id: SplitId,
pub sequence_number: String,
pub payload: Vec<u8>,
pub reply_subject: Option<String>,
pub subject: String,
}

impl From<NatsMessage> for SourceMessage {
@@ -36,7 +49,9 @@ impl From<NatsMessage> for SourceMessage {
// use reply_subject as offset for ack use, we just check the persisted state for whether this is the first run
offset: message.reply_subject.unwrap_or_default(),
split_id: message.split_id,
meta: SourceMeta::Empty,
meta: SourceMeta::Nats(NatsMeta {
subject: message.subject.clone(),
}),
}
}
}
@@ -51,6 +66,7 @@ impl NatsMessage {
.message
.reply
.map(|subject| subject.as_str().to_owned()),
subject: message.message.subject.as_str().to_owned(),
}
}
}
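
Putting the pieces together, a hedged sketch (field values hypothetical, types assumed in scope) of how the subject captured in NatsMessage::new survives the conversion into SourceMessage, which is what the "_rw_nats_subject" column ultimately reads:

let msg = NatsMessage {
    split_id: "0".into(),
    sequence_number: "1".to_string(),
    payload: br#"{"i": 1}"#.to_vec(),
    // Placeholder reply subject; real ones are JetStream ack subjects.
    reply_subject: Some("$JS.ACK.placeholder".to_string()),
    subject: "testsubject1".to_string(),
};
let source_msg: SourceMessage = msg.into();
match source_msg.meta {
    // The parser later surfaces this via extract_subject_from_meta.
    SourceMeta::Nats(nats_meta) => assert_eq!(nats_meta.subject, "testsubject1"),
    _ => unreachable!("NATS messages now always carry NatsMeta"),
}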
1 change: 1 addition & 0 deletions src/prost/build.rs
@@ -163,6 +163,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("plan_common.AdditionalDatabaseName", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AdditionalSchemaName", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AdditionalTableName", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AdditionalSubject", "#[derive(Eq, Hash)]")
.type_attribute(
"plan_common.AdditionalCollectionName",
"#[derive(Eq, Hash)]",
