Skip to content

Commit

Permalink
fix(stream/materialize): incorrect implementation in ignore-conflict …
Browse files Browse the repository at this point in the history
…behavior (#14345)

Signed-off-by: TennyZhuang <[email protected]>
  • Loading branch information
TennyZhuang authored Jan 4, 2024
1 parent 0a95b8c commit f9da72e
Showing 1 changed file with 121 additions and 38 deletions.
159 changes: 121 additions & 38 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ use std::ops::Deref;
use std::sync::Arc;

use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::{stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Schema, TableId};
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::{CompactedRow, RowDeserializer};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand Down Expand Up @@ -171,11 +169,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {

let fixed_changes = self
.materialize_cache
.handle_conflict(
row_ops,
&self.state_table,
&self.conflict_behavior,
)
.handle(row_ops, &self.state_table, &self.conflict_behavior)
.await?;

match generate_output(fixed_changes, data_types.clone())? {
Expand Down Expand Up @@ -337,7 +331,7 @@ impl MaterializeBuffer {
}

pub fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
let entry = self.buffer.entry(pk);
let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
match entry {
Entry::Vacant(e) => {
e.insert(KeyOp::Delete(old_value));
Expand Down Expand Up @@ -419,13 +413,7 @@ pub struct MaterializeCache<SD> {
_serde: PhantomData<SD>,
}

#[derive(EnumAsInner, EstimateSize)]
pub enum CacheValue {
Overwrite(Option<CompactedRow>),
Ignore(Option<EmptyValue>),
}

type EmptyValue = ();
type CacheValue = Option<CompactedRow>;

impl<SD: ValueRowSerde> MaterializeCache<SD> {
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
Expand All @@ -438,7 +426,7 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
}
}

pub async fn handle_conflict<'a, S: StateStore>(
pub async fn handle<'a, S: StateStore>(
&mut self,
row_ops: Vec<(Op, Vec<u8>, Bytes)>,
table: &StateTableInner<S, SD>,
Expand All @@ -459,7 +447,7 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
Op::Insert | Op::UpdateInsert => {
match conflict_behavior {
ConflictBehavior::Overwrite => {
match self.force_get(&key).as_overwrite().unwrap() {
match self.force_get(&key) {
Some(old_row) => fixed_changes.update(
key.clone(),
old_row.row.clone(),
Expand All @@ -470,7 +458,7 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
update_cache = true;
}
ConflictBehavior::IgnoreConflict => {
match self.force_get(&key).as_ignore().unwrap() {
match self.force_get(&key) {
Some(_) => (),
None => {
fixed_changes.insert(key.clone(), value.clone());
Expand All @@ -484,13 +472,10 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
if update_cache {
match conflict_behavior {
ConflictBehavior::Overwrite => {
self.data.push(
key,
CacheValue::Overwrite(Some(CompactedRow { row: value })),
);
self.data.push(key, Some(CompactedRow { row: value }));
}
ConflictBehavior::IgnoreConflict => {
self.data.push(key, CacheValue::Ignore(Some(())));
self.data.push(key, Some(CompactedRow { row: value }));
}
_ => unreachable!(),
}
Expand All @@ -500,28 +485,30 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
Op::Delete | Op::UpdateDelete => {
match conflict_behavior {
ConflictBehavior::Overwrite => {
match self.force_get(&key).as_overwrite().unwrap() {
match self.force_get(&key) {
Some(old_row) => {
fixed_changes.delete(key.clone(), old_row.row.clone());
}
None => (), // delete a nonexistent value
};
update_cache = true;
}
ConflictBehavior::IgnoreConflict => (),
ConflictBehavior::IgnoreConflict => {
match self.force_get(&key) {
Some(old_row) => {
if old_row.row == value {
fixed_changes.delete(key.clone(), old_row.row.clone());
update_cache = true;
}
}
None => (), // delete a nonexistent value
};
}
_ => unreachable!(),
};

if update_cache {
match conflict_behavior {
ConflictBehavior::Overwrite => {
self.data.push(key, CacheValue::Overwrite(None));
}
ConflictBehavior::IgnoreConflict => {
self.data.push(key, CacheValue::Ignore(Some(())));
}
_ => unreachable!(),
}
self.data.push(key, None);
}
}
}
Expand Down Expand Up @@ -560,10 +547,8 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
while let Some(result) = buffered.next().await {
let (key, value) = result;
match conflict_behavior {
ConflictBehavior::Overwrite => self.data.push(key, CacheValue::Overwrite(value?)),
ConflictBehavior::IgnoreConflict => {
self.data.push(key, CacheValue::Ignore(value?.map(|_| ())))
}
ConflictBehavior::Overwrite => self.data.push(key, value?),
ConflictBehavior::IgnoreConflict => self.data.push(key, value?),
_ => unreachable!(),
};
}
Expand Down Expand Up @@ -1245,6 +1230,104 @@ mod tests {
}
}

/// Checks that, under `ConflictBehavior::IgnoreConflict`, a key can be inserted again
/// after it has been deleted within the same chunk.
///
/// The input chunk inserts `(1, 3)`, deletes `(1, 3)`, then inserts `(1, 6)`. Once the
/// executor has consumed the chunk and the barrier that follows it, reading key `1` from
/// the table must return `(1, 6)`: the second insert must not be dropped as a conflict
/// with the row that was already deleted.
#[tokio::test]
async fn test_ignore_delete_then_insert() {
// Prepare storage and memtable.
let memory_state_store = MemoryStateStore::new();
let table_id = TableId::new(1);
// Two columns of int32 type, the first column is PK.
let schema = Schema::new(vec![
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int32),
]);
let column_ids = vec![0.into(), 1.into()];

// Insert, delete, then re-insert the same primary key with a different value.
// The final insert is expected to succeed because the key no longer exists at that point.
let chunk1 = StreamChunk::from_pretty(
" i i
+ 1 3
- 1 3
+ 1 6",
);

// Prepare stream executors.
// The mock source yields: barrier, the chunk above, barrier.
let source = MockSource::with_messages(
schema.clone(),
PkIndices::new(),
vec![
Message::Barrier(Barrier::new_test_barrier(1)),
Message::Chunk(chunk1),
Message::Barrier(Barrier::new_test_barrier(2)),
],
);

let order_types = vec![OrderType::ascending()];
let column_descs = vec![
ColumnDesc::unnamed(column_ids[0], DataType::Int32),
ColumnDesc::unnamed(column_ids[1], DataType::Int32),
];

// Table handle over the same state store and table id; used at the end of the test to
// read back what the executor wrote.
// NOTE(review): `vec![0]` and `vec![0, 1]` are presumably the pk indices and the value
// indices respectively — confirm against `StorageTable::for_test`.
let table = StorageTable::for_test(
memory_state_store.clone(),
table_id,
column_descs,
order_types,
vec![0],
vec![0, 1],
);

// Executor under test, ordered by column 0 ascending and configured with
// `ConflictBehavior::IgnoreConflict`.
// NOTE(review): the literal `1` and the `AtomicU64` are presumably the executor id and
// the watermark epoch — confirm against `MaterializeExecutor::for_test`.
let mut materialize_executor = Box::new(
MaterializeExecutor::for_test(
Box::new(source),
memory_state_store,
table_id,
vec![ColumnOrder::new(0, OrderType::ascending())],
column_ids,
1,
Arc::new(AtomicU64::new(0)),
ConflictBehavior::IgnoreConflict,
)
.await,
)
.execute();
// Drive the executor through the three input messages. Each `unwrap` chain fails the
// test if the stream ends early, yields an error, or produces a message that is not of
// the expected kind (barrier, chunk, barrier). The messages themselves are not inspected.
let _msg1 = materialize_executor
.next()
.await
.transpose()
.unwrap()
.unwrap()
.as_barrier()
.unwrap();
let _msg2 = materialize_executor
.next()
.await
.transpose()
.unwrap()
.unwrap()
.as_chunk()
.unwrap();
let _msg3 = materialize_executor
.next()
.await
.transpose()
.unwrap()
.unwrap()
.as_barrier()
.unwrap();

// Key 1 must now hold the value from the last insert, i.e. the row (1, 6).
let row = table
.get_row(
&OwnedRow::new(vec![Some(1_i32.into())]),
HummockReadEpoch::NoWait(u64::MAX),
)
.await
.unwrap();
assert_eq!(
row,
Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
);
}

#[tokio::test]
async fn test_ignore_delete_and_update_conflict() {
// Prepare storage and memtable.
Expand Down

0 comments on commit f9da72e

Please sign in to comment.