Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat(storage): non_pk_watermark state clean #19889

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ message TableWatermarks {

// The direction of the table watermark.
bool is_ascending = 2;

// The table watermark is non-pk prefix table watermark.
bool is_non_pk_prefix = 3;
}

message EpochNewChangeLog {
Expand Down Expand Up @@ -194,6 +197,7 @@ message HummockVersion {
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
map<uint32, StateTableInfo> state_table_info = 7;
// map<uint32, TableWatermarks> non_pk_prefix_table_watermarks = 8;
}

message HummockVersionDelta {
Expand Down
12 changes: 12 additions & 0 deletions src/common/src/util/row_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ impl OrderedRowSerde {
}
}

#[must_use]
pub fn index(&self, idx: usize) -> Cow<'_, Self> {
if 1 == self.order_types.len() {
Cow::Borrowed(self)
} else {
Cow::Owned(Self {
schema: vec![self.schema[idx].clone()],
order_types: vec![self.order_types[idx]],
})
}
}

/// Note: prefer [`Row::memcmp_serialize`] if possible.
pub fn serialize(&self, row: impl Row, append_to: impl BufMut) {
self.serialize_datums(row.iter(), append_to)
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl CompactStatus {
pub fn is_trivial_reclaim(task: &CompactTask) -> bool {
// Currently all VnodeWatermark tasks are trivial reclaim.
if task.task_type == TaskType::VnodeWatermark {
assert!(task.input_ssts.len() == 2);
assert!(task.input_ssts[1].table_infos.is_empty());
return true;
}
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fn should_delete_key_by_watermark(
let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
return false;
};
watermark.direction.filter_by_watermark(key, w)
watermark.direction.filter_by_watermark_key(key, w)
}

#[cfg(test)]
Expand Down
41 changes: 40 additions & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use itertools::Itertools;
use parking_lot::Mutex;
use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
Expand Down Expand Up @@ -728,6 +729,44 @@ impl HummockManager {
}
}

// Filter out the table that has a primary key prefix watermark.
let table_id_with_pk_prefix_watermark: HashSet<_> = self
.metadata_manager
.catalog_controller
.get_table_by_ids(
version
.latest_version()
.table_watermarks
.keys()
.map(|id| id.table_id() as _)
.collect(),
)
.await
.map_err(|e| Error::Internal(e.into()))?
.into_iter()
.filter_map(|table| {
// pk prefix watermark.
if table.watermark_indices.is_empty() || table.watermark_indices[0] == 0 {
Some(TableId::from(table.get_id()))
} else {
None
}
})
.collect();

let table_watermarks = version
.latest_version()
.table_watermarks
.iter()
.filter_map(|(table_id, table_watermarks)| {
if table_id_with_pk_prefix_watermark.contains(table_id) {
Some((*table_id, table_watermarks.clone()))
} else {
None
}
})
.collect();

while let Some(compact_task) = compact_status.get_compact_task(
version
.latest_version()
Expand All @@ -742,7 +781,7 @@ impl HummockManager {
selector,
&table_id_to_option,
developer_config.clone(),
&version.latest_version().table_watermarks,
&table_watermarks,
&version.latest_version().state_table_info,
) {
let target_level_id = compact_task.input.target_level as u32;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,13 @@ async fn build_table(
},
);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
Expand Down Expand Up @@ -186,11 +188,14 @@ async fn build_table_2(
);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);

let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();
Expand Down
3 changes: 3 additions & 0 deletions src/storage/benches/bench_merge_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{
Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
};
Expand Down Expand Up @@ -111,6 +113,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let merge_iter = RefCell::new(SkipWatermarkIterator::new(
MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)),
BTreeMap::new(),
Arc::new(CompactionCatalogAgent::dummy()),
));
c.bench_with_input(
BenchmarkId::new("bench-merge-iter-skip-empty-watermark", "unordered"),
Expand Down
11 changes: 10 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,16 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);

let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);

let builder = SstableBuilder::for_test(
id,
writer,
self.options.clone(),
table_id_to_vnode,
table_id_to_watermark_serde,
);

Ok(builder)
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::HummockEpoch;
Expand Down Expand Up @@ -101,6 +101,7 @@ fn gen_committed_table_watermarks(
})
.collect(),
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
bincode = { version = "=2.0.0-rc.3", features = ["serde"] }
bytes = "1"
hex = "0.4"
itertools = { workspace = true }
Expand Down
45 changes: 44 additions & 1 deletion src/storage/hummock_sdk/src/compact_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::mem::size_of;

use itertools::Itertools;
Expand All @@ -22,6 +22,7 @@ use risingwave_pb::hummock::{
PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats, PbValidationTask,
};

use crate::compaction_group::StateTableId;
use crate::key_range::KeyRange;
use crate::level::InputLevel;
use crate::sstable_info::SstableInfo;
Expand Down Expand Up @@ -114,6 +115,48 @@ impl CompactTask {
}
}

impl CompactTask {
// The compact task may need to reclaim key with TTL
pub fn is_contains_ttl(&self) -> bool {
self.table_options
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
}

// The compact task may need to reclaim key with range tombstone
pub fn is_contains_range_tombstone(&self) -> bool {
self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.range_tombstone_count > 0)
}

// The compact task may need to reclaim key with split sst
pub fn is_contains_split_sst(&self) -> bool {
self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.sst_id != sst.object_id)
}

pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> {
self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.flat_map(|sst| sst.table_ids.clone())
.sorted()
.unique()
}

// filter the table-id that in existing_table_ids with the table-id in compact-task
pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
let existing_table_ids: HashSet<u32> = HashSet::from_iter(self.existing_table_ids.clone());
self.get_table_ids_from_input_ssts()
.filter(|table_id| existing_table_ids.contains(table_id))
.collect()
}
}

impl From<PbCompactTask> for CompactTask {
#[expect(deprecated)]
fn from(pb_compact_task: PbCompactTask) -> Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ pub fn safe_epoch_table_watermarks_impl(
Some(TableWatermarks {
watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
direction: table_watermarks.direction,
watermark_type: table_watermarks.watermark_type,
})
} else {
None
Expand Down
Loading
Loading