diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs index 8332cc4d1d453..e113f5f695f60 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs @@ -169,14 +169,14 @@ async fn read_hummock_table_watermarks( for (vnode, epoch, watermark) in table_watermarks .watermarks - .into_iter() + .iter() .flat_map(move |(epoch, watermarks)| { - watermarks.into_iter().flat_map(move |vnode_watermark| { + watermarks.iter().flat_map(move |vnode_watermark| { let watermark = vnode_watermark.watermark().clone(); let vnodes = vnode_watermark.vnode_bitmap().iter_ones().collect_vec(); vnodes .into_iter() - .map(move |vnode| (vnode, epoch, Vec::from(watermark.as_ref()))) + .map(move |vnode| (vnode, *epoch, Vec::from(watermark.as_ref()))) }) }) { diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index fb65be5796d54..57becef3a9b6d 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -144,5 +144,9 @@ harness = false name = "bench_imm_compact" harness = false +[[bench]] +name = "bench_table_watermarks" +harness = false + [lints] workspace = true diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs new file mode 100644 index 0000000000000..11ec3c4bdcb54 --- /dev/null +++ b/src/storage/benches/bench_table_watermarks.rs @@ -0,0 +1,259 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![feature(lazy_cell)] + +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, LazyLock}; + +use bytes::Bytes; +use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; +use itertools::Itertools; +use risingwave_common::buffer::{Bitmap, BitmapBuilder}; +use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; +use risingwave_common::util::epoch::test_epoch; +use risingwave_hummock_sdk::table_watermark::{ + TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, +}; +use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::HummockEpoch; +use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; +use spin::Mutex; +use tokio::sync::mpsc::unbounded_channel; + +fn vnode_bitmaps(part_count: usize) -> impl Iterator> { + static BITMAP_CACHE: LazyLock>>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + assert_eq!(VirtualNode::COUNT % part_count, 0); + let mut cache = BITMAP_CACHE.lock(); + match cache.entry(part_count) { + Entry::Occupied(entry) => entry.get().clone().into_iter(), + Entry::Vacant(entry) => entry + .insert({ + let part_size = VirtualNode::COUNT / part_count; + (0..part_count) + .map(move |part_idx| { + let start = part_idx * part_size; + let end = part_idx * part_size + part_size; + let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + for i in start..end { + bitmap.set(i, true); + } + Arc::new(bitmap.finish()) + }) + .collect() + }) + .clone() + .into_iter(), + } +} + +fn gen_watermark(epoch_idx: usize) -> Bytes { + static WATERMARK_CACHE: LazyLock>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + let mut cache = WATERMARK_CACHE.lock(); + match cache.entry(epoch_idx) { + Entry::Occupied(entry) => entry.get().clone(), + Entry::Vacant(entry) => entry + .insert(Bytes::copy_from_slice( + format!("key_test_{:010}", epoch_idx).as_bytes(), + )) + .clone(), + } +} + +fn gen_epoch_watermarks( + epoch_idx: usize, + vnode_part_count: usize, +) -> (HummockEpoch, Vec) { + ( + test_epoch(epoch_idx as _), + vnode_bitmaps(vnode_part_count) + .map(|bitmap| VnodeWatermark::new(bitmap, gen_watermark(epoch_idx))) + .collect_vec(), + ) +} + +fn gen_committed_table_watermarks( + old_epoch_idx: usize, + new_epoch_idx: usize, + vnode_part_count: usize, +) -> TableWatermarks { + assert!(old_epoch_idx <= new_epoch_idx); + TableWatermarks { + watermarks: (old_epoch_idx..=new_epoch_idx) + .map(|epoch_idx| { + let (epoch, watermarks) = gen_epoch_watermarks(epoch_idx, vnode_part_count); + (epoch, watermarks.into()) + }) + .collect(), + direction: WatermarkDirection::Ascending, + } +} + +fn gen_version( + old_epoch_idx: usize, + new_epoch_idx: usize, + table_count: usize, + vnode_part_count: usize, +) -> HummockVersion { + let table_watermarks = Arc::new(gen_committed_table_watermarks( + old_epoch_idx, + new_epoch_idx, + vnode_part_count, + )); + // let table_watermarks = + // gen_committed_table_watermarks(old_epoch_idx, new_epoch_idx, vnode_part_count); + HummockVersion { + id: new_epoch_idx as _, + max_committed_epoch: test_epoch(new_epoch_idx as _), + safe_epoch: test_epoch(old_epoch_idx as _), + table_watermarks: (0..table_count) + .map(|table_id| (TableId::new(table_id as _), table_watermarks.clone())) + .collect(), + ..Default::default() + } +} + +fn bench_table_watermarks(c: &mut Criterion) { + let version_count = 500; + let epoch_count = 1000; + let table_count = 1; + let vnode_part_count = 16; + let pre_generated_versions: VecDeque<_> = (1..version_count + 1) + .map(|epoch_idx| { + gen_version( + epoch_idx, + epoch_idx + epoch_count, + table_count, + vnode_part_count, + ) + }) + .collect(); + c.bench_function("new pinned version", |b| { + b.iter_batched( + || pre_generated_versions.clone(), + |mut versions| { + let mut pinned_version = + PinnedVersion::new(versions.pop_front().unwrap(), unbounded_channel().0); + while let Some(version) = versions.pop_front() { + pinned_version = pinned_version.new_pin_version(version); + } + }, + BatchSize::SmallInput, + ) + }); + + let safe_epoch_idx: usize = 9500; + let committed_epoch_idx: usize = 10000; + let staging_epoch_count: usize = 500; + let vnode_part_count = 16; + + let mut table_watermarks = TableWatermarksIndex::new_committed( + gen_committed_table_watermarks(safe_epoch_idx, committed_epoch_idx, vnode_part_count) + .into(), + test_epoch(committed_epoch_idx as u64), + ); + for i in 0..staging_epoch_count { + let (epoch, watermarks) = + gen_epoch_watermarks(committed_epoch_idx + i + 1, vnode_part_count); + table_watermarks.add_epoch_watermark( + epoch, + watermarks.into(), + WatermarkDirection::Ascending, + ); + } + let table_watermarks = table_watermarks; + let committed_watermarks = gen_committed_table_watermarks( + safe_epoch_idx + 1, + committed_epoch_idx + 1, + vnode_part_count, + ); + let batch_size = 100; + c.bench_function("apply committed watermark", |b| { + b.iter_batched( + || { + (0..batch_size) + .map(|_| (table_watermarks.clone(), committed_watermarks.clone())) + .collect_vec() + }, + |list| { + for (mut table_watermarks, committed_watermarks) in list { + table_watermarks.apply_committed_watermarks( + committed_watermarks.into(), + test_epoch((committed_epoch_idx + 1) as _), + ); + } + }, + BatchSize::SmallInput, + ) + }); + + // Code for the original table watermark index + // let mut table_watermarks = + // gen_committed_table_watermarks(safe_epoch_idx, committed_epoch_idx, vnode_part_count) + // .build_index(test_epoch(committed_epoch_idx as u64)); + // for i in 0..staging_epoch_count { + // let (epoch, watermarks) = + // gen_epoch_watermarks(committed_epoch_idx + i + 1, vnode_part_count); + // table_watermarks.add_epoch_watermark(epoch, &watermarks, WatermarkDirection::Ascending); + // } + // let table_watermarks = table_watermarks; + // let committed_watermarks = gen_committed_table_watermarks( + // safe_epoch_idx + 1, + // committed_epoch_idx + 1, + // vnode_part_count, + // ) + // .build_index(test_epoch((committed_epoch_idx + 1) as _)); + // let batch_size = 100; + // c.bench_function("apply committed watermark", |b| { + // b.iter_batched( + // || { + // (0..batch_size) + // .map(|_| (table_watermarks.clone(), committed_watermarks.clone())) + // .collect_vec() + // }, + // |list| { + // for (mut table_watermarks, committed_watermarks) in list { + // table_watermarks.apply_committed_watermarks(&committed_watermarks); + // } + // }, + // BatchSize::SmallInput, + // ) + // }); + + c.bench_function("read latest watermark", |b| { + b.iter(|| { + for i in 0..VirtualNode::COUNT { + let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i)); + } + }) + }); + + c.bench_function("read committed watermark", |b| { + b.iter(|| { + for i in 0..VirtualNode::COUNT { + let _ = table_watermarks.read_watermark( + VirtualNode::from_index(i), + test_epoch(committed_epoch_idx as u64), + ); + } + }) + }); +} + +criterion_group!(benches, bench_table_watermarks); +criterion_main!(benches); diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 9e07598d07920..2a1e924d99144 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -33,7 +34,7 @@ use super::StateTableId; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::prost_key_range::KeyRangeExt; -use crate::table_watermark::{TableWatermarks, TableWatermarksIndex, VnodeWatermark}; +use crate::table_watermark::{TableWatermarks, VnodeWatermark}; use crate::version::{HummockVersion, HummockVersionDelta}; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; @@ -195,18 +196,6 @@ impl HummockVersion { .unwrap_or(0) } - pub fn build_table_watermarks_index(&self) -> HashMap { - self.table_watermarks - .iter() - .map(|(table_id, table_watermarks)| { - ( - *table_id, - table_watermarks.build_index(self.max_committed_epoch), - ) - }) - .collect() - } - pub fn safe_epoch_table_watermarks( &self, existing_table_ids: &[u32], @@ -598,25 +587,34 @@ impl HummockVersion { for table_id in &version_delta.removed_table_ids { let _ = self.table_watermarks.remove(table_id); } + + let mut modified_table_watermarks: HashMap = HashMap::new(); + for (table_id, table_watermarks) in &version_delta.new_table_watermarks { - match self.table_watermarks.entry(*table_id) { - Entry::Occupied(mut entry) => { - entry.get_mut().apply_new_table_watermarks(table_watermarks); - } - Entry::Vacant(entry) => { - entry.insert(table_watermarks.clone()); - } + if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) { + let mut current_table_watermarks = (**current_table_watermarks).clone(); + current_table_watermarks.apply_new_table_watermarks(table_watermarks); + modified_table_watermarks.insert(*table_id, current_table_watermarks); + } else { + modified_table_watermarks.insert(*table_id, table_watermarks.clone()); } } if version_delta.safe_epoch != self.safe_epoch { assert!(version_delta.safe_epoch > self.safe_epoch); - self.table_watermarks - .values_mut() - .for_each(|table_watermarks| { - table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch) - }); + for (table_id, table_watermarks) in &self.table_watermarks { + let table_watermarks = modified_table_watermarks + .entry(*table_id) + .or_insert_with(|| (**table_watermarks).clone()); + table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch); + } self.safe_epoch = version_delta.safe_epoch; } + + for (table_id, table_watermarks) in modified_table_watermarks { + self.table_watermarks + .insert(table_id, Arc::new(table_watermarks)); + } + sst_split_info } diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index ed496618f8a33..083662a5c8923 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -14,18 +14,19 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; -use std::collections::{btree_map, BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::mem::size_of; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; use bytes::Bytes; +use itertools::Itertools; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; -use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark}; +use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks}; use tracing::{debug, warn}; use crate::key::{prefix_slice_with_vnode, vnode, TableKey, TableKeyRange}; @@ -37,40 +38,12 @@ pub struct ReadTableWatermark { pub vnode_watermarks: BTreeMap, } -impl ReadTableWatermark { - pub fn merge_multiple(mut watermarks: Vec) -> Option { - fn merge_other(this: &mut ReadTableWatermark, other: ReadTableWatermark) { - assert_eq!(this.direction, other.direction); - for (vnode, watermark) in other.vnode_watermarks { - match this.vnode_watermarks.entry(vnode) { - btree_map::Entry::Vacant(entry) => { - entry.insert(watermark); - } - btree_map::Entry::Occupied(mut entry) => { - let prev_watermark = entry.get(); - let overwrite = match this.direction { - WatermarkDirection::Ascending => watermark > prev_watermark, - WatermarkDirection::Descending => watermark < prev_watermark, - }; - if overwrite { - entry.insert(watermark); - } - } - } - } - } - let mut ret = watermarks.pop()?; - while let Some(watermark) = watermarks.pop() { - merge_other(&mut ret, watermark); - } - Some(ret) - } -} - #[derive(Clone)] pub struct TableWatermarksIndex { - watermark_direction: WatermarkDirection, - index: HashMap>, + pub watermark_direction: WatermarkDirection, + // later epoch at the back + pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>, + pub committed_watermarks: Option>, latest_epoch: HummockEpoch, committed_epoch: HummockEpoch, } @@ -79,23 +52,43 @@ impl TableWatermarksIndex { pub fn new(watermark_direction: WatermarkDirection, committed_epoch: HummockEpoch) -> Self { Self { watermark_direction, - index: Default::default(), + staging_watermarks: VecDeque::new(), + committed_watermarks: None, latest_epoch: committed_epoch, committed_epoch, } } - pub fn index(&self) -> &HashMap> { - &self.index + pub fn new_committed( + committed_watermarks: Arc, + committed_epoch: HummockEpoch, + ) -> Self { + Self { + watermark_direction: committed_watermarks.direction, + staging_watermarks: VecDeque::new(), + committed_epoch, + latest_epoch: committed_epoch, + committed_watermarks: Some(committed_watermarks), + } } pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option { - self.index.get(&vnode).and_then(|epoch_watermarks| { - epoch_watermarks - .upper_bound(Included(&epoch)) - .value() - .cloned() - }) + // iterate from new epoch to old epoch + for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain( + self.committed_watermarks + .iter() + .flat_map(|watermarks| watermarks.watermarks.iter().rev()), + ) { + if *watermark_epoch > epoch { + continue; + } + for vnode_watermark in vnode_watermark_list.as_ref() { + if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) { + return Some(vnode_watermark.watermark.clone()); + } + } + } + None } pub fn latest_watermark(&self, vnode: VirtualNode) -> Option { @@ -208,45 +201,54 @@ impl TableWatermarksIndex { pub fn add_epoch_watermark( &mut self, epoch: HummockEpoch, - vnode_watermark_list: &Vec, + vnode_watermark_list: Arc<[VnodeWatermark]>, direction: WatermarkDirection, ) { assert!(epoch > self.latest_epoch); assert_eq!(self.watermark_direction, direction); self.latest_epoch = epoch; - for vnode_watermark in vnode_watermark_list { - for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() { - let epoch_watermarks = self.index.entry(vnode).or_default(); - if let Some((prev_epoch, prev_watermark)) = epoch_watermarks.last_key_value() { - assert!(*prev_epoch < epoch); - match self.watermark_direction { - WatermarkDirection::Ascending => { - assert!(vnode_watermark.watermark >= prev_watermark); - } - WatermarkDirection::Descending => { - assert!(vnode_watermark.watermark <= prev_watermark); - } - }; - }; - assert!(self - .index - .entry(vnode) - .or_default() - .insert(epoch, vnode_watermark.watermark.clone()) - .is_none()); + #[cfg(debug_assertions)] + { + let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); + for vnode_watermark in vnode_watermark_list.as_ref() { + for vnode in vnode_watermark.vnode_bitmap.iter_ones() { + assert!(!vnode_is_set.is_set(vnode)); + vnode_is_set.set(vnode, true); + let vnode = VirtualNode::from_index(vnode); + if let Some(prev_watermark) = self.latest_watermark(vnode) { + match self.watermark_direction { + WatermarkDirection::Ascending => { + assert!(vnode_watermark.watermark >= prev_watermark); + } + WatermarkDirection::Descending => { + assert!(vnode_watermark.watermark <= prev_watermark); + } + }; + } + } } } + self.staging_watermarks + .push_back((epoch, vnode_watermark_list)); } - pub fn apply_committed_watermarks(&mut self, committed_index: &TableWatermarksIndex) { - self.committed_epoch = committed_index.committed_epoch; - for (vnode, committed_epoch_watermark) in &committed_index.index { - let epoch_watermark = self.index.entry(*vnode).or_default(); - // keep only watermark higher than committed epoch - *epoch_watermark = epoch_watermark.split_off(&committed_index.committed_epoch); - for (epoch, watermark) in committed_epoch_watermark { - epoch_watermark.insert(*epoch, watermark.clone()); - } + pub fn apply_committed_watermarks( + &mut self, + committed_watermark: Arc, + committed_epoch: HummockEpoch, + ) { + assert_eq!(self.watermark_direction, committed_watermark.direction); + assert!(self.committed_epoch <= committed_epoch); + if self.committed_epoch == committed_epoch { + return; + } + self.committed_epoch = committed_epoch; + self.committed_watermarks = Some(committed_watermark); + // keep only watermark higher than committed epoch + while let Some((old_epoch, _)) = self.staging_watermarks.front() + && *old_epoch <= committed_epoch + { + let _ = self.staging_watermarks.pop_front(); } } } @@ -324,7 +326,7 @@ impl VnodeWatermark { #[derive(Clone, Debug, PartialEq)] pub struct TableWatermarks { // later epoch at the back - pub watermarks: Vec<(HummockEpoch, Vec)>, + pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>, pub direction: WatermarkDirection, } @@ -336,7 +338,7 @@ impl TableWatermarks { ) -> Self { Self { direction, - watermarks: vec![(epoch, watermarks)], + watermarks: vec![(epoch, Arc::from(watermarks))], } } @@ -375,7 +377,7 @@ impl TableWatermarks { pub fn add_new_epoch_watermarks( &mut self, epoch: HummockEpoch, - watermarks: Vec, + watermarks: Arc<[VnodeWatermark]>, direction: WatermarkDirection, ) { assert_eq!(self.direction, direction); @@ -396,8 +398,8 @@ impl TableWatermarks { .watermarks .iter() .map(VnodeWatermark::from_protobuf) - .collect(); - (epoch, watermarks) + .collect_vec(); + (epoch, Arc::from(watermarks)) }) .collect(), direction: if pb.is_ascending { @@ -407,20 +409,6 @@ impl TableWatermarks { }, } } - - pub fn build_index(&self, committed_epoch: HummockEpoch) -> TableWatermarksIndex { - let mut ret = TableWatermarksIndex { - index: HashMap::new(), - watermark_direction: self.direction, - latest_epoch: HummockEpoch::MIN, - committed_epoch: HummockEpoch::MIN, - }; - for (epoch, vnode_watermark_list) in &self.watermarks { - ret.add_epoch_watermark(*epoch, vnode_watermark_list, self.direction); - } - ret.committed_epoch = committed_epoch; - ret - } } pub fn merge_multiple_new_table_watermarks( @@ -446,7 +434,7 @@ pub fn merge_multiple_new_table_watermarks( epoch_watermarks .entry(new_epoch) .or_insert_with(Vec::new) - .extend(new_epoch_watermarks); + .extend(new_epoch_watermarks.iter().cloned()); } } } @@ -457,7 +445,10 @@ pub fn merge_multiple_new_table_watermarks( TableWatermarks { direction, // ordered from earlier epoch to later epoch - watermarks: epoch_watermarks.into_iter().collect(), + watermarks: epoch_watermarks + .into_iter() + .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks))) + .collect(), }, ) }) @@ -508,7 +499,7 @@ impl TableWatermarks { while let Some((epoch, _)) = self.watermarks.last() { if *epoch >= safe_epoch { let (epoch, watermarks) = self.watermarks.pop().expect("have check Some"); - for watermark in &watermarks { + for watermark in watermarks.as_ref() { for vnode in watermark.vnode_bitmap.iter_vnodes() { unset_vnode.remove(&vnode); } @@ -522,7 +513,7 @@ impl TableWatermarks { && let Some((_, watermarks)) = self.watermarks.pop() { let mut new_vnode_watermarks = Vec::new(); - for vnode_watermark in watermarks { + for vnode_watermark in watermarks.as_ref() { let mut set_vnode = Vec::new(); for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() { if unset_vnode.remove(&vnode) { @@ -537,7 +528,7 @@ impl TableWatermarks { let bitmap = Arc::new(builder.finish()); new_vnode_watermarks.push(VnodeWatermark { vnode_bitmap: bitmap, - watermark: vnode_watermark.watermark, + watermark: vnode_watermark.watermark.clone(), }) } } @@ -545,9 +536,15 @@ impl TableWatermarks { if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut() && *last_epoch == safe_epoch { - last_watermarks.extend(new_vnode_watermarks); + *last_watermarks = Arc::from( + last_watermarks + .iter() + .cloned() + .chain(new_vnode_watermarks.into_iter()) + .collect_vec(), + ); } else { - result_epoch_watermark.push((safe_epoch, new_vnode_watermarks)); + result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks))); } } } @@ -577,6 +574,7 @@ mod tests { use std::vec; use bytes::Bytes; + use itertools::Itertools; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; @@ -619,7 +617,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 1, 2, 3]), watermark2.clone(), - )], + )] + .into(), direction, ); @@ -639,7 +638,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), - )], + )] + .into(), direction, ); let epoch4 = epoch3.next_epoch(); @@ -649,7 +649,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 3, 4]), watermark4.clone(), - )], + )] + .into(), direction, ); second_table_watermark.add_new_epoch_watermarks( @@ -657,7 +658,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 3, 4]), watermark4.clone(), - )], + )] + .into(), direction, ); @@ -687,7 +689,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 1, 2, 3]), watermark2.clone(), - )], + )] + .into(), direction, ); let epoch3 = epoch2.next_epoch(); @@ -696,7 +699,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), - )], + )] + .into(), direction, ); let epoch4 = epoch3.next_epoch(); @@ -706,7 +710,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 3, 4]), watermark4.clone(), - )], + )] + .into(), direction, ); @@ -725,6 +730,7 @@ mod tests { build_bitmap(vec![0, 1, 2, 3]), watermark2.clone(), )] + .into() ), ( epoch3, @@ -732,6 +738,7 @@ mod tests { build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), )] + .into() ), ( epoch5, @@ -739,6 +746,7 @@ mod tests { build_bitmap(vec![0, 3, 4]), watermark4.clone(), )] + .into() ) ], direction, @@ -756,6 +764,7 @@ mod tests { build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), )] + .into() ), ( epoch5, @@ -763,6 +772,7 @@ mod tests { build_bitmap(vec![0, 3, 4]), watermark4.clone(), )] + .into() ) ], direction, @@ -780,6 +790,7 @@ mod tests { build_bitmap((1..3).chain(5..VirtualNode::COUNT)), watermark3.clone() )] + .into() ), ( epoch5, @@ -787,6 +798,7 @@ mod tests { build_bitmap(vec![0, 3, 4]), watermark4.clone(), )] + .into() ) ], direction, @@ -806,6 +818,7 @@ mod tests { watermark3.clone() ) ] + .into() )], direction, } @@ -814,7 +827,7 @@ mod tests { #[test] fn test_merge_multiple_new_table_watermarks() { - fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Vec) { + fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) { ( epoch, bitmaps @@ -823,7 +836,8 @@ mod tests { watermark: Bytes::from(vec![1, 2, epoch as _]), vnode_bitmap: Arc::new(bitmap.clone()), }) - .collect(), + .collect_vec() + .into(), ) } fn build_table_watermark( @@ -896,12 +910,12 @@ mod tests { let mut index = TableWatermarksIndex::new(direction, COMMITTED_EPOCH); index.add_epoch_watermark( EPOCH1, - &vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())], + vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(), direction, ); index.add_epoch_watermark( EPOCH2, - &vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())], + vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(), direction, ); @@ -1036,28 +1050,30 @@ mod tests { vec![VnodeWatermark { watermark: watermark1.clone(), vnode_bitmap: build_bitmap(0..VirtualNode::COUNT), - }], + }] + .into(), )], direction: WatermarkDirection::Ascending, - }, + } + .into(), + ); + index.apply_committed_watermarks( + version + .table_watermarks + .get(&test_table_id) + .unwrap() + .clone(), + EPOCH1, ); - let committed_index = version - .build_table_watermarks_index() - .remove(&test_table_id) - .unwrap(); - index.apply_committed_watermarks(&committed_index); assert_eq!(EPOCH1, index.committed_epoch); assert_eq!(EPOCH2, index.latest_epoch); for vnode in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(vnode); - let epoch_watermark = index.index.get(&vnode).unwrap(); if (1..5).contains(&vnode.to_index()) { - assert_eq!(2, epoch_watermark.len()); - assert_eq!(&watermark1, epoch_watermark.get(&EPOCH1).unwrap()); - assert_eq!(&watermark2, epoch_watermark.get(&EPOCH2).unwrap()); + assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); + assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap()); } else { - assert_eq!(1, epoch_watermark.len()); - assert_eq!(&watermark1, epoch_watermark.get(&EPOCH1).unwrap()); + assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); } } } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 0c6680d7cee86..6f227b9889363 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::mem::size_of; +use std::sync::Arc; use prost::Message; use risingwave_common::catalog::TableId; @@ -30,7 +31,7 @@ pub struct HummockVersion { pub levels: HashMap, pub max_committed_epoch: u64, pub safe_epoch: u64, - pub table_watermarks: HashMap, + pub table_watermarks: HashMap>, } impl Default for HummockVersion { @@ -68,7 +69,7 @@ impl HummockVersion { .map(|(table_id, table_watermark)| { ( TableId::new(*table_id), - TableWatermarks::from_protobuf(table_watermark), + Arc::new(TableWatermarks::from_protobuf(table_watermark)), ) }) .collect(), diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index e7b03fa61c504..940eeeaabbc29 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -27,7 +27,9 @@ use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + TableWatermarksIndex, VnodeWatermark, WatermarkDirection, +}; use risingwave_hummock_sdk::EpochWithGap; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; @@ -2184,21 +2186,23 @@ async fn test_table_watermark() { let (local1, local2) = test_after_epoch2(local1, local2).await; let check_version_table_watermark = |version: PinnedVersion| { - let table_watermarks = version.table_watermark_index().get(&TEST_TABLE_ID).unwrap(); + let table_watermarks = TableWatermarksIndex::new_committed( + version + .version() + .table_watermarks + .get(&TEST_TABLE_ID) + .unwrap() + .clone(), + version.max_committed_epoch(), + ); assert_eq!(WatermarkDirection::Ascending, table_watermarks.direction()); - let index = table_watermarks.index(); - assert_eq!(2, index.len()); - let vnode1_watermark = index.get(&vnode1).unwrap(); - assert_eq!(1, vnode1_watermark.len()); assert_eq!( - &gen_inner_key(watermark1), - vnode1_watermark.get(&epoch1).unwrap() + gen_inner_key(watermark1), + table_watermarks.read_watermark(vnode1, epoch1).unwrap() ); - let vnode2_watermark = index.get(&vnode2).unwrap(); - assert_eq!(1, vnode2_watermark.len()); assert_eq!( - &gen_inner_key(watermark1), - vnode2_watermark.get(&epoch1).unwrap() + gen_inner_key(watermark1), + table_watermarks.read_watermark(vnode2, epoch1).unwrap() ); }; diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index d00f64c42cee3..9e23af91781a5 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -550,9 +550,11 @@ impl SealedData { for (table_id, (direction, watermarks, _)) in unseal_epoch_data.table_watermarks { match self.table_watermarks.entry(table_id) { Entry::Occupied(mut entry) => { - entry - .get_mut() - .add_new_epoch_watermarks(epoch, watermarks, direction); + entry.get_mut().add_new_epoch_watermarks( + epoch, + Arc::from(watermarks), + direction, + ); } Entry::Vacant(entry) => { entry.insert(TableWatermarks::single_epoch(epoch, watermarks, direction)); diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 46ef8edc442b3..da9569e6bb83c 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -19,7 +19,6 @@ use std::time::{Duration, Instant}; use auto_enums::auto_enum; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID}; use risingwave_pb::hummock::hummock_version::Levels; @@ -77,7 +76,6 @@ impl Drop for PinnedVersionGuard { pub struct PinnedVersion { version: Arc, compaction_group_index: Arc>, - table_watermark_index: Arc>, guard: Arc, } @@ -88,12 +86,10 @@ impl PinnedVersion { ) -> Self { let version_id = version.id; let compaction_group_index = version.build_compaction_group_info(); - let table_watermark_index = version.build_table_watermarks_index(); PinnedVersion { version: Arc::new(version), compaction_group_index: Arc::new(compaction_group_index), - table_watermark_index: Arc::new(table_watermark_index), guard: Arc::new(PinnedVersionGuard::new( version_id, pinned_version_manager_tx, @@ -105,11 +101,7 @@ impl PinnedVersion { self.compaction_group_index.clone() } - pub fn table_watermark_index(&self) -> &Arc> { - &self.table_watermark_index - } - - pub(crate) fn new_pin_version(&self, version: HummockVersion) -> Self { + pub fn new_pin_version(&self, version: HummockVersion) -> Self { assert!( version.id >= self.version.id, "pinning a older version {}. Current is {}", @@ -118,12 +110,10 @@ impl PinnedVersion { ); let version_id = version.id; let compaction_group_index = version.build_compaction_group_info(); - let table_watermark_index = version.build_table_watermarks_index(); PinnedVersion { version: Arc::new(version), compaction_group_index: Arc::new(compaction_group_index), - table_watermark_index: Arc::new(table_watermark_index), guard: Arc::new(PinnedVersionGuard::new( version_id, self.guard.pinned_version_manager_tx.clone(), diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index f85d78479474a..7a9545dcf1736 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -28,6 +28,7 @@ use risingwave_common_service::observer_manager::{NotificationClient, ObserverMa use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, }; +use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; @@ -123,8 +124,9 @@ pub fn get_committed_read_version_tuple( mut key_range: TableKeyRange, epoch: HummockEpoch, ) -> (TableKeyRange, ReadVersionTuple) { - if let Some(index) = version.table_watermark_index().get(&table_id) { - index.rewrite_range_with_table_watermark(epoch, &mut key_range) + if let Some(table_watermarks) = version.version().table_watermarks.get(&table_id) { + TableWatermarksIndex::new_committed(table_watermarks.clone(), version.max_committed_epoch()) + .rewrite_range_with_table_watermark(epoch, &mut key_range) } (key_range, (vec![], vec![], version)) } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 892671f9364de..6041f002680ed 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -233,9 +233,15 @@ impl HummockReadVersion { Self { table_id, table_watermarks: committed_version - .table_watermark_index() + .version() + .table_watermarks .get(&table_id) - .cloned(), + .map(|table_watermarks| { + TableWatermarksIndex::new_committed( + table_watermarks.clone(), + committed_version.max_committed_epoch(), + ) + }), staging: StagingVersion { imm: VecDeque::default(), sst: VecDeque::default(), @@ -380,13 +386,22 @@ impl HummockReadVersion { })); } - if let Some(committed_watermarks) = - self.committed.table_watermark_index().get(&self.table_id) + if let Some(committed_watermarks) = self + .committed + .version() + .table_watermarks + .get(&self.table_id) { if let Some(watermark_index) = &mut self.table_watermarks { - watermark_index.apply_committed_watermarks(committed_watermarks); + watermark_index.apply_committed_watermarks( + committed_watermarks.clone(), + self.committed.max_committed_epoch(), + ); } else { - self.table_watermarks = Some(committed_watermarks.clone()); + self.table_watermarks = Some(TableWatermarksIndex::new_committed( + committed_watermarks.clone(), + self.committed.max_committed_epoch(), + )); } } } @@ -399,7 +414,7 @@ impl HummockReadVersion { .get_or_insert_with(|| { TableWatermarksIndex::new(direction, self.committed.max_committed_epoch()) }) - .add_epoch_watermark(epoch, &vnode_watermarks, direction), + .add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction), } }