diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index a99dd765fd59..872d503a4924 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::table_watermark::{ - TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, + TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo}; use risingwave_hummock_sdk::HummockEpoch; @@ -101,6 +101,7 @@ fn gen_committed_table_watermarks( }) .collect(), direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 74aac44a1eac..cc1d63843fd9 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -33,7 +33,7 @@ use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner}; use risingwave_hummock_sdk::table_stats::TableStats; use risingwave_hummock_sdk::table_watermark::{ - TableWatermarksIndex, VnodeWatermark, WatermarkDirection, + TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo}; use risingwave_meta::hummock::test_utils::get_compaction_group_id_by_table_id; @@ -2268,6 +2268,7 @@ async fn test_table_watermark() { table_watermarks: Some(( WatermarkDirection::Ascending, vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(watermark1))], + WatermarkSerdeType::PkPrefix, )), switch_op_consistency_level: None, }, @@ -2595,6 +2596,7 @@ async fn test_table_watermark() { table_watermarks: Some(( WatermarkDirection::Ascending, vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(5))], + WatermarkSerdeType::PkPrefix, )), switch_op_consistency_level: None, }, diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index ab4c6d16d1e5..c465afc937c8 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -444,7 +444,7 @@ mod tests { use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; - use risingwave_common::row::{OwnedRow, Row, RowExt}; + use risingwave_common::row::{OwnedRow, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::test_epoch; use risingwave_common::util::row_serde::OrderedRowSerde; @@ -662,15 +662,15 @@ mod tests { pk_indices: &[usize], ) -> (TableKey, SharedBufferValue) { let r = OwnedRow::new(vec![ - Some(ScalarImpl::Int32(0 as i32)), - Some(ScalarImpl::Int32(0 as i32)), + Some(ScalarImpl::Int32(0_i32)), + Some(ScalarImpl::Int32(0_i32)), Some(ScalarImpl::Int32(index as i32)), // watermark column - Some(ScalarImpl::Int32(0 as i32)), + Some(ScalarImpl::Int32(0_i32)), ]); let pk = r.project(pk_indices); - let k1 = serialize_pk_with_vnode(pk, &pk_serde, VirtualNode::from_index(vnode)); + let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode)); let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice( format!("{}-value-{}", vnode, index).as_bytes(), )); @@ -700,20 +700,20 @@ mod tests { ); iter.rewind().await.unwrap(); - assert_eq!(iter.is_valid(), true); + assert!(iter.is_valid()); for i in 0..10 { let (k, _v) = gen_key_value(0, i, &pk_serde, &pk_indices); assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref()); iter.next().await.unwrap(); } + assert!(!iter.is_valid()); } { // test watermark let watermark = { let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); - let watermark = serialize_pk(r1, &watermark_col_serde); - watermark + serialize_pk(r1, &watermark_col_serde) }; let read_watermark = ReadTableWatermark { @@ -752,13 +752,13 @@ mod tests { ); iter.rewind().await.unwrap(); - assert_eq!(iter.is_valid(), true); + assert!(iter.is_valid()); for i in 5..10 { let (k, _v) = gen_key_value(0, i, &pk_serde, &pk_indices); assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref()); iter.next().await.unwrap(); } - assert_eq!(iter.is_valid(), false); + assert!(!iter.is_valid()); } } } diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 72482759a8ac..a068a9ea3b85 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -12,33 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::HashSet; use std::ops::Bound::{self, *}; -use std::sync::Arc; -use bytes::BytesMut; use futures::{pin_mut, StreamExt}; -use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; -use risingwave_common::hash::VirtualNode; use risingwave_common::row::{self, OwnedRow}; use risingwave_common::types::{DataType, Scalar, ScalarImpl, Timestamptz}; use risingwave_common::util::epoch::{test_epoch, EpochPair}; -use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::BasicSerde; -use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; use risingwave_hummock_test::test_utils::prepare_hummock_test_env; -use risingwave_pb::catalog::PbTable; -use risingwave_pb::common::PbColumnOrder; -use risingwave_pb::plan_common::ColumnCatalog; -use risingwave_storage::compaction_catalog_manager::{ - CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, -}; -use risingwave_storage::hummock::iterator::SkipWatermarkIterator; use risingwave_storage::hummock::HummockStorage; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::SINGLETON_VNODE;