diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 168638872407..b27ca7dadb52 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -65,6 +65,53 @@ type HeaderLruCache = MultiConsumerLruCache { to_service: UnboundedSender>, } +/// Drop aware sender struct +#[derive(Debug)] +struct ActionSender { + blockhash: B256, + tx: Option>>, +} + +impl ActionSender { + const fn new(blockhash: B256, tx: Option>>) -> Self { + Self { blockhash, tx } + } + fn send_block( + &mut self, + block_sender: Result>>, ProviderError>, + ) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(CacheAction::BlockWithSendersResult { + block_hash: self.blockhash, + res: block_sender, + }); + } + } + fn send_receipts(&mut self, receipts: Result>>, ProviderError>) { + if let Some(tx) = self.tx.take() { + let _ = + tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts }); + } + } + fn send_header(&mut self, header: Result<::Header, ProviderError>) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(CacheAction::HeaderResult { + block_hash: self.blockhash, + res: Box::new(header), + }); + } + } +} +impl Drop for ActionSender { + fn drop(&mut self) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(CacheAction::BlockWithSendersResult { + block_hash: self.blockhash, + res: Err(ProviderError::CacheServiceUnavailable), + }); + } + } +} impl Clone for EthStateCache { fn clone(&self) -> Self { @@ -359,6 +406,8 @@ where let provider = this.provider.clone(); let action_tx = this.action_tx.clone(); let rate_limiter = this.rate_limiter.clone(); + let mut action_sender = + ActionSender::new(block_hash, Some(action_tx)); this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; @@ -370,10 +419,7 @@ where TransactionVariant::WithHash, ) .map(|maybe_block| maybe_block.map(Arc::new)); - let _ = action_tx.send(CacheAction::BlockWithSendersResult { - block_hash, - res: block_sender, - }); + action_sender.send_block(block_sender); })); } } @@ -389,6 +435,8 @@ where let provider = this.provider.clone(); let action_tx = this.action_tx.clone(); let rate_limiter = this.rate_limiter.clone(); + let mut action_sender = + ActionSender::new(block_hash, Some(action_tx)); this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; @@ -396,8 +444,7 @@ where .receipts_by_block(block_hash.into()) .map(|maybe_receipts| maybe_receipts.map(Arc::new)); - let _ = action_tx - .send(CacheAction::ReceiptsResult { block_hash, res }); + action_sender.send_receipts(res); })); } } @@ -414,6 +461,8 @@ where let provider = this.provider.clone(); let action_tx = this.action_tx.clone(); let rate_limiter = this.rate_limiter.clone(); + let mut action_sender = + ActionSender::new(block_hash, Some(action_tx)); this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; @@ -422,10 +471,7 @@ where ProviderError::HeaderNotFound(block_hash.into()) }) }); - let _ = action_tx.send(CacheAction::HeaderResult { - block_hash, - res: Box::new(header), - }); + action_sender.send_header(header); })); } } diff --git a/crates/rpc/rpc-eth-types/src/fee_history.rs b/crates/rpc/rpc-eth-types/src/fee_history.rs index 2c365ae90bff..4d63482a670a 100644 --- a/crates/rpc/rpc-eth-types/src/fee_history.rs +++ b/crates/rpc/rpc-eth-types/src/fee_history.rs @@ -245,7 +245,7 @@ pub async fn fee_history_cache_new_blocks_task( event = events.next() => { let Some(event) = event else { // the stream ended, we are done - break; + break }; let committed = event.committed();