From e29fdecda3548834beb1de8b7c8b54c3835d8e8c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 1 Apr 2024 13:42:18 +0800 Subject: [PATCH 1/5] bump await tree to 0.2 Signed-off-by: Bugen Zhao --- Cargo.lock | 76 ++++++++++++++----- Cargo.toml | 2 +- .../src/rpc/service/monitor_service.rs | 18 ++--- src/compute/src/server.rs | 16 +--- src/storage/compactor/src/rpc.rs | 6 +- .../src/hummock/compactor/compactor_runner.rs | 1 - src/storage/src/hummock/compactor/context.rs | 5 +- .../compactor/shared_buffer_compact.rs | 2 +- .../event_handler/hummock_event_handler.rs | 2 +- .../src/hummock/event_handler/uploader.rs | 2 +- .../src/hummock/store/hummock_storage.rs | 5 +- src/stream/src/task/barrier_manager.rs | 8 +- .../src/task/barrier_manager/managed_state.rs | 14 ++-- src/stream/src/task/stream_manager.rs | 52 +++---------- 14 files changed, 96 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9decb29b1006e..c6827f3aad96e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,7 +234,7 @@ name = "apache-avro-derive" version = "0.17.0" source = "git+https://github.com/icelake-io/avro.git?branch=icelake-dev#4b828e9283e7248fd3ca42f5b590c2160b201785" dependencies = [ - "darling 0.20.3", + "darling 0.20.8", "proc-macro2", "quote", "serde_json", @@ -960,19 +960,20 @@ dependencies = [ [[package]] name = "await-tree" -version = "0.1.1" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "325bcfc4b87d4aa36f1319b806bacc40fcefcaf43a12bd85a5a2f44fc14ce9de" +checksum = "0c2d7aec54383fa38ac2f9c28435a02f7312f7174e470c7d5566d2b7e17f9a8d" dependencies = [ "coarsetime", - "derive_builder", + "derive_builder 0.20.0", "flexstr", "indextree", - "itertools 0.10.5", + "itertools 0.12.0", "parking_lot 0.12.1", "pin-project", "tokio", "tracing", + "weak-table", ] [[package]] @@ -2866,12 +2867,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.3" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" dependencies = [ - "darling_core 0.20.3", - "darling_macro 0.20.3", + "darling_core 0.20.8", + "darling_macro 0.20.8", ] [[package]] @@ -2904,9 +2905,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" dependencies = [ "fnv", "ident_case", @@ -2940,11 +2941,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ - "darling_core 0.20.3", + "darling_core 0.20.8", "quote", "syn 2.0.48", ] @@ -3354,7 +3355,16 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" dependencies = [ - "derive_builder_macro", + "derive_builder_macro 0.12.0", +] + +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro 0.20.0", ] [[package]] @@ -3369,16 +3379,38 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling 0.20.8", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "derive_builder_macro" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" dependencies = [ - "derive_builder_core", + "derive_builder_core 0.12.0", "syn 1.0.109", ] +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core 0.20.0", + "syn 2.0.48", +] + [[package]] name = "derive_utils" version = "0.13.2" @@ -5157,7 +5189,7 @@ dependencies = [ "bytes", "chrono", "csv", - "derive_builder", + "derive_builder 0.12.0", "enum-display", "faster-hex", "futures", @@ -6327,7 +6359,7 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c60492b5eb751e55b42d716b6b26dceb66767996cd7a5560a842fbf613ca2e92" dependencies = [ - "darling 0.20.3", + "darling 0.20.8", "heck 0.4.1", "num-bigint", "proc-macro-crate 3.1.0", @@ -11337,7 +11369,7 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" dependencies = [ - "darling 0.20.3", + "darling 0.20.8", "proc-macro2", "quote", "syn 2.0.48", @@ -13800,6 +13832,12 @@ dependencies = [ "wast 201.0.0", ] +[[package]] +name = "weak-table" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "323f4da9523e9a669e1eaf9c6e763892769b1d38c623913647bfdc1532fe4549" + [[package]] name = "web-sys" version = "0.3.64" diff --git a/Cargo.toml b/Cargo.toml index 616f17bffc8f9..533af67edff65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] foyer = "0.6" -await-tree = "0.1.1" +await-tree = "0.2.1" aws-config = { version = "1", default-features = false, features = [ "behavior-version-latest", "rt-tokio", diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 95755ada1d862..705cf82331e6e 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -79,10 +79,9 @@ impl MonitorService for MonitorServiceImpl { .collect(); let rpc_traces = if let Some(m) = &self.grpc_await_tree_reg { - m.lock() - .await - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) + m.collect::() // TODO + .into_iter() + .map(|(k, v)| (k, v.to_string())) .collect() } else { Default::default() @@ -92,9 +91,9 @@ impl MonitorService for MonitorServiceImpl { self.stream_mgr.env.state_store().as_hummock() && let Some(m) = hummock.compaction_await_tree_reg() { - m.read() - .iter() - .map(|(k, v)| (k.clone(), v.to_string())) + m.collect::() + .into_iter() + .map(|(k, v)| (k, v.to_string())) .collect() } else { Default::default() @@ -296,12 +295,11 @@ pub mod grpc_middleware { use either::Either; use futures::Future; use hyper::Body; - use tokio::sync::Mutex; use tonic::transport::NamedService; use tower::{Layer, Service}; /// Manages the await-trees of `gRPC` requests that are currently served by the compute node. - pub type AwaitTreeRegistryRef = Arc>>; + pub type AwaitTreeRegistryRef = await_tree::Registry; #[derive(Clone)] pub struct AwaitTreeMiddlewareLayer { @@ -372,7 +370,7 @@ pub mod grpc_middleware { }; Either::Right(async move { - let root = registry.lock().await.register(key, req.uri().path()); + let root = registry.register(key, req.uri().path()); root.instrument(inner.call(req)).await }) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index f9cd24162f561..b8c556e877bb3 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -75,9 +75,7 @@ use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS; use crate::rpc::service::exchange_service::ExchangeServiceImpl; use crate::rpc::service::health_service::HealthServiceImpl; -use crate::rpc::service::monitor_service::{ - AwaitTreeMiddlewareLayer, AwaitTreeRegistryRef, MonitorServiceImpl, -}; +use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl}; use crate::rpc::service::stream_service::StreamServiceImpl; use crate::telemetry::ComputeTelemetryCreator; use crate::ComputeNodeOpts; @@ -371,17 +369,7 @@ pub async fn compute_node_serve( memory_mgr.get_watermark_epoch(), ); - let grpc_await_tree_reg = await_tree_config - .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into())); - - // Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if - // this is not the case, we can use the following command to get it printed into the logs - // periodically. - // - // Comment out the following line to enable. - // TODO: may optionally enable based on the features - #[cfg(any())] - stream_mgr.clone().spawn_print_trace(); + let grpc_await_tree_reg = await_tree_config.map(|config| await_tree::Registry::new(config)); // Boot the runtime gRPC services. let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env); diff --git a/src/storage/compactor/src/rpc.rs b/src/storage/compactor/src/rpc.rs index a748d96190a17..974b5116cf721 100644 --- a/src/storage/compactor/src/rpc.rs +++ b/src/storage/compactor/src/rpc.rs @@ -85,9 +85,9 @@ impl MonitorService for MonitorServiceImpl { let compaction_task_traces = match &self.await_tree_reg { None => Default::default(), Some(await_tree_reg) => await_tree_reg - .read() - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) + .collect::() + .into_iter() + .map(|(k, v)| (k, v.to_string())) .collect(), }; Ok(Response::new(StackTraceResponse { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index a229ef6e15f93..866dd8e6b56d5 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -628,7 +628,6 @@ pub async fn compact( let traced = match context.await_tree_reg.as_ref() { None => runner.right_future(), Some(await_tree_reg) => await_tree_reg - .write() .register( format!("compact_runner/{}-{}", compact_task.task_id, split_index), format!( diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index fb748b67e8a55..9211ca0017d7f 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use more_asserts::assert_ge; -use parking_lot::RwLock; use super::task_progress::TaskProgressManagerRef; use crate::hummock::compactor::CompactionExecutor; @@ -25,10 +24,10 @@ use crate::hummock::MemoryLimiter; use crate::monitor::CompactorMetrics; use crate::opts::StorageOpts; -pub type CompactionAwaitTreeRegRef = Arc>>; +pub type CompactionAwaitTreeRegRef = await_tree::Registry; pub fn new_compaction_await_tree_reg_ref(config: await_tree::Config) -> CompactionAwaitTreeRegRef { - Arc::new(RwLock::new(await_tree::Registry::new(config))) + await_tree::Registry::new(config) } /// A `CompactorContext` describes the context of a compactor. diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index a94c66f56f4a0..7d2d00ac29ffd 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -186,7 +186,7 @@ async fn compact_shared_buffer( LazyLock::new(|| AtomicUsize::new(0)); let tree_root = context.await_tree_reg.as_ref().map(|reg| { let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed); - reg.write().register( + reg.register( format!("compact_shared_buffer/{}", id), format!( "Compact Shared Buffer: {:?}", diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index cbbc3f656e8e0..d5083470ce319 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -245,7 +245,7 @@ impl HummockEventHandler { LazyLock::new(|| AtomicUsize::new(0)); let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| { let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed); - reg.write().register( + reg.register( format!("spawn_upload_task/{}", upload_task_id), format!("Spawn Upload Task: {}", task_info), ) diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index d00f64c42cee3..073dfc0a8749c 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -89,7 +89,7 @@ pub(crate) fn default_spawn_merging_task( LazyLock::new(|| AtomicUsize::new(0)); let tree_root = await_tree_reg.as_ref().map(|reg| { let merging_task_id = NEXT_MERGING_TASK_ID.fetch_add(1, Relaxed); - reg.write().register( + reg.register( format!("merging_task/{}", merging_task_id), format!( "Merging Imm {:?} {:?} {:?}", diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index cef5cca728a17..c59d2c0540e54 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -21,7 +21,6 @@ use arc_swap::ArcSwap; use bytes::Bytes; use itertools::Itertools; use more_asserts::assert_gt; -use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; use risingwave_common_service::observer_manager::{NotificationClient, ObserverManager}; @@ -451,8 +450,8 @@ impl HummockStorage { self.backup_reader.clone() } - pub fn compaction_await_tree_reg(&self) -> Option<&RwLock>> { - self.compact_await_tree_reg.as_ref().map(AsRef::as_ref) + pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> { + self.compact_await_tree_reg.as_ref() } } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 65fd141ac376c..170e105776f48 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -287,7 +287,7 @@ pub(crate) struct StreamActorManager { pub(super) watermark_epoch: AtomicU64Ref, /// Manages the await-trees of all actors. - pub(super) await_tree_reg: Option>>>, + pub(super) await_tree_reg: Option, /// Runtime for the streaming actors. pub(super) runtime: BackgroundShutdownRuntime, @@ -324,7 +324,7 @@ pub(super) struct LocalBarrierWorker { impl LocalBarrierWorker { pub(super) fn new( actor_manager: Arc, - barrier_await_tree_reg: Option>>>, + barrier_await_tree_reg: Option, ) -> Self { let (event_tx, event_rx) = unbounded_channel(); let (failure_tx, failure_rx) = unbounded_channel(); @@ -744,8 +744,8 @@ impl LocalBarrierWorker { pub fn spawn( env: StreamEnvironment, streaming_metrics: Arc, - await_tree_reg: Option>>>, - barrier_await_tree_reg: Option>>>, + await_tree_reg: Option, + barrier_await_tree_reg: Option, watermark_epoch: AtomicU64Ref, actor_op_rx: UnboundedReceiver, ) -> JoinHandle<()> { diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 9a17afc08a039..3a8578e28db5f 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -23,7 +23,6 @@ use anyhow::anyhow; use await_tree::InstrumentAwait; use futures::stream::FuturesOrdered; use futures::{FutureExt, StreamExt}; -use parking_lot::Mutex; use prometheus::HistogramTimer; use risingwave_common::must_match; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -142,7 +141,7 @@ pub(super) struct ManagedBarrierState { await_epoch_completed_futures: FuturesOrdered, /// Manages the await-trees of all barriers. - barrier_await_tree_reg: Option>>>, + barrier_await_tree_reg: Option, } impl ManagedBarrierState { @@ -159,7 +158,7 @@ impl ManagedBarrierState { pub(super) fn new( state_store: StateStoreImpl, streaming_metrics: Arc, - barrier_await_tree_reg: Option>>>, + barrier_await_tree_reg: Option, ) -> Self { Self { epoch_barrier_state_map: BTreeMap::default(), @@ -171,11 +170,9 @@ impl ManagedBarrierState { } } - pub(super) fn reset_and_take_barrier_await_tree_reg( - &mut self, - ) -> Option>>> { + pub(super) fn reset_and_take_barrier_await_tree_reg(&mut self) -> Option { if let Some(reg) = &self.barrier_await_tree_reg { - reg.lock().clear(); + reg.clear(); } self.barrier_await_tree_reg.take() } @@ -300,8 +297,7 @@ impl ManagedBarrierState { }, ); if let Some(reg) = &self.barrier_await_tree_reg { - reg.lock() - .register(prev_epoch, format!("SyncEpoch({})", prev_epoch)) + reg.register(prev_epoch, format!("SyncEpoch({})", prev_epoch)) .instrument(future) .left_future() } else { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 7d6d584c736e2..94cdfc78a537c 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -13,9 +13,7 @@ // limitations under the License. use core::time::Duration; -use std::collections::HashMap; use std::fmt::Debug; -use std::io::Write; use std::mem::take; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -25,7 +23,6 @@ use async_recursion::async_recursion; use futures::stream::BoxStream; use futures::FutureExt; use itertools::Itertools; -use parking_lot::Mutex; use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; @@ -76,8 +73,8 @@ pub type AtomicU64Ref = Arc; /// `LocalStreamManager` manages all stream executors in this project. #[derive(Clone)] pub struct LocalStreamManager { - await_tree_reg: Option>>>, - barrier_await_tree_reg: Option>>>, + await_tree_reg: Option, + barrier_await_tree_reg: Option, pub env: StreamEnvironment, @@ -165,9 +162,9 @@ impl LocalStreamManager { ) -> Self { let await_tree_reg = await_tree_config .clone() - .map(|config| Arc::new(Mutex::new(await_tree::Registry::new(config)))); + .map(|config| await_tree::Registry::new(config)); let barrier_await_tree_reg = - await_tree_config.map(|config| Arc::new(Mutex::new(await_tree::Registry::new(config)))); + await_tree_config.map(|config| await_tree::Registry::new(config)); let (actor_op_tx, actor_op_rx) = unbounded_channel(); @@ -187,48 +184,18 @@ impl LocalStreamManager { } } - /// Print the traces of all actors periodically, used for debugging only. - pub fn spawn_print_trace(self: Arc) -> JoinHandle { - tokio::spawn(async move { - loop { - tokio::time::sleep(std::time::Duration::from_millis(5000)).await; - let mut o = std::io::stdout().lock(); - - for (k, trace) in self - .await_tree_reg - .as_ref() - .expect("async stack trace not enabled") - .lock() - .iter() - { - writeln!(o, ">> Actor {}\n\n{}", k, trace).ok(); - } - - for (e, trace) in self - .barrier_await_tree_reg - .as_ref() - .expect("async stack trace not enabled") - .lock() - .iter() - { - writeln!(o, ">> Barrier {}\n\n{}", e, trace).ok(); - } - } - }) - } - /// Get await-tree contexts for all actors. - pub fn get_actor_traces(&self) -> HashMap { + pub fn get_actor_traces(&self) -> Vec<(ActorId, await_tree::Tree)> { match &self.await_tree_reg.as_ref() { - Some(mgr) => mgr.lock().iter().map(|(k, v)| (*k, v)).collect(), + Some(reg) => reg.collect::(), None => Default::default(), } } /// Get await-tree contexts for all barrier. - pub fn get_barrier_traces(&self) -> HashMap { + pub fn get_barrier_traces(&self) -> Vec<(u64, await_tree::Tree)> { match &self.barrier_await_tree_reg.as_ref() { - Some(mgr) => mgr.lock().iter().map(|(k, v)| (*k, v)).collect(), + Some(reg) => reg.collect::(), None => Default::default(), } } @@ -328,7 +295,7 @@ impl LocalBarrierWorker { } self.actor_manager_state.clear_state(); if let Some(m) = self.actor_manager.await_tree_reg.as_ref() { - m.lock().clear(); + m.clear(); } dispatch_state_store!(&self.actor_manager.env.state_store(), store, { store.clear_shared_buffer(prev_epoch).await; @@ -641,7 +608,6 @@ impl LocalBarrierWorker { }); let traced = match &self.actor_manager.await_tree_reg { Some(m) => m - .lock() .register(actor_id, trace_span) .instrument(actor) .left_future(), From 667bade0454237f86f487f554186e56f16671db2 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 1 Apr 2024 14:11:25 +0800 Subject: [PATCH 2/5] simplify stream service with keyed registration Signed-off-by: Bugen Zhao --- .../src/rpc/service/monitor_service.rs | 52 +++++++++++-------- src/compute/src/rpc/service/stream_service.rs | 4 +- src/compute/src/server.rs | 11 ++-- src/stream/src/task/barrier_manager.rs | 16 ++---- .../src/task/barrier_manager/managed_state.rs | 18 +++---- src/stream/src/task/stream_manager.rs | 36 ++++++------- 6 files changed, 61 insertions(+), 76 deletions(-) diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 705cf82331e6e..9c2bad2ea9e50 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -30,6 +30,7 @@ use risingwave_pb::monitor_service::{ }; use risingwave_rpc_client::error::ToTonicStatus; use risingwave_stream::executor::monitor::global_streaming_metrics; +use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait}; use risingwave_stream::task::LocalStreamManager; use thiserror_ext::AsReport; use tonic::{Code, Request, Response, Status}; @@ -37,19 +38,13 @@ use tonic::{Code, Request, Response, Status}; #[derive(Clone)] pub struct MonitorServiceImpl { stream_mgr: LocalStreamManager, - grpc_await_tree_reg: Option, server_config: ServerConfig, } impl MonitorServiceImpl { - pub fn new( - stream_mgr: LocalStreamManager, - grpc_await_tree_reg: Option, - server_config: ServerConfig, - ) -> Self { + pub fn new(stream_mgr: LocalStreamManager, server_config: ServerConfig) -> Self { Self { stream_mgr, - grpc_await_tree_reg, server_config, } } @@ -64,24 +59,28 @@ impl MonitorService for MonitorServiceImpl { ) -> Result, Status> { let _req = request.into_inner(); - let actor_traces = self - .stream_mgr - .get_actor_traces() - .into_iter() - .map(|(k, v)| (k, v.to_string())) - .collect(); + let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() { + reg.collect::() + .into_iter() + .map(|(k, v)| (k.0, v.to_string())) + .collect() + } else { + Default::default() + }; - let barrier_traces = self - .stream_mgr - .get_barrier_traces() - .into_iter() - .map(|(k, v)| (k, v.to_string())) - .collect(); + let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() { + reg.collect::() + .into_iter() + .map(|(k, v)| (k.prev_epoch, v.to_string())) + .collect() + } else { + Default::default() + }; - let rpc_traces = if let Some(m) = &self.grpc_await_tree_reg { - m.collect::() // TODO + let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() { + reg.collect::() .into_iter() - .map(|(k, v)| (k, v.to_string())) + .map(|(k, v)| (k.desc, v.to_string())) .collect() } else { Default::default() @@ -301,6 +300,12 @@ pub mod grpc_middleware { /// Manages the await-trees of `gRPC` requests that are currently served by the compute node. pub type AwaitTreeRegistryRef = await_tree::Registry; + /// Await-tree key type for gRPC calls. + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + pub struct GrpcCall { + pub desc: String, + } + #[derive(Clone)] pub struct AwaitTreeMiddlewareLayer { registry: Option, @@ -363,11 +368,12 @@ pub mod grpc_middleware { let mut inner = std::mem::replace(&mut self.inner, clone); let id = self.next_id.fetch_add(1, Ordering::SeqCst); - let key = if let Some(authority) = req.uri().authority() { + let desc = if let Some(authority) = req.uri().authority() { format!("{authority} - {id}") } else { format!("?? - {id}") }; + let key = GrpcCall { desc }; Either::Right(async move { let root = registry.register(key, req.uri().path()); diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 18b77ff1804bc..2222f09e45f3d 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -26,8 +26,8 @@ use tonic::{Request, Response, Status, Streaming}; #[derive(Clone)] pub struct StreamServiceImpl { - mgr: LocalStreamManager, - env: StreamEnvironment, + pub mgr: LocalStreamManager, + pub env: StreamEnvironment, } impl StreamServiceImpl { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index b8c556e877bb3..dac0afcb62e7d 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -369,18 +369,12 @@ pub async fn compute_node_serve( memory_mgr.get_watermark_epoch(), ); - let grpc_await_tree_reg = await_tree_config.map(|config| await_tree::Registry::new(config)); - // Boot the runtime gRPC services. let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env); let exchange_srv = ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics); let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone()); - let monitor_srv = MonitorServiceImpl::new( - stream_mgr.clone(), - grpc_await_tree_reg.clone(), - config.server.clone(), - ); + let monitor_srv = MonitorServiceImpl::new(stream_mgr.clone(), config.server.clone()); let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr); let health_srv = HealthServiceImpl::new(); @@ -412,6 +406,7 @@ pub async fn compute_node_serve( ExchangeServiceServer::new(exchange_srv).max_decoding_message_size(usize::MAX), ) .add_service({ + let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned(); let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX); #[cfg(madsim)] @@ -420,7 +415,7 @@ pub async fn compute_node_serve( } #[cfg(not(madsim))] { - AwaitTreeMiddlewareLayer::new_optional(grpc_await_tree_reg).layer(srv) + AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv) } }) .add_service(MonitorServiceServer::new(monitor_srv)) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 170e105776f48..5cd76eda6215d 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -322,10 +322,7 @@ pub(super) struct LocalBarrierWorker { } impl LocalBarrierWorker { - pub(super) fn new( - actor_manager: Arc, - barrier_await_tree_reg: Option, - ) -> Self { + pub(super) fn new(actor_manager: Arc) -> Self { let (event_tx, event_rx) = unbounded_channel(); let (failure_tx, failure_rx) = unbounded_channel(); let shared_context = Arc::new(SharedContext::new( @@ -342,7 +339,7 @@ impl LocalBarrierWorker { state: ManagedBarrierState::new( actor_manager.env.state_store(), actor_manager.streaming_metrics.clone(), - barrier_await_tree_reg, + actor_manager.await_tree_reg.clone(), ), control_stream_handle: ControlStreamHandle::empty(), actor_manager, @@ -678,10 +675,7 @@ impl LocalBarrierWorker { /// Reset all internal states. pub(super) fn reset_state(&mut self) { - *self = Self::new( - self.actor_manager.clone(), - self.state.reset_and_take_barrier_await_tree_reg(), - ); + *self = Self::new(self.actor_manager.clone()); } /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report @@ -745,7 +739,6 @@ impl LocalBarrierWorker { env: StreamEnvironment, streaming_metrics: Arc, await_tree_reg: Option, - barrier_await_tree_reg: Option, watermark_epoch: AtomicU64Ref, actor_op_rx: UnboundedReceiver, ) -> JoinHandle<()> { @@ -768,7 +761,7 @@ impl LocalBarrierWorker { await_tree_reg, runtime: runtime.into(), }); - let worker = LocalBarrierWorker::new(actor_manager, barrier_await_tree_reg); + let worker = LocalBarrierWorker::new(actor_manager); tokio::spawn(worker.run(actor_op_rx)) } } @@ -870,7 +863,6 @@ impl LocalBarrierManager { StreamEnvironment::for_test(), Arc::new(StreamingMetrics::unused()), None, - None, Arc::new(AtomicU64::new(0)), rx, ); diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 3a8578e28db5f..ed192b39dfe5f 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -38,7 +38,7 @@ use super::BarrierCompleteResult; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; -use crate::task::ActorId; +use crate::task::{await_tree_key, ActorId}; /// The state machine of local barrier manager. #[derive(Debug)] @@ -170,13 +170,6 @@ impl ManagedBarrierState { } } - pub(super) fn reset_and_take_barrier_await_tree_reg(&mut self) -> Option { - if let Some(reg) = &self.barrier_await_tree_reg { - reg.clear(); - } - self.barrier_await_tree_reg.take() - } - pub fn read_barrier_mutation( &mut self, barrier: &Barrier, @@ -297,9 +290,12 @@ impl ManagedBarrierState { }, ); if let Some(reg) = &self.barrier_await_tree_reg { - reg.register(prev_epoch, format!("SyncEpoch({})", prev_epoch)) - .instrument(future) - .left_future() + reg.register( + await_tree_key::BarrierAwait { prev_epoch }, + format!("SyncEpoch({})", prev_epoch), + ) + .instrument(future) + .left_future() } else { future.right_future() } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 94cdfc78a537c..c079bbe26e486 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -70,11 +70,22 @@ pub type ActorHandle = JoinHandle<()>; pub type AtomicU64Ref = Arc; +pub mod await_tree_key { + /// Await-tree key type for actors. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub struct Actor(pub crate::task::ActorId); + + /// Await-tree key type for barriers. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub struct BarrierAwait { + pub prev_epoch: u64, + } +} + /// `LocalStreamManager` manages all stream executors in this project. #[derive(Clone)] pub struct LocalStreamManager { await_tree_reg: Option, - barrier_await_tree_reg: Option, pub env: StreamEnvironment, @@ -163,8 +174,6 @@ impl LocalStreamManager { let await_tree_reg = await_tree_config .clone() .map(|config| await_tree::Registry::new(config)); - let barrier_await_tree_reg = - await_tree_config.map(|config| await_tree::Registry::new(config)); let (actor_op_tx, actor_op_rx) = unbounded_channel(); @@ -172,32 +181,19 @@ impl LocalStreamManager { env.clone(), streaming_metrics, await_tree_reg.clone(), - barrier_await_tree_reg.clone(), watermark_epoch, actor_op_rx, ); Self { await_tree_reg, - barrier_await_tree_reg, env, actor_op_tx: EventSender(actor_op_tx), } } - /// Get await-tree contexts for all actors. - pub fn get_actor_traces(&self) -> Vec<(ActorId, await_tree::Tree)> { - match &self.await_tree_reg.as_ref() { - Some(reg) => reg.collect::(), - None => Default::default(), - } - } - - /// Get await-tree contexts for all barrier. - pub fn get_barrier_traces(&self) -> Vec<(u64, await_tree::Tree)> { - match &self.barrier_await_tree_reg.as_ref() { - Some(reg) => reg.collect::(), - None => Default::default(), - } + /// Get the registry of await-trees. + pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> { + self.await_tree_reg.as_ref() } /// Receive a new control stream request from meta. Notify the barrier worker to reset the CN and use the new control stream @@ -608,7 +604,7 @@ impl LocalBarrierWorker { }); let traced = match &self.actor_manager.await_tree_reg { Some(m) => m - .register(actor_id, trace_span) + .register(await_tree_key::Actor(actor_id), trace_span) .instrument(actor) .left_future(), None => actor.right_future(), From 1fe134669a04087c0c50ce3ec9abf6bdbf28efc5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 1 Apr 2024 15:17:53 +0800 Subject: [PATCH 3/5] typed compactor tasks Signed-off-by: Bugen Zhao --- src/compute/src/rpc/service/monitor_service.rs | 5 +++-- src/storage/compactor/src/rpc.rs | 5 +++-- .../src/hummock/compactor/compactor_runner.rs | 8 ++++++-- src/storage/src/hummock/compactor/context.rs | 13 +++++++++++++ src/storage/src/hummock/compactor/mod.rs | 4 +++- .../src/hummock/compactor/shared_buffer_compact.rs | 4 ++-- .../hummock/event_handler/hummock_event_handler.rs | 4 ++-- src/storage/src/hummock/event_handler/uploader.rs | 6 ++++-- 8 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 9c2bad2ea9e50..d2542ca9bd085 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -29,6 +29,7 @@ use risingwave_pb::monitor_service::{ StackTraceResponse, }; use risingwave_rpc_client::error::ToTonicStatus; +use risingwave_storage::hummock::compactor::await_tree_key::Compaction; use risingwave_stream::executor::monitor::global_streaming_metrics; use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait}; use risingwave_stream::task::LocalStreamManager; @@ -90,9 +91,9 @@ impl MonitorService for MonitorServiceImpl { self.stream_mgr.env.state_store().as_hummock() && let Some(m) = hummock.compaction_await_tree_reg() { - m.collect::() + m.collect::() .into_iter() - .map(|(k, v)| (k, v.to_string())) + .map(|(k, v)| (format!("{k:?}"), v.to_string())) .collect() } else { Default::default() diff --git a/src/storage/compactor/src/rpc.rs b/src/storage/compactor/src/rpc.rs index 974b5116cf721..d74f8f977f5b1 100644 --- a/src/storage/compactor/src/rpc.rs +++ b/src/storage/compactor/src/rpc.rs @@ -23,6 +23,7 @@ use risingwave_pb::monitor_service::{ ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse, }; +use risingwave_storage::hummock::compactor::await_tree_key::Compaction; use risingwave_storage::hummock::compactor::CompactionAwaitTreeRegRef; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; @@ -85,9 +86,9 @@ impl MonitorService for MonitorServiceImpl { let compaction_task_traces = match &self.await_tree_reg { None => Default::default(), Some(await_tree_reg) => await_tree_reg - .collect::() + .collect::() .into_iter() - .map(|(k, v)| (k, v.to_string())) + .map(|(k, v)| (format!("{k:?}"), v.to_string())) .collect(), }; Ok(Response::new(StackTraceResponse { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 866dd8e6b56d5..591e478f76fe9 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -43,7 +43,8 @@ use crate::hummock::compactor::compaction_utils::{ use crate::hummock::compactor::iterator::ConcatSstableIterator; use crate::hummock::compactor::task_progress::TaskProgressGuard; use crate::hummock::compactor::{ - fast_compactor_runner, CompactOutput, CompactionFilter, Compactor, CompactorContext, + await_tree_key, fast_compactor_runner, CompactOutput, CompactionFilter, Compactor, + CompactorContext, }; use crate::hummock::iterator::{ Forward, HummockIterator, MergeIterator, SkipWatermarkIterator, ValueMeta, @@ -629,7 +630,10 @@ pub async fn compact( None => runner.right_future(), Some(await_tree_reg) => await_tree_reg .register( - format!("compact_runner/{}-{}", compact_task.task_id, split_index), + await_tree_key::CompactRunner { + task_id: compact_task.task_id, + split_index, + }, format!( "Compaction Task {} Split {} ", compact_task.task_id, split_index diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index 9211ca0017d7f..4525dcdc773fb 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -30,6 +30,19 @@ pub fn new_compaction_await_tree_reg_ref(config: await_tree::Config) -> Compacti await_tree::Registry::new(config) } +pub mod await_tree_key { + /// Await-tree key type for compaction tasks. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub enum Compaction { + CompactRunner { task_id: u64, split_index: usize }, + CompactSharedBuffer { id: usize }, + SpawnUploadTask { id: usize }, + MergingTask { id: usize }, + } + + pub use Compaction::*; +} + /// A `CompactorContext` describes the context of a compactor. #[derive(Clone)] pub struct CompactorContext { diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 4540af2ea0749..c18828bc93e72 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -45,7 +45,9 @@ pub use compaction_filter::{ CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter, TtlCompactionFilter, }; -pub use context::{new_compaction_await_tree_reg_ref, CompactionAwaitTreeRegRef, CompactorContext}; +pub use context::{ + await_tree_key, new_compaction_await_tree_reg_ref, CompactionAwaitTreeRegRef, CompactorContext, +}; use futures::{pin_mut, StreamExt}; pub use iterator::{ConcatSstableIterator, SstableStreamIterator}; use more_asserts::assert_ge; diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 7d2d00ac29ffd..9c25eee366bda 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -36,7 +36,7 @@ use tracing::{error, warn}; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; use crate::hummock::compactor::compaction_filter::DummyCompactionFilter; -use crate::hummock::compactor::context::CompactorContext; +use crate::hummock::compactor::context::{await_tree_key, CompactorContext}; use crate::hummock::compactor::{check_flush_result, CompactOutput, Compactor}; use crate::hummock::event_handler::uploader::UploadTaskPayload; use crate::hummock::event_handler::LocalInstanceId; @@ -187,7 +187,7 @@ async fn compact_shared_buffer( let tree_root = context.await_tree_reg.as_ref().map(|reg| { let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed); reg.register( - format!("compact_shared_buffer/{}", id), + await_tree_key::CompactSharedBuffer { id }, format!( "Compact Shared Buffer: {:?}", payload diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index d5083470ce319..fcfcac53a478e 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -39,7 +39,7 @@ use tracing::{debug, error, info, trace, warn}; use super::refiller::{CacheRefillConfig, CacheRefiller}; use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; use crate::filter_key_extractor::FilterKeyExtractorManager; -use crate::hummock::compactor::{compact, CompactorContext}; +use crate::hummock::compactor::{await_tree_key, compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ @@ -246,7 +246,7 @@ impl HummockEventHandler { let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| { let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed); reg.register( - format!("spawn_upload_task/{}", upload_task_id), + await_tree_key::SpawnUploadTask { id: upload_task_id }, format!("Spawn Upload Task: {}", task_info), ) }); diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 073dfc0a8749c..0f5a89bb1023f 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -42,7 +42,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info}; use crate::hummock::compactor::{ - merge_imms_in_memory, CompactionAwaitTreeRegRef, CompactionExecutor, + await_tree_key, merge_imms_in_memory, CompactionAwaitTreeRegRef, CompactionExecutor, }; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::LocalInstanceId; @@ -90,7 +90,9 @@ pub(crate) fn default_spawn_merging_task( let tree_root = await_tree_reg.as_ref().map(|reg| { let merging_task_id = NEXT_MERGING_TASK_ID.fetch_add(1, Relaxed); reg.register( - format!("merging_task/{}", merging_task_id), + await_tree_key::MergingTask { + id: merging_task_id, + }, format!( "Merging Imm {:?} {:?} {:?}", table_id, From 726c6f6b12a9153607db8bfc0df39906ca86cda0 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 1 Apr 2024 15:22:37 +0800 Subject: [PATCH 4/5] fix clippy Signed-off-by: Bugen Zhao --- src/stream/src/task/stream_manager.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index c079bbe26e486..0c946ae36c853 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -171,9 +171,7 @@ impl LocalStreamManager { await_tree_config: Option, watermark_epoch: AtomicU64Ref, ) -> Self { - let await_tree_reg = await_tree_config - .clone() - .map(|config| await_tree::Registry::new(config)); + let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new); let (actor_op_tx, actor_op_rx) = unbounded_channel(); From 63c4815eb223e1974bd78c23173138e1476148c2 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 4 Apr 2024 14:03:18 +0800 Subject: [PATCH 5/5] update cargo lock Signed-off-by: Bugen Zhao --- Cargo.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8df6a267825fd..bc3ebad2e8640 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -967,7 +967,7 @@ dependencies = [ "derive_builder 0.20.0", "flexstr", "indextree", - "itertools 0.12.0", + "itertools 0.12.1", "parking_lot 0.12.1", "pin-project", "tokio", @@ -3482,7 +3482,7 @@ dependencies = [ "darling 0.20.8", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.57", ] [[package]] @@ -3502,7 +3502,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" dependencies = [ "derive_builder_core 0.20.0", - "syn 2.0.48", + "syn 2.0.57", ] [[package]]