From a9f2cc89dac4ae3172ba9ef5a3e4e34f931790c6 Mon Sep 17 00:00:00 2001 From: Joey Kraut Date: Fri, 13 Dec 2024 12:39:33 -0800 Subject: [PATCH] state: storage: tx: Fix trace nesting in db transactions --- Cargo.lock | 1 + Cargo.toml | 1 + state/Cargo.toml | 1 + state/src/interface/mod.rs | 29 ++++++++++++++++++++++------- state/src/storage/db.rs | 5 +++++ state/src/storage/tx/mod.rs | 1 + util/Cargo.toml | 2 +- workers/network-manager/Cargo.toml | 2 +- 8 files changed, 33 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6386733fe..b9a7af0d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9068,6 +9068,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "tracing-opentelemetry", "tracing-slog", "tui", "tui-logger", diff --git a/Cargo.toml b/Cargo.toml index 22787cd78..4ffabc62b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,7 @@ itertools = "0.10" serde = { version = "1.0" } serde_json = "1.0.64" tracing = "0.1" +tracing-opentelemetry = "0.22" metrics = "=0.22.3" lazy_static = "1.4" diff --git a/state/Cargo.toml b/state/Cargo.toml index 0f361d6fc..091dfd89e 100644 --- a/state/Cargo.toml +++ b/state/Cargo.toml @@ -67,6 +67,7 @@ serde_json = "1.0" slog = { version = "2.2", features = ["max_level_trace"] } tempfile = { version = "3.8", optional = true } tracing = { workspace = true, features = ["log"] } +tracing-opentelemetry = { workspace = true } tracing-slog = "0.2" tui = "0.19" tui-logger = "0.8" diff --git a/state/src/interface/mod.rs b/state/src/interface/mod.rs index f1d72b0f3..71987959b 100644 --- a/state/src/interface/mod.rs +++ b/state/src/interface/mod.rs @@ -25,7 +25,8 @@ use job_types::{ use libmdbx::{RO, RW}; use system_bus::SystemBus; use system_clock::SystemClock; -use tracing::{error, info_span, Instrument}; +use tracing::{error, info_span, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use util::{err_str, raw_err_str}; use crate::{ @@ -302,16 +303,23 @@ impl StateInner { F: FnOnce(&StateTxn<'_, RO>) -> Result + Send + 'static, { let db = self.db.clone(); + let thread_span = info_span!("db_read_thread"); + tokio::task::spawn_blocking(move || { + // Enter the thread_span to set it as the current span + let _thread_span_guard = thread_span.enter(); + + // Create a new read tx let tx = db.new_read_tx()?; - let _fn_span = tracing::info_span!("db_read_operation").entered(); + + // Execute the operation + let op_span = info_span!("db_read_operation").entered(); let res = f(&tx)?; - drop(_fn_span); // End the operation span before committing + drop(op_span); // End the operation span before committing tx.commit()?; Ok(res) }) - .instrument(info_span!("db_read_thread")) .await .map_err(err_str!(StateError::Runtime))? } @@ -327,16 +335,23 @@ impl StateInner { F: FnOnce(&StateTxn<'_, RW>) -> Result + Send + 'static, { let db = self.db.clone(); + let thread_span = info_span!("db_write_thread"); + tokio::task::spawn_blocking(move || { + // Enter the thread_span to set it as the current span + let _thread_span_guard = thread_span.enter(); + + // Create a new write tx let tx = db.new_write_tx()?; - let _fn_span = tracing::info_span!("db_write_operation").entered(); + + // Execute the operation + let op_span = info_span!("db_write_operation").entered(); let res = f(&tx)?; - drop(_fn_span); // End the operation span before committing + drop(op_span); // End the operation span before committing tx.commit()?; Ok(res) }) - .instrument(info_span!("db_write_thread")) .await .map_err(err_str!(StateError::Runtime))? } diff --git a/state/src/storage/db.rs b/state/src/storage/db.rs index d63ea1c7e..e5dcbcb8a 100644 --- a/state/src/storage/db.rs +++ b/state/src/storage/db.rs @@ -7,6 +7,7 @@ use std::{ops::Bound, path::Path}; use libmdbx::{Database, Geometry, WriteMap, RO, RW}; use serde::{Deserialize, Serialize}; +use tracing::instrument; use util::err_str; use crate::{ciborium_serialize, NUM_TABLES}; @@ -149,24 +150,28 @@ impl DB { } /// Create a new read-only transaction + #[instrument(skip(self))] pub fn new_read_tx(&self) -> Result, StorageError> { let txn = self.new_raw_read_tx()?; Ok(StateTxn::new(txn)) } /// Create a new raw read-only transaction + #[instrument(skip(self))] pub fn new_raw_read_tx(&self) -> Result, StorageError> { let txn = self.db.begin_ro_txn().map_err(StorageError::BeginTx)?; Ok(DbTxn::new(txn)) } /// Create a new read-write transaction + #[instrument(skip(self))] pub fn new_write_tx(&self) -> Result, StorageError> { let txn = self.new_raw_write_tx()?; Ok(StateTxn::new(txn)) } /// Create a new read-write transaction + #[instrument(skip(self))] pub fn new_raw_write_tx(&self) -> Result, StorageError> { self.db.begin_rw_txn().map_err(StorageError::BeginTx).map(DbTxn::new) } diff --git a/state/src/storage/tx/mod.rs b/state/src/storage/tx/mod.rs index 2d082077e..6f6925c53 100644 --- a/state/src/storage/tx/mod.rs +++ b/state/src/storage/tx/mod.rs @@ -59,6 +59,7 @@ impl<'db, T: TransactionKind> StateTxn<'db, T> { } /// Commit the transaction + #[instrument(skip(self))] pub fn commit(self) -> Result<(), StorageError> { self.inner.commit() } diff --git a/util/Cargo.toml b/util/Cargo.toml index 71ce29aa8..aaee08da1 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -40,7 +40,7 @@ serde_json = { workspace = true } tracing = { workspace = true } tracing-serde = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } -tracing-opentelemetry = "0.22" +tracing-opentelemetry = { workspace = true } opentelemetry_sdk = { version = "0.21", features = ["trace", "rt-tokio"] } opentelemetry-otlp = "0.14" opentelemetry = { version = "0.21", default-features = false, features = [ diff --git a/workers/network-manager/Cargo.toml b/workers/network-manager/Cargo.toml index ebdf9808d..55b30e80c 100644 --- a/workers/network-manager/Cargo.toml +++ b/workers/network-manager/Cargo.toml @@ -36,5 +36,5 @@ util = { path = "../../util" } itertools = "0.11" serde_json = { workspace = true } tracing = { workspace = true } -tracing-opentelemetry = "0.22" +tracing-opentelemetry = { workspace = true } uuid = "1.1.2"