Skip to content

Commit

Permalink
refactor(meta): proactively GC more stale objects. (#19474)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 21, 2024
1 parent 3fdd6a5 commit ad1d929
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 77 deletions.
2 changes: 2 additions & 0 deletions src/meta/model/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod m20240911_083152_variable_vnode_count;
mod m20241016_065621_hummock_gc_history;
mod m20241025_062548_singleton_vnode_count;
mod m20241115_085007_remove_function_type;
mod m20241120_182555_hummock_add_time_travel_sst_index;
mod utils;

pub struct Migrator;
Expand Down Expand Up @@ -88,6 +89,7 @@ impl MigratorTrait for Migrator {
Box::new(m20241016_065621_hummock_gc_history::Migration),
Box::new(m20241025_062548_singleton_vnode_count::Migration),
Box::new(m20241115_085007_remove_function_type::Migration),
Box::new(m20241120_182555_hummock_add_time_travel_sst_index::Migration),
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl MigrationTrait for Migration {
}

#[derive(DeriveIden)]
enum HummockSstableInfo {
pub(crate) enum HummockSstableInfo {
Table,
SstId,
ObjectId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use sea_orm_migration::prelude::*;

use crate::m20240701_060504_hummock_time_travel::HummockSstableInfo;

#[derive(DeriveMigrationName)]
pub struct Migration;

const IDX_HUMMOCK_SSTABLE_INFO_OBJECT_ID: &str = "idx_hummock_sstable_info_object_id";

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_index(
Index::create()
.table(HummockSstableInfo::Table)
.name(IDX_HUMMOCK_SSTABLE_INFO_OBJECT_ID)
.col(HummockSstableInfo::ObjectId)
.to_owned(),
)
.await?;
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.table(HummockSstableInfo::Table)
.name(IDX_HUMMOCK_SSTABLE_INFO_OBJECT_ID)
.to_owned(),
)
.await?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ pub async fn start_service_as_election_leader(
// sub_tasks executed concurrently. Can be shutdown via shutdown_all
sub_tasks.extend(hummock::start_hummock_workers(
hummock_manager.clone(),
// compaction_scheduler,
backup_manager.clone(),
&env.opts,
));
sub_tasks.push(start_worker_info_monitor(
Expand Down
11 changes: 10 additions & 1 deletion src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,17 @@ impl HummockManager {
.collect(),
});
}
// We can directly discard reference to stale objects that will no longer be used.
let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
let may_delete_object = stale_objects
.iter()
.filter_map(|(version_id, object_ids)| {
if *version_id >= min_pinned_version_id {
return None;
}
Some(object_ids.id.clone())
})
.flatten();
self.gc_manager.add_may_delete_object_ids(may_delete_object);
stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
let new_checkpoint = HummockVersionCheckpoint {
version: current_version.clone(),
Expand Down
102 changes: 77 additions & 25 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub(crate) struct GcManager {
store: ObjectStoreRef,
path_prefix: String,
use_new_object_prefix_strategy: bool,
/// These objects may still be used by backup or time travel.
may_delete_object_ids: parking_lot::Mutex<HashSet<HummockSstableObjectId>>,
}

impl GcManager {
Expand All @@ -60,6 +62,7 @@ impl GcManager {
store,
path_prefix: path_prefix.to_owned(),
use_new_object_prefix_strategy,
may_delete_object_ids: Default::default(),
}
}

Expand Down Expand Up @@ -101,7 +104,7 @@ impl GcManager {
prefix: Option<String>,
start_after: Option<String>,
limit: Option<u64>,
) -> Result<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
) -> Result<(HashSet<HummockSstableObjectId>, u64, u64, Option<String>)> {
tracing::debug!(
sst_retention_watermark,
prefix,
Expand Down Expand Up @@ -139,7 +142,7 @@ impl GcManager {
};
async move { result }
})
.try_collect::<Vec<HummockSstableObjectId>>()
.try_collect::<HashSet<HummockSstableObjectId>>()
.await?;
Ok((
filtered,
Expand All @@ -148,6 +151,28 @@ impl GcManager {
next_start_after,
))
}

pub fn add_may_delete_object_ids(
&self,
may_delete_object_ids: impl Iterator<Item = HummockSstableObjectId>,
) {
self.may_delete_object_ids
.lock()
.extend(may_delete_object_ids);
}

/// Takes if `least_count` elements available.
pub fn try_take_may_delete_object_ids(
&self,
least_count: usize,
) -> Option<HashSet<HummockSstableObjectId>> {
let mut guard = self.may_delete_object_ids.lock();
if guard.len() < least_count {
None
} else {
Some(std::mem::take(&mut *guard))
}
}
}

impl HummockManager {
Expand Down Expand Up @@ -295,14 +320,23 @@ impl HummockManager {
tracing::info!(total_object_count, total_object_size, "Finish GC");
self.metrics.total_object_size.set(total_object_size as _);
self.metrics.total_object_count.set(total_object_count as _);
match self.time_travel_pinned_object_count().await {
Ok(count) => {
self.metrics.time_travel_object_count.set(count as _);
}
Err(err) => {
use thiserror_ext::AsReport;
tracing::warn!(error = %err.as_report(), "Failed to count time travel objects.");
}
}
Ok(())
}

/// Given candidate SSTs to delete, filter out false positive.
/// Returns number of SSTs to delete.
pub(crate) async fn complete_gc_batch(
&self,
object_ids: Vec<HummockSstableObjectId>,
object_ids: HashSet<HummockSstableObjectId>,
backup_manager: Option<BackupManagerRef>,
) -> Result<usize> {
if object_ids.is_empty() {
Expand All @@ -324,31 +358,23 @@ impl HummockManager {
metrics
.full_gc_candidate_object_count
.observe(candidate_object_number as _);
let pinned_object_ids = self
.all_object_ids_in_time_travel()
.await?
.collect::<HashSet<_>>();
self.metrics
.time_travel_object_count
.set(pinned_object_ids.len() as _);
// filter by SST id watermark, i.e. minimum id of uncommitted SSTs reported by compute nodes.
// filter by metadata backup
let object_ids = object_ids
.into_iter()
.filter(|id| *id < min_sst_id)
.filter(|s| !pinned_by_metadata_backup.contains(s))
.collect_vec();
let after_min_sst_id = object_ids.len();
let after_metadata_backup = object_ids.len();
// filter by time travel archive
let object_ids = object_ids
.into_iter()
.filter(|s| !pinned_object_ids.contains(s))
.collect_vec();
let object_ids = self
.filter_out_objects_by_time_travel(object_ids.into_iter())
.await?;
let after_time_travel = object_ids.len();
// filter by metadata backup
// filter by SST id watermark, i.e. minimum id of uncommitted SSTs reported by compute nodes.
let object_ids = object_ids
.into_iter()
.filter(|s| !pinned_by_metadata_backup.contains(s))
.filter(|id| *id < min_sst_id)
.collect_vec();
let after_metadata_backup = object_ids.len();
let after_min_sst_id = object_ids.len();
// filter by version
let after_version = self
.finalize_objects_to_delete(object_ids.into_iter())
Expand All @@ -359,9 +385,9 @@ impl HummockManager {
.observe(after_version_count as _);
tracing::info!(
candidate_object_number,
after_min_sst_id,
after_time_travel,
after_metadata_backup,
after_time_travel,
after_min_sst_id,
after_version_count,
"complete gc batch"
);
Expand Down Expand Up @@ -501,6 +527,27 @@ impl HummockManager {
}
Ok(total)
}

/// Minor GC attempts to delete objects that were part of Hummock version but are no longer in use.
pub async fn try_start_minor_gc(&self, backup_manager: BackupManagerRef) -> Result<()> {
const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000;
let Some(object_ids) = self
.gc_manager
.try_take_may_delete_object_ids(MIN_MINOR_GC_OBJECT_COUNT)
else {
return Ok(());
};
// Objects pinned by either meta backup or time travel should be filtered out.
let backup_pinned: HashSet<_> = backup_manager.list_pinned_ssts();
let object_ids = object_ids
.into_iter()
.filter(|s| !backup_pinned.contains(s));
let object_ids = self.filter_out_objects_by_time_travel(object_ids).await?;
// Retry is not necessary. Full GC will handle these objects eventually.
self.delete_objects(object_ids.into_iter().collect())
.await?;
Ok(())
}
}

async fn collect_min_uncommitted_sst_id(
Expand Down Expand Up @@ -580,7 +627,7 @@ mod tests {

// Empty input results immediate return, without waiting heartbeat.
hummock_manager
.complete_gc_batch(vec![], None)
.complete_gc_batch(vec![].into_iter().collect(), None)
.await
.unwrap();

Expand All @@ -590,7 +637,9 @@ mod tests {
3,
hummock_manager
.complete_gc_batch(
vec![i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64],
vec![i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64]
.into_iter()
.collect(),
None,
)
.await
Expand All @@ -616,7 +665,10 @@ mod tests {
1,
hummock_manager
.complete_gc_batch(
[committed_object_ids, vec![max_committed_object_id + 1]].concat(),
[committed_object_ids, vec![max_committed_object_id + 1]]
.concat()
.into_iter()
.collect(),
None,
)
.await
Expand Down
Loading

0 comments on commit ad1d929

Please sign in to comment.