fix(streaming): handle pause on bootstrap for Now and Values (#12716)

Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Oct 10, 2023
1 parent ac514f9 commit d2e37ea
Showing 6 changed files with 123 additions and 28 deletions.
2 changes: 0 additions & 2 deletions e2e_test/batch/transaction/now.slt
@@ -1,5 +1,3 @@
-# Disabled, see https://github.com/risingwavelabs/risingwave/issues/10887
-
 statement ok
 create table t (ts timestamp);
8 changes: 1 addition & 7 deletions src/stream/src/executor/mod.rs
@@ -332,13 +332,7 @@ impl Barrier {
         }
     }
 
-    /// Whether this barrier is for configuration change. Used for source executor initialization.
-    pub fn is_update(&self) -> bool {
-        matches!(self.mutation.as_deref(), Some(Mutation::Update { .. }))
-    }
-
-    /// Whether this barrier is for resume. Used for now executor to determine whether to yield a
-    /// chunk and a watermark before this barrier.
+    /// Whether this barrier is for resume.
     pub fn is_resume(&self) -> bool {
         matches!(self.mutation.as_deref(), Some(Mutation::Resume))
     }
3 changes: 2 additions & 1 deletion src/stream/src/executor/now.rs
@@ -90,6 +90,7 @@ impl<S: StateStore> NowExecutor<S> {
                     }
                 };
                 last_timestamp = state_row.and_then(|row| row[0].clone());
+                paused = barrier.is_pause_on_startup();
                 initialized = true;
             } else if paused {
                 // Assert that no data is updated.
@@ -104,7 +105,7 @@ impl<S: StateStore> NowExecutor<S> {
             // Update paused state.
             if let Some(mutation) = barrier.mutation.as_deref() {
                 match mutation {
-                    Mutation::Pause | Mutation::Update { .. } => paused = true,
+                    Mutation::Pause => paused = true,
                     Mutation::Resume => paused = false,
                     _ => {}
                 }
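Note: the hunks above rely on `barrier.is_pause_on_startup()`, whose definition is not part of this diff. A minimal sketch of what such a helper could look like, modeled on the `is_resume` method shown in `mod.rs`; the mutation variants actually inspected (for instance an `Add` mutation carrying a pause flag for newly created jobs) may differ:

impl Barrier {
    /// Hypothetical sketch: whether the executor should start with its data
    /// stream paused when this is the first barrier it receives.
    pub fn is_pause_on_startup(&self) -> bool {
        // Assumption: a `Pause` mutation on the bootstrap barrier signals this;
        // the real implementation may match on other mutation variants.
        matches!(self.mutation.as_deref(), Some(Mutation::Pause))
    }
}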
13 changes: 13 additions & 0 deletions src/stream/src/executor/values.rs
@@ -83,10 +83,23 @@ impl ValuesExecutor {
             .unwrap();
 
         let emit = barrier.is_newly_added(self.ctx.id);
+        let paused_on_startup = barrier.is_pause_on_startup();
+
         yield Message::Barrier(barrier);
 
         // If it's failover, do not evaluate rows (assume they have been yielded)
         if emit {
+            if paused_on_startup {
+                // Wait for the data stream to be resumed before yielding the chunks.
+                while let Some(barrier) = barrier_receiver.recv().await {
+                    let is_resume = barrier.is_resume();
+                    yield Message::Barrier(barrier);
+                    if is_resume {
+                        break;
+                    }
+                }
+            }
+
             let cardinality = schema.len();
             ensure!(cardinality > 0);
             while !rows.is_empty() {
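The loop added above yields each barrier downstream before checking the resume flag, so the checkpoint stream keeps flowing while the data chunks are withheld. A self-contained sketch of the same pattern, assuming a plain tokio channel in place of the executor's barrier receiver and a simplified `Event` type:

use tokio::sync::mpsc;

enum Event {
    Barrier { is_resume: bool },
}

/// Forward barriers until a resume barrier arrives, then return so that the
/// caller can start emitting its data.
async fn wait_for_resume(
    rx: &mut mpsc::UnboundedReceiver<Event>,
    out: &mpsc::UnboundedSender<Event>,
) {
    while let Some(Event::Barrier { is_resume }) = rx.recv().await {
        // Forward the barrier first, mirroring the order in `values.rs` above.
        let _ = out.send(Event::Barrier { is_resume });
        if is_resume {
            break;
        }
    }
}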
6 changes: 6 additions & 0 deletions src/tests/simulation/src/cluster.rs
@@ -655,6 +655,12 @@ impl Session {
         self.query_tx.send((sql.into(), tx)).await?;
         rx.await?
     }
+
+    /// Run `FLUSH` on the session.
+    pub async fn flush(&mut self) -> Result<()> {
+        self.run("FLUSH").await?;
+        Ok(())
+    }
 }
 
 /// Options for killing nodes.
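For illustration, a sketch of how the new helper tightens up test code, using only APIs that appear elsewhere in this diff (`t` is a hypothetical table created beforehand):

use anyhow::Result;
use risingwave_simulation::cluster::Cluster;

/// Insert a row and make it visible to subsequent batch queries.
async fn insert_and_flush(cluster: &mut Cluster) -> Result<()> {
    let mut session = cluster.start_session();
    session.run("INSERT INTO t VALUES (1)").await?;
    session.flush().await?; // shorthand for `session.run("FLUSH").await?`
    Ok(())
}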
@@ -15,29 +15,43 @@
 use std::time::Duration;
 
 use anyhow::Result;
-use risingwave_simulation::cluster::Configuration;
+use risingwave_simulation::cluster::{Cluster, Configuration};
 use risingwave_simulation::nexmark::NexmarkCluster;
 use risingwave_simulation::utils::AssertResult;
 use tokio::time::{sleep, timeout};
 
-const CREATE_TABLE: &str = "CREATE TABLE t (v int)";
-const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)";
-const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t";
-
-const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid";
-const SELECT: &str = "SELECT * FROM count_bid";
-
-const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction";
-const SELECT_2: &str = "SELECT * FROM count_auction";
-
 const SET_PARAMETER: &str = "ALTER SYSTEM SET pause_on_next_bootstrap TO true";
 
+#[derive(Clone, Copy)]
 enum ResumeBy {
     Risectl,
     Restart,
 }
 
+impl ResumeBy {
+    async fn resume(self, cluster: &mut Cluster) -> Result<()> {
+        match self {
+            ResumeBy::Risectl => cluster.resume().await?,
+            ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await,
+        };
+        Ok(())
+    }
+}
+
 async fn test_impl(resume_by: ResumeBy) -> Result<()> {
+    const CREATE_TABLE: &str = "CREATE TABLE t (v int)";
+    const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)";
+    const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t";
+
+    const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid";
+    const SELECT: &str = "SELECT * FROM count_bid";
+
+    const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction";
+    const SELECT_2: &str = "SELECT * FROM count_auction";
+
+    const CREATE_VALUES: &str = "CREATE MATERIALIZED VIEW values as VALUES (1), (2), (3)";
+    const SELECT_VALUES: &str = "SELECT count(*) FROM values";
+
     let mut cluster = NexmarkCluster::new(
         Configuration {
             meta_nodes: 1,
@@ -77,18 +91,21 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
     // New streaming jobs should also start from paused.
     cluster.run(CREATE_2).await?;
     sleep(Duration::from_secs(10)).await;
-    cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the
+    cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the aggregation
                                                         // result will be 0 instead of empty or NULL
 
+    // `VALUES` should also be paused.
+    tokio::time::timeout(Duration::from_secs(10), cluster.run(CREATE_VALUES))
+        .await
+        .expect_err("`VALUES` should be paused so creation should never complete");
+
     // DML on tables should be blocked.
     let result = timeout(Duration::from_secs(10), cluster.run(INSERT_INTO_TABLE)).await;
     assert!(result.is_err());
     cluster.run(SELECT_COUNT_TABLE).await?.assert_result_eq("0");
 
-    match resume_by {
-        ResumeBy::Risectl => cluster.resume().await?,
-        ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await,
-    }
+    // Resume the cluster.
+    resume_by.resume(&mut cluster).await?;
     sleep(Duration::from_secs(10)).await;
 
     // The source should be resumed.
@@ -100,17 +117,22 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
     {
         let mut session = cluster.start_session();
 
-        session.run("FLUSH").await?;
+        session.flush().await?;
         let count: i64 = session.run(SELECT_COUNT_TABLE).await?.parse().unwrap();
 
         session.run(INSERT_INTO_TABLE).await?;
-        session.run("FLUSH").await?;
+        session.flush().await?;
         session
             .run(SELECT_COUNT_TABLE)
             .await?
            .assert_result_eq(format!("{}", count + 1));
     }
 
+    if let ResumeBy::Risectl = resume_by {
+        // `VALUES` should be successfully created
+        cluster.run(SELECT_VALUES).await?.assert_result_eq("3");
+    }
+
     Ok(())
 }

@@ -123,3 +145,64 @@ async fn test_pause_on_bootstrap_resume_by_risectl() -> Result<()> {
 async fn test_pause_on_bootstrap_resume_by_restart() -> Result<()> {
     test_impl(ResumeBy::Restart).await
 }
+
+// The idea is similar to `e2e_test/batch/transaction/now.slt`.
+async fn test_temporal_filter(resume_by: ResumeBy) -> Result<()> {
+    const CREATE_TABLE: &str = "create table t (ts timestamp)";
+    const CREATE_TEMPORAL_FILTER: &str =
+        "create materialized view mv as select count(*) from t where ts at time zone 'utc' >= now()";
+    const INSERT_TIMESTAMPS: &str = "
+    insert into t select * from generate_series(
+        now() at time zone 'utc' - interval '10' second,
+        now() at time zone 'utc' + interval '20' second,
+        interval '1' second / 20
+    );
+    ";
+    const SELECT: &str = "select * from mv";
+
+    let mut cluster = Cluster::start(Configuration {
+        meta_nodes: 1,
+        ..Configuration::for_scale()
+    })
+    .await?;
+
+    cluster.run(SET_PARAMETER).await?;
+
+    {
+        let mut session = cluster.start_session();
+        session.run(CREATE_TABLE).await?;
+        session.run(CREATE_TEMPORAL_FILTER).await?;
+        session.run(INSERT_TIMESTAMPS).await?;
+        session.flush().await?;
+    };
+
+    // Kill the meta node and wait for the service to recover.
+    cluster.kill_nodes(["meta-1"], 0).await;
+    sleep(Duration::from_secs(10)).await;
+
+    let count: i32 = cluster.run(SELECT).await?.parse()?;
+    assert_ne!(count, 0, "the following tests are meaningless");
+
+    sleep(Duration::from_secs(10)).await;
+    let new_count: i32 = cluster.run(SELECT).await?.parse()?;
+    assert_eq!(count, new_count, "temporal filter should have been paused");
+
+    // Resume the cluster.
+    resume_by.resume(&mut cluster).await?;
+    sleep(Duration::from_secs(40)).await; // 40 seconds is enough for all timestamps to be expired
+
+    let count: i32 = cluster.run(SELECT).await?.parse()?;
+    assert_eq!(count, 0, "temporal filter should have been resumed");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_pause_on_bootstrap_temporal_filter_resume_by_risectl() -> Result<()> {
+    test_temporal_filter(ResumeBy::Risectl).await
+}
+
+#[tokio::test]
+async fn test_pause_on_bootstrap_temporal_filter_resume_by_restart() -> Result<()> {
+    test_temporal_filter(ResumeBy::Restart).await
+}
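A rough sanity check of the temporal filter test's timing, under the assumption that the `generate_series` step of `interval '1' second / 20` produces one row every 50 ms:

/// Back-of-the-envelope check of the row count and the 40-second wait above.
fn main() {
    let span_secs = 10.0 + 20.0; // series runs from now - 10 s to now + 20 s
    let step_secs = 1.0 / 20.0; // one row every 50 ms
    let rows = (span_secs / step_secs) as u32 + 1; // inclusive bounds, ~601 rows
    println!("~{rows} rows inserted");
    // The latest timestamp is only 20 s in the future, so after resuming, a
    // 40-second sleep lets `ts at time zone 'utc' >= now()` reject every row,
    // driving the count to 0.
}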
