diff --git a/state/src/applicator/order_book.rs b/state/src/applicator/order_book.rs index 08b9c0c95..4e54e318a 100644 --- a/state/src/applicator/order_book.rs +++ b/state/src/applicator/order_book.rs @@ -96,7 +96,8 @@ impl StateApplicator { tx.commit()?; // Mark the order as locally matchable in the order book cache - self.order_cache().add_matchable_order_blocking(order_id); + todo!("re-implement write paths for order book cache"); + // self.order_cache().add_matchable_order_blocking(order_id); self.system_bus().publish( ORDER_STATE_CHANGE_TOPIC.to_string(), SystemBusMessage::OrderStateChange { order: order_info }, diff --git a/state/src/applicator/wallet_index.rs b/state/src/applicator/wallet_index.rs index ac7954a9c..dccd07adf 100644 --- a/state/src/applicator/wallet_index.rs +++ b/state/src/applicator/wallet_index.rs @@ -198,9 +198,9 @@ impl StateApplicator { /// that are fn update_external_order_cache(&self, wallet: &Wallet) { let order_cache = self.order_cache(); - for (id, order) in wallet.get_nonzero_orders().into_iter() { + for (id, order) in wallet.get_matchable_orders().into_iter() { if order.allow_external_matches { - order_cache.add_externally_enabled_order_blocking(id); + order_cache.mark_order_externally_matchable_blocking(id); } else { order_cache.remove_externally_enabled_order_blocking(id); } diff --git a/state/src/caching/order_cache.rs b/state/src/caching/order_cache.rs index 85a745025..26577dca5 100644 --- a/state/src/caching/order_cache.rs +++ b/state/src/caching/order_cache.rs @@ -5,106 +5,98 @@ use std::collections::HashSet; -use common::types::wallet::OrderIdentifier; +use circuit_types::{order::OrderSide, Amount}; +use common::types::wallet::{Order, OrderIdentifier, Pair}; use tokio::sync::RwLock; +use tracing::instrument; use crate::storage::{db::DB, error::StorageError}; -use super::RwLockHashSet; +use super::{order_metadata_index::OrderMetadataIndex, RwLockHashSet}; + +/// A filter for querying the order book cache +#[derive(Clone)] +pub struct OrderBookFilter { + /// The pair to filter on + pair: Pair, + /// The side to filter on + side: OrderSide, + /// Whether to only accept externally matchable orders + external: bool, +} + +impl OrderBookFilter { + /// Construct a new order book filter + pub fn new(pair: Pair, side: OrderSide, external: bool) -> Self { + Self { pair, side, external } + } +} /// The order book cache #[derive(Default)] pub struct OrderBookCache { - /// The set of open orders which are matchable and locally managed - matchable_orders: RwLockHashSet, /// The set of local orders which have external matches enabled /// /// This may not be a subset of `matchable_orders`, some externally /// matchable orders may not be yet matchable, e.g. if they are waiting for /// validity proofs externally_enabled_orders: RwLockHashSet, + /// The index of order metadata + order_metadata_index: OrderMetadataIndex, } impl OrderBookCache { /// Construct a new order book cache pub fn new() -> Self { Self { - matchable_orders: RwLock::new(HashSet::new()), externally_enabled_orders: RwLock::new(HashSet::new()), + order_metadata_index: OrderMetadataIndex::new(), } } // --- Getters --- // - /// Get the set of matchable orders - pub async fn matchable_orders(&self) -> Vec { - self.matchable_orders.read().await.iter().copied().collect() - } - - /// Get the set of matchable orders in a blocking fashion - pub fn matchable_orders_blocking(&self) -> Vec { - self.matchable_orders.blocking_read().iter().copied().collect() - } - - /// Get the set of externally matchable orders - /// - /// This is the intersection of `matchable_orders` and - /// `externally_enabled_orders` - pub async fn externally_matchable_orders(&self) -> Vec { - let matchable = self.matchable_orders.read().await; - let external = self.externally_enabled_orders.read().await; - matchable.intersection(&external).copied().collect() - } - - /// Get the set of externally matchable orders in a blocking fashion - pub fn externally_matchable_orders_blocking(&self) -> Vec { - let matchable = self.matchable_orders.blocking_read(); - let external = self.externally_enabled_orders.blocking_read(); - matchable.intersection(&external).copied().collect() + /// Get orders matching a filter + pub async fn get_orders(&self, filter: OrderBookFilter) -> Vec { + let orders = self.order_metadata_index.get_orders(&filter.pair, filter.side).await; + if filter.external { + let externally_matchable = self.externally_enabled_orders.read().await; + orders.into_iter().filter(|id| externally_matchable.contains(id)).collect() + } else { + orders + } } // --- Setters --- // - /// Add a matchable order - pub async fn add_matchable_order(&self, order: OrderIdentifier) { - self.matchable_orders.write().await.insert(order); - } - - /// Add a matchable order in a blocking fashion - pub fn add_matchable_order_blocking(&self, order: OrderIdentifier) { - self.matchable_orders.blocking_write().insert(order); + /// Add an order to the cache + pub async fn add_order(&self, id: OrderIdentifier, order: &Order, matchable_amount: Amount) { + self.order_metadata_index.add_order(id, order, matchable_amount).await; + if order.allow_external_matches { + self.externally_enabled_orders.write().await.insert(id); + } } - /// Add an externally enabled order - pub async fn add_externally_enabled_order(&self, order: OrderIdentifier) { + /// Mark an order as externally matchable + pub async fn mark_order_externally_matchable(&self, order: OrderIdentifier) { self.externally_enabled_orders.write().await.insert(order); } - /// Add an externally enabled order in a blocking fashion - pub fn add_externally_enabled_order_blocking(&self, order: OrderIdentifier) { + /// Mark an order as externally matchable in a blocking fashion + pub fn mark_order_externally_matchable_blocking(&self, order: OrderIdentifier) { self.externally_enabled_orders.blocking_write().insert(order); } /// Remove an order from the cache entirely pub async fn remove_order(&self, order: OrderIdentifier) { - self.matchable_orders.write().await.remove(&order); self.externally_enabled_orders.write().await.remove(&order); + self.order_metadata_index.remove_order(&order).await; } /// Remove an order in a blocking fashion pub fn remove_order_blocking(&self, order: OrderIdentifier) { - self.remove_matchable_order_blocking(order); self.remove_externally_enabled_order_blocking(order); - } - - /// Remove a matchable order - pub async fn remove_matchable_order(&self, order: OrderIdentifier) { - self.matchable_orders.write().await.remove(&order); - } - - /// Remove a matchable order in a blocking fashion - pub fn remove_matchable_order_blocking(&self, order: OrderIdentifier) { - self.matchable_orders.blocking_write().remove(&order); + todo!("re-implement write paths for order book cache"); } /// Remove an externally enabled order @@ -120,7 +112,8 @@ impl OrderBookCache { /// Backfill the order cache from a DB /// /// This method may be used to populate the cache on startup - pub fn backfill_from_db(&self, db: &DB) -> Result<(), StorageError> { + #[instrument(skip(self, db))] + pub async fn hydrate_from_db(&self, db: &DB) -> Result<(), StorageError> { let tx = db.new_read_tx()?; let orders = tx.get_local_orders()?; for order_id in orders.into_iter() { @@ -131,15 +124,20 @@ impl OrderBookCache { }; if info.local && info.ready_for_match() { - self.add_matchable_order_blocking(order_id); - } - - // Check if the order allows external matches - if let Some(wallet) = tx.get_wallet_for_order(&order_id)? - && let Some(order) = wallet.get_order(&order_id) - && order.allow_external_matches - { - self.add_externally_enabled_order_blocking(order_id); + // Get the order itself + let wallet = match tx.get_wallet_for_order(&order_id)? { + Some(wallet) => wallet, + None => continue, + }; + + let order = match wallet.get_order(&order_id) { + Some(order) => order, + None => continue, + }; + + let matchable_amount = + wallet.get_balance_for_order(order).map(|b| b.amount).unwrap_or_default(); + self.add_order(order_id, order, matchable_amount).await; } } @@ -149,90 +147,121 @@ impl OrderBookCache { #[cfg(test)] mod test { - use std::collections::HashSet; - use std::hash::Hash; + use super::*; + use common::types::wallet_mocks::mock_order; - use super::OrderBookCache; - use uuid::Uuid; - - /// Returns whether two vectors contain the same elements, ignoring order - fn same_elements(a: Vec, b: Vec) -> bool { - let a: HashSet<_> = a.into_iter().collect(); - let b: HashSet<_> = b.into_iter().collect(); - a == b - } - - /// Tests the `get_matchable_orders` method - #[test] - fn test_get_matchable_orders() { + /// Tests getting an order by pair + #[tokio::test] + async fn test_get_orders_basic() { let cache = OrderBookCache::new(); - let order1 = Uuid::new_v4(); - let order2 = Uuid::new_v4(); - - // Add the first order as matchable and the second as externally enabled - cache.add_matchable_order_blocking(order1); - cache.add_externally_enabled_order_blocking(order2); + let order_id = OrderIdentifier::new_v4(); + let order = mock_order(); - assert_eq!(cache.matchable_orders_blocking(), vec![order1]); + // Add an order to the cache + cache.add_order(order_id, &order, 100 /* matchable_amount */).await; - // Remove the second order, this should not affect the matchable orders - cache.remove_order_blocking(order2); - assert_eq!(cache.matchable_orders_blocking(), vec![order1]); + let filter = OrderBookFilter::new(order.pair(), order.side, false /* external */); + let orders = cache.get_orders(filter.clone()).await; + assert_eq!(orders.len(), 1); + assert_eq!(orders[0], order_id); - // Remove the first order, the matchable orders should now be empty - cache.remove_matchable_order_blocking(order1); - assert_eq!(cache.matchable_orders_blocking(), vec![]); + // Remove the order from the cache + cache.remove_order(order_id).await; + let orders = cache.get_orders(filter).await; + assert_eq!(orders.len(), 0); } - /// Tests the `externally_matchable_orders` method - #[test] - fn test_externally_matchable_orders() { + /// Tests getting multiple orders by their pair + #[tokio::test] + async fn test_get_orders_multiple() { let cache = OrderBookCache::new(); - let order1 = Uuid::new_v4(); - let order2 = Uuid::new_v4(); - - // Add the first order as externally enabled and the second as matchable - cache.add_matchable_order_blocking(order1); - cache.add_externally_enabled_order_blocking(order1); - cache.add_matchable_order_blocking(order2); - - assert_eq!(cache.externally_matchable_orders_blocking(), vec![order1]); - - // Remove the second order, this should not affect externally enabled orders - cache.remove_order_blocking(order2); - assert_eq!(cache.externally_matchable_orders_blocking(), vec![order1]); + let order_id1 = OrderIdentifier::new_v4(); + let order_id2 = OrderIdentifier::new_v4(); + let order_id3 = OrderIdentifier::new_v4(); + let order = mock_order(); + + cache.add_order(order_id1, &order, 100 /* matchable_amount */).await; + cache.add_order(order_id2, &order, 300 /* matchable_amount */).await; + cache.add_order(order_id3, &order, 200 /* matchable_amount */).await; + + let filter = OrderBookFilter::new(order.pair(), order.side, false /* external */); + let orders = cache.get_orders(filter.clone()).await; + assert_eq!(orders.len(), 3); + assert_eq!(orders, vec![order_id2, order_id3, order_id1]); + + // Remove the middle order + cache.remove_order(order_id3).await; + let orders = cache.get_orders(filter).await; + assert_eq!(orders.len(), 2); + assert_eq!(orders, vec![order_id2, order_id1]); + } - // Remove the first order, externally enabled orders should now be empty - cache.remove_externally_enabled_order_blocking(order1); - assert_eq!(cache.externally_matchable_orders_blocking(), vec![]); + /// Tests getting external orders only + #[tokio::test] + async fn test_get_orders_external() { + let cache = OrderBookCache::new(); + let order_id1 = OrderIdentifier::new_v4(); + let order_id2 = OrderIdentifier::new_v4(); + let order_id3 = OrderIdentifier::new_v4(); + let mut order1 = mock_order(); + let mut order2 = order1.clone(); + let mut order3 = order1.clone(); + order1.allow_external_matches = true; + order2.allow_external_matches = false; + order3.allow_external_matches = true; + + cache.add_order(order_id1, &order1, 100 /* matchable_amount */).await; + cache.add_order(order_id2, &order2, 200 /* matchable_amount */).await; + cache.add_order(order_id3, &order3, 300 /* matchable_amount */).await; + + let filter = OrderBookFilter::new(order1.pair(), order1.side, true /* external */); + let orders = cache.get_orders(filter.clone()).await; + assert_eq!(orders.len(), 2); + assert_eq!(orders, vec![order_id3, order_id1]); + + // Remove the first order + cache.remove_order(order_id1).await; + let orders = cache.get_orders(filter).await; + assert_eq!(orders.len(), 1); + assert_eq!(orders, vec![order_id3]); } - /// Tests the `order_cache` methods - #[test] - fn test_order_cache_multiple() { + /// Tests getting orders on different pairs + #[tokio::test] + async fn test_get_orders_different_pairs() { let cache = OrderBookCache::new(); - let order1 = Uuid::new_v4(); - let order2 = Uuid::new_v4(); - let order3 = Uuid::new_v4(); - - // Add orders - cache.add_matchable_order_blocking(order1); - cache.add_externally_enabled_order_blocking(order2); - cache.add_matchable_order_blocking(order3); - cache.add_externally_enabled_order_blocking(order3); - - // Two are matchable, only one is externally matchable - assert!(same_elements(cache.matchable_orders_blocking(), vec![order1, order3])); - assert_eq!(cache.externally_matchable_orders_blocking(), vec![order3]); - - // Remove the first order, only one is matchable - cache.remove_order_blocking(order1); - assert_eq!(cache.matchable_orders_blocking(), vec![order3]); - assert_eq!(cache.externally_matchable_orders_blocking(), vec![order3]); - - // Remove the last order, none are matchable - cache.remove_order_blocking(order3); - assert_eq!(cache.matchable_orders_blocking(), vec![]); - assert_eq!(cache.externally_matchable_orders_blocking(), vec![]); + let order_id1 = OrderIdentifier::new_v4(); + let order_id2 = OrderIdentifier::new_v4(); + let order_id3 = OrderIdentifier::new_v4(); + let order1 = mock_order(); + let order2 = mock_order(); + let order3 = order1.clone(); + + cache.add_order(order_id1, &order1, 300 /* matchable_amount */).await; + cache.add_order(order_id2, &order2, 100 /* matchable_amount */).await; + cache.add_order(order_id3, &order3, 200 /* matchable_amount */).await; + + // Check the first pair + let filter1 = OrderBookFilter::new(order1.pair(), order1.side, false /* external */); + let orders = cache.get_orders(filter1.clone()).await; + assert_eq!(orders.len(), 2); + assert_eq!(orders, vec![order_id1, order_id3]); + + // Check the second pair + let filter2 = OrderBookFilter::new(order2.pair(), order2.side, false /* external */); + let orders = cache.get_orders(filter2.clone()).await; + assert_eq!(orders.len(), 1); + assert_eq!(orders, vec![order_id2]); + + // Remove from the first pair + cache.remove_order(order_id1).await; + let orders = cache.get_orders(filter1).await; + assert_eq!(orders.len(), 1); + assert_eq!(orders, vec![order_id3]); + + // Remove from the second pair + cache.remove_order(order_id2).await; + let orders = cache.get_orders(filter2).await; + assert_eq!(orders.len(), 0); } } diff --git a/state/src/interface/order_book.rs b/state/src/interface/order_book.rs index c36274822..7edad1546 100644 --- a/state/src/interface/order_book.rs +++ b/state/src/interface/order_book.rs @@ -186,18 +186,19 @@ impl StateInner { /// Get a list of order IDs that are locally managed and ready for match #[instrument(name = "get_locally_matchable_orders", skip_all)] pub async fn get_locally_matchable_orders(&self) -> Result, StateError> { - let matchable_orders = self.order_cache.matchable_orders().await; - self.with_read_tx(move |tx| { - let mut res = Vec::new(); - for id in matchable_orders.into_iter() { - if Self::is_task_queue_free(&id, tx)? { - res.push(id); - } - } - - Ok(res) - }) - .await + todo!("Fix this method"); + // let matchable_orders = self.order_cache.matchable_orders().await; + // self.with_read_tx(move |tx| { + // let mut res = Vec::new(); + // for id in matchable_orders.into_iter() { + // if Self::is_task_queue_free(&id, tx)? { + // res.push(id); + // } + // } + + // Ok(res) + // }) + // .await } /// Get a list of order IDs that are locally managed and ready for match in @@ -207,23 +208,24 @@ impl StateInner { &self, matching_pool: MatchingPoolName, ) -> Result, StateError> { - let matchable_orders = self.order_cache.matchable_orders().await; - self.with_read_tx(move |tx| { - let mut res = Vec::new(); - for id in matchable_orders.into_iter() { - let order_matching_pool = tx.get_matching_pool_for_order(&id)?; - if order_matching_pool != matching_pool { - continue; - } - - if Self::is_task_queue_free(&id, tx)? { - res.push(id); - } - } - - Ok(res) - }) - .await + todo!("Fix this method"); + // let matchable_orders = self.order_cache.matchable_orders().await; + // self.with_read_tx(move |tx| { + // let mut res = Vec::new(); + // for id in matchable_orders.into_iter() { + // let order_matching_pool = + // tx.get_matching_pool_for_order(&id)?; if + // order_matching_pool != matching_pool { continue; + // } + + // if Self::is_task_queue_free(&id, tx)? { + // res.push(id); + // } + // } + + // Ok(res) + // }) + // .await } /// Get the set of orders that are ready for a match and allow external @@ -232,19 +234,21 @@ impl StateInner { pub async fn get_externally_matchable_orders( &self, ) -> Result, StateError> { - let externally_matchable_orders = self.order_cache.externally_matchable_orders().await; - self.with_read_tx(move |tx| { - let mut orders = Vec::new(); - for id in externally_matchable_orders.into_iter() { - // Check that the task queue is free - if Self::is_task_queue_free(&id, tx)? { - orders.push(id); - } - } - - Ok(orders) - }) - .await + todo!("Fix this method"); + // let externally_matchable_orders = + // self.order_cache.externally_matchable_orders().await; + // self.with_read_tx(move |tx| { + // let mut orders = Vec::new(); + // for id in externally_matchable_orders.into_iter() { + // // Check that the task queue is free + // if Self::is_task_queue_free(&id, tx)? { + // orders.push(id); + // } + // } + + // Ok(orders) + // }) + // .await } /// Choose an order to handshake with according to their priorities diff --git a/state/src/replication/state_machine/snapshot.rs b/state/src/replication/state_machine/snapshot.rs index c488bb21a..fdf2d0a34 100644 --- a/state/src/replication/state_machine/snapshot.rs +++ b/state/src/replication/state_machine/snapshot.rs @@ -11,6 +11,7 @@ use openraft::{ ErrorSubject, ErrorVerb, LogId, RaftSnapshotBuilder, Snapshot, SnapshotMeta, StorageError as RaftStorageError, StoredMembership, }; +use tracing::error; use util::{err_str, get_current_time_millis}; use crate::replication::error::{new_snapshot_error, ReplicationV2Error}; @@ -255,7 +256,11 @@ impl StateMachine { let order_cache_clone = self.applicator.config.order_cache.clone(); let jh = tokio::task::spawn_blocking(move || { Self::copy_db_data(&snapshot_db, &db_clone)?; - order_cache_clone.backfill_from_db(&db_clone)?; + tokio::spawn(async move { + if let Err(e) = order_cache_clone.hydrate_from_db(&snapshot_db).await { + error!("error hydrating order cache from snapshot: {e}"); + }; + }); Ok(()) }); @@ -581,13 +586,15 @@ mod tests { target_sm.update_from_snapshot(&meta, snapshot_db).await.unwrap(); // Check that the order cache has the correct orders - let matchable_orders = order_cache.matchable_orders().await; - assert_eq!(matchable_orders.len(), 2); - assert!(matchable_orders.contains(&oid1)); - assert!(matchable_orders.contains(&oid2)); - - let external_matchable_orders = order_cache.externally_matchable_orders().await; - assert_eq!(external_matchable_orders.len(), 1); - assert!(external_matchable_orders.contains(&oid1)); + todo!("Fix this test") + // let matchable_orders = order_cache.matchable_orders().await; + // assert_eq!(matchable_orders.len(), 2); + // assert!(matchable_orders.contains(&oid1)); + // assert!(matchable_orders.contains(&oid2)); + + // let external_matchable_orders = + // order_cache.externally_matchable_orders().await; + // assert_eq!(external_matchable_orders.len(), 1); + // assert!(external_matchable_orders.contains(&oid1)); } }