Skip to content

Commit

Permalink
fix check
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Dec 25, 2024
1 parent 3127678 commit 49a48ad
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 27 deletions.
3 changes: 2 additions & 1 deletion src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::HummockEpoch;
Expand Down Expand Up @@ -101,6 +101,7 @@ fn gen_committed_table_watermarks(
})
.collect(),
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
use risingwave_meta::hummock::test_utils::get_compaction_group_id_by_table_id;
Expand Down Expand Up @@ -2268,6 +2268,7 @@ async fn test_table_watermark() {
table_watermarks: Some((
WatermarkDirection::Ascending,
vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(watermark1))],
WatermarkSerdeType::PkPrefix,
)),
switch_op_consistency_level: None,
},
Expand Down Expand Up @@ -2595,6 +2596,7 @@ async fn test_table_watermark() {
table_watermarks: Some((
WatermarkDirection::Ascending,
vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(5))],
WatermarkSerdeType::PkPrefix,
)),
switch_op_consistency_level: None,
},
Expand Down
20 changes: 10 additions & 10 deletions src/storage/src/hummock/iterator/skip_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ mod tests {
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::row::{OwnedRow, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::epoch::test_epoch;
use risingwave_common::util::row_serde::OrderedRowSerde;
Expand Down Expand Up @@ -662,15 +662,15 @@ mod tests {
pk_indices: &[usize],
) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
let r = OwnedRow::new(vec![
Some(ScalarImpl::Int32(0 as i32)),
Some(ScalarImpl::Int32(0 as i32)),
Some(ScalarImpl::Int32(0_i32)),
Some(ScalarImpl::Int32(0_i32)),
Some(ScalarImpl::Int32(index as i32)), // watermark column
Some(ScalarImpl::Int32(0 as i32)),
Some(ScalarImpl::Int32(0_i32)),
]);

let pk = r.project(pk_indices);

let k1 = serialize_pk_with_vnode(pk, &pk_serde, VirtualNode::from_index(vnode));
let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
format!("{}-value-{}", vnode, index).as_bytes(),
));
Expand Down Expand Up @@ -700,20 +700,20 @@ mod tests {
);

iter.rewind().await.unwrap();
assert_eq!(iter.is_valid(), true);
assert!(iter.is_valid());
for i in 0..10 {
let (k, _v) = gen_key_value(0, i, &pk_serde, &pk_indices);
assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
iter.next().await.unwrap();
}
assert!(!iter.is_valid());
}

{
// test watermark
let watermark = {
let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
let watermark = serialize_pk(r1, &watermark_col_serde);
watermark
serialize_pk(r1, &watermark_col_serde)
};

let read_watermark = ReadTableWatermark {
Expand Down Expand Up @@ -752,13 +752,13 @@ mod tests {
);

iter.rewind().await.unwrap();
assert_eq!(iter.is_valid(), true);
assert!(iter.is_valid());
for i in 5..10 {
let (k, _v) = gen_key_value(0, i, &pk_serde, &pk_indices);
assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
iter.next().await.unwrap();
}
assert_eq!(iter.is_valid(), false);
assert!(!iter.is_valid());
}
}
}
16 changes: 1 addition & 15 deletions src/stream/src/common/table/test_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::HashSet;
use std::ops::Bound::{self, *};
use std::sync::Arc;

use bytes::BytesMut;
use futures::{pin_mut, StreamExt};
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{self, OwnedRow};
use risingwave_common::types::{DataType, Scalar, ScalarImpl, Timestamptz};
use risingwave_common::util::epoch::{test_epoch, EpochPair};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::ColumnCatalog;
use risingwave_storage::compaction_catalog_manager::{
CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
};
use risingwave_storage::hummock::iterator::SkipWatermarkIterator;
use risingwave_storage::hummock::HummockStorage;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::SINGLETON_VNODE;
Expand Down

0 comments on commit 49a48ad

Please sign in to comment.