Skip to content

Commit

Permalink
feat: Rewrite order matching strategy
Browse files Browse the repository at this point in the history
Refactors the current trading component into a clearly separated orderbook component and a trade execution component. The linking part is the `ExecutableMatch` which can be derived from the matches stored into the database.

At the moment we assume optimistically that the trade execution will succeed. However, we should consider that a pending match may never get filled or it fails at execution in such a scenario we would need to rollback the matched orders.
  • Loading branch information
holzeis committed May 27, 2024
1 parent f2996aa commit 167979c
Show file tree
Hide file tree
Showing 20 changed files with 1,287 additions and 1,315 deletions.
43 changes: 24 additions & 19 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ use coordinator::run_migration;
use coordinator::scheduler::NotificationScheduler;
use coordinator::settings::Settings;
use coordinator::storage::CoordinatorTenTenOneStorage;
use coordinator::trade;
use coordinator::trade::websocket::InternalPositionUpdateMessage;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
use lnd_bridge::LndBridge;
use rand::thread_rng;
use rand::RngCore;
use rust_decimal::prelude::FromPrimitive;
use rust_decimal::Decimal;
use std::backtrace::Backtrace;
use std::net::IpAddr;
use std::net::Ipv4Addr;
Expand Down Expand Up @@ -261,21 +264,22 @@ async fn main() -> Result<()> {

let (tx_orderbook_feed, _rx) = broadcast::channel(100);

let (_handle, trading_sender) = trading::start(
node.clone(),
tx_orderbook_feed.clone(),
auth_users_notifier.clone(),
let trade_executor = trade::spawn_trade_executor(node.clone(), auth_users_notifier.clone())?;

let order_matching_fee_rate =
Decimal::from_f32(node.settings.read().await.order_matching_fee_rate).expect("to fit");

let orderbook_sender = trading::spawn_orderbook(
node.pool.clone(),
notification_service.get_sender(),
network,
node.inner.oracle_pubkey,
);
let _handle = async_match::monitor(
node.clone(),
node_event_handler.subscribe(),
auth_users_notifier.clone(),
network,
node.inner.oracle_pubkey,
);
trade_executor.clone(),
tx_orderbook_feed.clone(),
order_matching_fee_rate,
)?;

let _handle =
async_match::monitor(node.clone(), node_event_handler.subscribe(), trade_executor);

let _handle = rollover::monitor(
pool.clone(),
node_event_handler.subscribe(),
Expand All @@ -294,11 +298,12 @@ async fn main() -> Result<()> {

tokio::spawn({
let node = node.clone();
let trading_sender = trading_sender.clone();
let orderbook_sender = orderbook_sender.clone();
async move {
loop {
tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await;
if let Err(e) = expired_positions::close(node.clone(), trading_sender.clone()).await
if let Err(e) =
expired_positions::close(node.clone(), orderbook_sender.clone()).await
{
tracing::error!("Failed to close expired positions! Error: {e:#}");
}
Expand All @@ -308,11 +313,11 @@ async fn main() -> Result<()> {

tokio::spawn({
let node = node.clone();
let trading_sender = trading_sender.clone();
let orderbook_sender = orderbook_sender.clone();
async move {
loop {
tokio::time::sleep(LIQUIDATED_POSITION_SYNC_INTERVAL).await;
liquidated_positions::monitor(node.clone(), trading_sender.clone()).await
liquidated_positions::monitor(node.clone(), orderbook_sender.clone()).await
}
}
});
Expand All @@ -325,7 +330,7 @@ async fn main() -> Result<()> {
settings.clone(),
exporter,
NODE_ALIAS,
trading_sender,
orderbook_sender,
tx_orderbook_feed,
tx_position_feed,
tx_user_feed,
Expand Down
29 changes: 13 additions & 16 deletions coordinator/src/node/expired_positions.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use crate::db;
use crate::node::Node;
use crate::orderbook;
use crate::orderbook::db::orders;
use crate::orderbook::trading::NewOrderMessage;
use crate::orderbook::trading::OrderbookMessage;
use crate::position::models::Position;
use crate::position::models::PositionState;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use rust_decimal::prelude::FromPrimitive;
Expand All @@ -18,14 +16,15 @@ use xxi_node::commons::average_execution_price;
use xxi_node::commons::Match;
use xxi_node::commons::MatchState;
use xxi_node::commons::NewMarketOrder;
use xxi_node::commons::NewOrder;
use xxi_node::commons::OrderReason;
use xxi_node::commons::OrderState;

/// The timeout before we give up on closing an expired position collaboratively. This value should
/// not be larger than our refund transaction time lock.
pub const EXPIRED_POSITION_TIMEOUT: Duration = Duration::days(7);

pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) -> Result<()> {
pub async fn close(node: Node, orderbook_sender: mpsc::Sender<OrderbookMessage>) -> Result<()> {
let mut conn = node.pool.get()?;

let positions = db::positions::Position::get_all_open_positions(&mut conn)
Expand All @@ -50,8 +49,9 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->

if order.expiry < OffsetDateTime::now_utc() {
tracing::warn!(trader_id, order_id, "Matched order expired! Giving up on that position, looks like the corresponding dlc channel has to get force closed.");
// TODO(holzeis): It's not ideal that the order and match are updated by the trade
// executor. This should rather get updated by the orderbook.
orderbook::db::orders::set_order_state(&mut conn, order.id, OrderState::Expired)?;

orderbook::db::matches::set_match_state_by_order_id(
&mut conn,
order.id,
Expand All @@ -75,11 +75,13 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->

tracing::debug!(trader_pk=%position.trader, %position.expiry_timestamp, "Attempting to close expired position");

let order_id = uuid::Uuid::new_v4();
let trader_pubkey = position.trader;
let new_order = NewMarketOrder {
id: uuid::Uuid::new_v4(),
id: order_id,
contract_symbol: position.contract_symbol,
quantity: Decimal::try_from(position.quantity).expect("to fit into decimal"),
trader_id: position.trader,
trader_id: trader_pubkey,
direction: position.trader_direction.opposite(),
leverage: Decimal::from_f32(position.trader_leverage).expect("to fit into decimal"),
// This order can basically not expire, but if the user does not come back online within
Expand All @@ -89,18 +91,13 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->
stable: position.stable,
};

let order = orders::insert_market_order(&mut conn, new_order.clone(), OrderReason::Expired)
.map_err(|e| anyhow!(e))
.context("Failed to insert expired order into DB")?;

let message = NewOrderMessage {
order,
channel_opening_params: None,
let message = OrderbookMessage::NewOrder {
new_order: NewOrder::Market(new_order),
order_reason: OrderReason::Expired,
};

if let Err(e) = trading_sender.send(message).await {
tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}");
if let Err(e) = orderbook_sender.send(message).await {
tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing expired position. Error: {e:#}");
continue;
}
}
Expand Down
37 changes: 13 additions & 24 deletions coordinator/src/node/liquidated_positions.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::db;
use crate::node::Node;
use crate::orderbook;
use crate::orderbook::db::orders;
use crate::orderbook::trading::NewOrderMessage;
use crate::orderbook::trading::OrderbookMessage;
use anyhow::Result;
use rust_decimal::prelude::FromPrimitive;
use rust_decimal::Decimal;
Expand All @@ -17,16 +16,17 @@ use xxi_node::commons::Direction;
use xxi_node::commons::Match;
use xxi_node::commons::MatchState;
use xxi_node::commons::NewMarketOrder;
use xxi_node::commons::NewOrder;
use xxi_node::commons::OrderReason;
use xxi_node::commons::OrderState;

/// The timeout before we give up on closing a liquidated position collaboratively. This value
/// should not be larger than our refund transaction time lock.
pub const LIQUIDATION_POSITION_TIMEOUT: Duration = Duration::days(7);

pub async fn monitor(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) {
pub async fn monitor(node: Node, orderbook_sender: mpsc::Sender<OrderbookMessage>) {
if let Err(e) =
check_if_positions_need_to_get_liquidated(trading_sender.clone(), node.clone()).await
check_if_positions_need_to_get_liquidated(orderbook_sender.clone(), node.clone()).await
{
tracing::error!("Failed to check if positions need to get liquidated. Error: {e:#}");
}
Expand All @@ -35,7 +35,7 @@ pub async fn monitor(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>)
/// For all open positions, check if the maintenance margin has been reached. Send a liquidation
/// async match to the traders whose positions have been liquidated.
async fn check_if_positions_need_to_get_liquidated(
trading_sender: mpsc::Sender<NewOrderMessage>,
orderbook_sender: mpsc::Sender<OrderbookMessage>,
node: Node,
) -> Result<()> {
let mut conn = node.pool.get()?;
Expand Down Expand Up @@ -121,11 +121,13 @@ async fn check_if_positions_need_to_get_liquidated(
}
}

let trader_pubkey = position.trader;
let order_id = uuid::Uuid::new_v4();
let new_order = NewMarketOrder {
id: uuid::Uuid::new_v4(),
id: order_id,
contract_symbol: position.contract_symbol,
quantity: Decimal::try_from(position.quantity).expect("to fit into decimal"),
trader_id: position.trader,
trader_id: trader_pubkey,
direction: position.trader_direction.opposite(),
leverage: Decimal::from_f32(position.trader_leverage).expect("to fit into decimal"),
// This order can basically not expire, but if the user does not come back online
Expand All @@ -140,26 +142,13 @@ async fn check_if_positions_need_to_get_liquidated(
false => OrderReason::CoordinatorLiquidated,
};

let order = match orders::insert_market_order(
&mut conn,
new_order.clone(),
order_reason.clone(),
) {
Ok(order) => order,
Err(e) => {
tracing::error!("Failed to insert liquidation order into DB. Error: {e:#}");
continue;
}
};

let message = NewOrderMessage {
order,
channel_opening_params: None,
let message = OrderbookMessage::NewOrder {
new_order: NewOrder::Market(new_order),
order_reason,
};

if let Err(e) = trading_sender.send(message).await {
tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing liquidated position. Error: {e:#}");
if let Err(e) = orderbook_sender.send(message).await {
tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing liquidated position. Error: {e:#}");
continue;
}
}
Expand Down
Loading

0 comments on commit 167979c

Please sign in to comment.