diff --git a/src/index.rs b/src/index.rs index 758eedee..42926493 100644 --- a/src/index.rs +++ b/src/index.rs @@ -4,6 +4,7 @@ use bitcoin::hashes::Hash; use bitcoin::{BlockHash, OutPoint, Txid}; use bitcoin_slices::{bsl, Visit, Visitor}; use std::ops::ControlFlow; +use std::thread; use crate::{ chain::{Chain, NewHeader}, @@ -188,22 +189,48 @@ impl Index { return Ok(true); // no more blocks to index (done for now) } } - for chunk in new_headers.chunks(self.batch_size) { - exit_flag.poll().with_context(|| { - format!( - "indexing interrupted at height: {}", - chunk.first().unwrap().height() - ) - })?; - self.sync_blocks(daemon, chunk)?; - } + + thread::scope(|s| -> Result<()> { + let (tx, rx) = crossbeam_channel::bounded(1); + + let chunks = new_headers.chunks(self.batch_size); + let index = &self; + let reader = s.spawn(move || -> Result<()> { + for chunk in chunks { + exit_flag.poll().with_context(|| { + format!( + "indexing interrupted at height: {}", + chunk.first().unwrap().height() + ) + })?; + let batch = index.index_blocks(daemon, chunk)?; + tx.send(batch).context("writer disconnected")?; + } + Ok(()) + }); + + let index = &self; + let writer = s.spawn(move || { + let stats = &index.stats; + for mut batch in rx { + stats.observe_duration("sort", || batch.sort()); + stats.observe_batch(&batch); + stats.observe_duration("write", || index.store.write(&batch)); + stats.observe_db(&index.store); + } + }); + + reader.join().expect("reader thread panic")?; + writer.join().expect("writer thread panic"); + Ok(()) + })?; self.chain.update(new_headers); self.stats.observe_chain(&self.chain); self.flush_needed = true; Ok(false) // sync is not done } - fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> { + fn index_blocks(&self, daemon: &Daemon, chunk: &[NewHeader]) -> Result { let blockhashes: Vec = chunk.iter().map(|h| h.hash()).collect(); let mut heights = chunk.iter().map(|h| h.height()); @@ -222,12 +249,7 @@ impl Index { "some blocks were not indexed: {:?}", heights ); - batch.sort(); - self.stats.observe_batch(&batch); - self.stats - .observe_duration("write", || self.store.write(&batch)); - self.stats.observe_db(&self.store); - Ok(()) + Ok(batch) } pub(crate) fn is_ready(&self) -> bool {