feat(storage): fill missing columns with null for replicated state_table (#13621)
kwannoel authored Dec 20, 2023
1 parent 4fa341c commit e0a9b80
Showing 6 changed files with 309 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/common/src/array/mod.rs
@@ -465,6 +465,10 @@ impl ArrayBuilderImpl {
dispatch_array_builder_variants!(self, inner, { inner.append(None) })
}

pub fn append_n_null(&mut self, n: usize) {
dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) })
}

/// Append a [`Datum`] or [`DatumRef`] multiple times,
/// panicking if the datum's type does not match the array builder's type.
pub fn append_n(&mut self, n: usize, datum: impl ToDatumRef) {
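As a side note, the new `append_n_null` helper is a shorthand for appending `n` NULL datums. Below is a minimal sketch (not part of the commit) of how it can build an all-NULL column, mirroring the pattern used by `fill_non_output_indices` later in this diff; the `with_type` constructor and `finish` method are assumed from their usage there, and the `all_null_column` helper name is hypothetical.

// Sketch only, not part of the commit: build a column of `len` NULLs.
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef};
use risingwave_common::types::DataType;

fn all_null_column(len: usize, data_type: DataType) -> ArrayRef {
    let mut builder = ArrayBuilderImpl::with_type(len, data_type);
    builder.append_n_null(len); // equivalent to appending `None` to the builder `len` times
    builder.finish().into()
}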
2 changes: 1 addition & 1 deletion src/common/src/array/stream_chunk.rs
@@ -346,7 +346,7 @@ impl fmt::Debug for StreamChunk {
if f.alternate() {
write!(
f,
"StreamChunk {{ cardinality: {}, capacity: {}, data: \n{}\n }}",
"StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
self.cardinality(),
self.capacity(),
self.to_pretty()
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::default::Default;
use std::vec;

use fixedbitset::FixedBitSet;
3 changes: 2 additions & 1 deletion src/storage/src/row_serde/mod.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::row::{OwnedRow, Project, RowExt};

@@ -28,7 +30,6 @@ pub fn find_columns_by_ids(
table_columns: &[ColumnDesc],
column_ids: &[ColumnId],
) -> (Vec<ColumnDesc>, Vec<usize>) {
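// Given the base table's columns and a list of column ids, this returns the
// matching `ColumnDesc`s together with their positional indices in `table_columns`.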
use std::collections::HashMap;
let mut table_columns = table_columns
.iter()
.enumerate()
123 changes: 114 additions & 9 deletions src/stream/src/common/table/state_table.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::default::Default;
use std::ops::Bound;
use std::ops::Bound::*;
@@ -23,7 +24,7 @@ use futures::{pin_mut, FutureExt, Stream, StreamExt};
use futures_async_stream::for_await;
use itertools::{izip, Itertools};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{
@@ -32,6 +33,7 @@ use risingwave_common::catalog::{
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
@@ -147,6 +149,13 @@ pub struct StateTableInner<
/// We will need these to build data chunks from state table rows.
data_types: Vec<DataType>,

/// "i" here refers to the base state_table's actual schema.
/// "o" here refers to the replicated state table's output schema.
/// This mapping is used to reconstruct a row being written from replicated state table.
/// Such that the schema of this row will match the full schema of the base state table.
/// It is only applicable for replication.
i2o_mapping: ColIndexMapping,

/// Output indices
/// Used for:
/// 1. Computing output_value_indices to ser/de replicated rows.
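To illustrate the `i2o_mapping` field above (an illustrative sketch, not part of the commit): if the base table schema is `[a, b, c]` and the replicated table only outputs `[c, a]`, the mapping can be built and queried with the `ColIndexMapping` API used elsewhere in this diff. The `fn main` wrapper and column names are assumptions for the example.

use risingwave_common::util::column_index_mapping::ColIndexMapping;

fn main() {
    // Base column index -> output position: a -> 1, b -> none, c -> 0.
    let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
    assert_eq!(i2o_mapping.try_map(0), Some(1)); // `a` appears in the replicated output
    assert_eq!(i2o_mapping.try_map(1), None); // `b` is missing and must be NULL-filled
    assert_eq!(i2o_mapping.try_map(2), Some(0)); // `c` appears in the replicated output
}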
@@ -203,6 +212,9 @@
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, are used by the replicated state table.
impl<S, SD, const IS_REPLICATED: bool, W, const USE_WATERMARK_CACHE: bool>
StateTableInner<S, SD, IS_REPLICATED, W, USE_WATERMARK_CACHE>
where
@@ -234,7 +246,7 @@
store: S,
vnodes: Option<Arc<Bitmap>>,
is_consistent_op: bool,
output_indices: Vec<usize>,
output_column_ids: Vec<ColumnId>,
) -> Self {
let table_id = TableId::new(table_catalog.id);
let table_columns: Vec<ColumnDesc> = table_catalog
@@ -342,6 +354,34 @@
StateTableWatermarkCache::new(0)
};

// Get info for replicated state table.
let output_column_ids_to_input_idx = output_column_ids
.iter()
.enumerate()
.map(|(pos, id)| (*id, pos))
.collect::<HashMap<_, _>>();

// Compute column descriptions
let columns: Vec<ColumnDesc> = table_catalog
.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().into())
.collect_vec();

// Compute i2o mapping
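// Illustrative example: base columns with ids [1, 2, 3] and output_column_ids [1, 3]
// yield i2o_mapping = [Some(0), None, Some(1)] with a target size of 2.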
let mut i2o_mapping = vec![None; columns.len()];
let mut output_column_indices = vec![];
for (i, column) in columns.iter().enumerate() {
if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
i2o_mapping[i] = Some(*pos);
output_column_indices.push(i);
}
}
let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_indices.len());

// Compute output indices
let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

Self {
table_id,
local_store: local_state_store,
Expand All @@ -360,6 +400,7 @@ where
watermark_cache,
data_types,
output_indices,
i2o_mapping,
}
}

@@ -533,6 +574,7 @@
watermark_cache,
data_types,
output_indices: vec![],
i2o_mapping: ColIndexMapping::new(vec![], 0),
}
}

@@ -638,13 +680,7 @@
vnodes: Option<Arc<Bitmap>>,
output_column_ids: Vec<ColumnId>,
) -> Self {
let columns = table_catalog
.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().into())
.collect_vec();
let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
Self::from_table_catalog_inner(table_catalog, store, vnodes, false, output_indices).await
Self::from_table_catalog_inner(table_catalog, store, vnodes, false, output_column_ids).await
}
}

@@ -891,10 +927,19 @@
}
}

fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
}

/// Write a batch with a `StreamChunk`, which should have the same schema as the table.
// allow(izip, which uses zip instead of zip_eq)
#[allow(clippy::disallowed_methods)]
pub fn write_chunk(&mut self, chunk: StreamChunk) {
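// For a replicated state table the incoming chunk only carries the output
// columns, so pad it to the base table's full schema first (the missing
// columns are filled with NULLs).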
let chunk = if IS_REPLICATED {
self.fill_non_output_indices(chunk)
} else {
chunk
};
let (chunk, op) = chunk.into_parts();

let vnodes = compute_chunk_vnode(
@@ -1490,3 +1535,63 @@ fn end_range_to_memcomparable<R: Row>(
}
}
}

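/// Pad a replicated chunk, which only contains the replicated state table's output
/// columns, to the base table's full schema: every column that is not part of the
/// output is filled with a column of NULLs of the corresponding data type.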
fn fill_non_output_indices(
i2o_mapping: &ColIndexMapping,
data_types: &[DataType],
chunk: StreamChunk,
) -> StreamChunk {
let cardinality = chunk.cardinality();
let (ops, columns, vis) = chunk.into_inner();
let mut full_columns = Vec::with_capacity(data_types.len());
for (i, data_type) in data_types.iter().enumerate() {
if let Some(j) = i2o_mapping.try_map(i) {
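// This base column is present in the replicated output at position `j`; reuse it.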
full_columns.push(columns[j].clone());
} else {
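// This base column is not in the replicated output; fill it with NULLs.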
let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
column_builder.append_n_null(cardinality);
let column: ArrayRef = column_builder.finish().into();
full_columns.push(column)
}
}
let data_chunk = DataChunk::new(full_columns, vis);
StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
use std::fmt::Debug;

use expect_test::{expect, Expect};

use super::*;

fn check(actual: impl Debug, expect: Expect) {
let actual = format!("{:#?}", actual);
expect.assert_eq(&actual);
}

#[test]
fn test_fill_non_output_indices() {
let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
let replicated_chunk = [OwnedRow::new(vec![
Some(222_i32.into()),
Some(2_i32.into()),
])];
let replicated_chunk = StreamChunk::from_parts(
vec![Op::Insert],
DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
);
let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
check(
filled_chunk,
expect![[r#"
StreamChunk { cardinality: 1, capacity: 1, data:
+---+---+---+-----+
| + | 2 | | 222 |
+---+---+---+-----+
}"#]],
);
}
}
