Skip to content

Commit

Permalink
fix(source): revert create source reader with retry (#19754)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Dec 12, 2024
1 parent 9f929d9 commit 8e1d60b
Showing 1 changed file with 16 additions and 121 deletions.
137 changes: 16 additions & 121 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::array::ArrayRef;
Expand All @@ -41,7 +38,6 @@ use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::Instrument;

use super::executor_core::StreamSourceCore;
use super::{
Expand All @@ -50,7 +46,6 @@ use super::{
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::UpdateMutation;

Expand Down Expand Up @@ -505,98 +500,20 @@ impl<S: StateStore> SourceExecutor<S> {

let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
tracing::debug!(state = ?recover_state, "start with state");

let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> = None;
let seek_to_latest = self.is_shared_non_cdc && is_uninitialized;
let source_reader = source_desc.source.clone();
let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
let source_ctx = Arc::new(source_ctx);
let mut build_source_stream_fut = Box::pin(async move {
let backoff = get_infinite_backoff_strategy();
tokio_retry::Retry::spawn(backoff, || async {
match source_reader
.build_stream(
recover_state.clone(),
column_ids.clone(),
source_ctx.clone(),
seek_to_latest,
)
.await {
Ok((stream, latest_splits)) => Ok((stream, latest_splits)),
Err(e) => {
tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying...");
Err(e)
}
}
})
.instrument(tracing::info_span!("build_source_stream_with_retry"))
.await
.expect("Retry build source stream until success.")
});

let mut need_resume_after_build = false;
// loop to create source stream until success
loop {
if let Some(barrier) = build_source_stream_and_poll_barrier(
&mut barrier_stream,
&mut reader_and_splits,
&mut build_source_stream_fut,
let (source_chunk_reader, latest_splits) = self
.build_stream_source_reader(
&source_desc,
recover_state,
// For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
// It's highly probable that the work of scanning historical data cannot be shared,
// so don't waste work on it.
// For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
// Note that shared CDC source is special. It already starts from latest.
self.is_shared_non_cdc && is_uninitialized,
)
.await?
{
if let Message::Barrier(barrier) = barrier {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Throttle(actor_to_apply) => {
if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
&& *new_rate_limit != self.rate_limit_rps
{
tracing::info!(
"updating rate limit from {:?} to {:?}",
self.rate_limit_rps,
*new_rate_limit
);

// update the rate limit option, we will apply the rate limit
// when we finish building the source stream.
self.rate_limit_rps = *new_rate_limit;
}
}
Mutation::Resume => {
// We record the Resume mutation here and postpone the resume of the source stream
// after we have successfully built the source stream.
need_resume_after_build = true;
}
_ => {
// ignore other mutations and output a warn log
tracing::warn!(
"Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before
finish building source stream.",
mutation
);
}
}
}

// bump state store epoch
let _ = self.persist_state_and_clear_cache(barrier.epoch).await?;
yield Message::Barrier(barrier);
} else {
unreachable!("Only barrier message is expected when building source stream.");
}
} else {
assert!(reader_and_splits.is_some());
tracing::info!("source stream created successfully");
break;
}
}
let (source_chunk_reader, latest_splits) =
reader_and_splits.expect("source chunk reader and splits must be created");

let source_chunk_reader = apply_rate_limit(source_chunk_reader, self.rate_limit_rps)
.boxed()
.map_err(StreamExecutorError::connector_error);
.instrument_await("source_build_reader")
.await?;
let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error);
if let Some(latest_splits) = latest_splits {
// make sure it is written to state table later.
// Then even it receives no messages, we can observe it in state table.
Expand All @@ -608,12 +525,13 @@ impl<S: StateStore> SourceExecutor<S> {
}
// Merge the chunks from source and the barriers into a single stream. We prioritize
// barriers over source data chunks here.
let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream =
StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);
let mut command_paused = false;

// - If the first barrier requires us to pause on startup and we haven't received a Resume mutation, pause the stream.
if is_pause_on_startup && !need_resume_after_build {
// - If the first barrier requires us to pause on startup, pause the stream.
if is_pause_on_startup {
tracing::info!("source paused on startup");
stream.pause_stream();
command_paused = true;
Expand Down Expand Up @@ -828,29 +746,6 @@ impl<S: StateStore> SourceExecutor<S> {
}
}

async fn build_source_stream_and_poll_barrier(
barrier_stream: &mut BoxStream<'static, StreamExecutorResult<Message>>,
reader_and_splits: &mut Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>,
build_future: &mut Pin<
Box<impl Future<Output = (BoxChunkSourceStream, Option<Vec<SplitImpl>>)>>,
>,
) -> StreamExecutorResult<Option<Message>> {
if reader_and_splits.is_some() {
return Ok(None);
}

tokio::select! {
biased;
build_ret = &mut *build_future => {
*reader_and_splits = Some(build_ret);
Ok(None)
}
msg = barrier_stream.next() => {
msg.transpose()
}
}
}

impl<S: StateStore> Execute for SourceExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
if self.stream_source_core.is_some() {
Expand Down

0 comments on commit 8e1d60b

Please sign in to comment.