Skip to content

Commit

Permalink
Refactor tick by tick error handling (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
wboayue authored Dec 25, 2024
1 parent df44c02 commit 6a048d7
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 120 deletions.
13 changes: 2 additions & 11 deletions examples/tick_by_tick_all_last.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use ibapi::contracts::Contract;
use ibapi::market_data::realtime::LastTicks;
use ibapi::Client;

// This example demonstrates how to stream tick by tick data for the last price of a contract.
Expand All @@ -22,16 +21,8 @@ fn main() {
contract.security_type, contract.symbol
);

for (i, tick) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
match tick {
LastTicks::Trade(trade) => {
println!("{}: {i:?} {trade:?}", contract.symbol);
}
LastTicks::Notice(notice) => {
// server could send a notice if it doesn't recognize the contract
println!("error_code: {}, error_message: {}", notice.code, notice.message);
}
}
for (i, trade) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
println!("{}: {i:?} {trade:?}", contract.symbol);
}

// check for errors during streaming
Expand Down
13 changes: 2 additions & 11 deletions examples/tick_by_tick_bid_ask.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use ibapi::contracts::Contract;
use ibapi::market_data::realtime::BidAskTicks;
use ibapi::Client;

// This example demonstrates how to stream tick by tick data for the bid and ask price of a contract.
Expand All @@ -22,16 +21,8 @@ fn main() {
contract.security_type, contract.symbol
);

for (i, tick) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
match tick {
BidAskTicks::BidAsk(bid_ask) => {
println!("{}: {i:?} {bid_ask:?}", contract.symbol);
}
BidAskTicks::Notice(notice) => {
// server could send a notice if it doesn't recognize the contract
println!("error_code: {}, error_message: {}", notice.code, notice.message);
}
}
for (i, bid_ask) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
println!("{}: {i:?} {bid_ask:?}", contract.symbol);
}

// check for errors during streaming
Expand Down
13 changes: 2 additions & 11 deletions examples/tick_by_tick_last.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use ibapi::contracts::Contract;
use ibapi::market_data::realtime::LastTicks;
use ibapi::Client;

// This example demonstrates how to stream tick by tick data for the last price of a contract.
Expand All @@ -22,16 +21,8 @@ fn main() {
contract.security_type, contract.symbol
);

for (i, tick) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
match tick {
LastTicks::Trade(trade) => {
println!("{}: {i:?} {trade:?}", contract.symbol);
}
LastTicks::Notice(notice) => {
// server could send a notice if it doesn't recognize the contract
println!("error_code: {}, error_message: {}", notice.code, notice.message);
}
}
for (i, trade) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
println!("{}: {i:?} {trade:?}", contract.symbol);
}

// check for errors during streaming
Expand Down
13 changes: 2 additions & 11 deletions examples/tick_by_tick_midpoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use ibapi::contracts::Contract;
use ibapi::market_data::realtime::MidpointTicks;
use ibapi::Client;

// This example demonstrates how to stream tick by tick data for the midpoint price of a contract.
Expand All @@ -22,16 +21,8 @@ fn main() {
contract.security_type, contract.symbol
);

for (i, tick) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
match tick {
MidpointTicks::Midpoint(midpoint) => {
println!("{}: {i:?} {midpoint:?}", contract.symbol);
}
MidpointTicks::Notice(notice) => {
// server could send a notice if it doesn't recognize the contract
println!("error_code: {}, error_message: {}", notice.code, notice.message);
}
}
for (i, midpoint) in ticks.timeout_iter(Duration::from_secs(10)).enumerate() {
println!("{}: {i:?} {midpoint:?}", contract.symbol);
}

// check for errors during streaming
Expand Down
10 changes: 5 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::accounts::{AccountSummaries, AccountUpdate, AccountUpdateMulti, Famil
use crate::contracts::{Contract, OptionComputation, SecurityType};
use crate::errors::Error;
use crate::market_data::historical::{self, HistogramEntry};
use crate::market_data::realtime::{self, Bar, BarSize, DepthMarketDataDescription, MarketDepths, MidpointTicks, TickTypes, WhatToShow};
use crate::market_data::realtime::{self, Bar, BarSize, DepthMarketDataDescription, MarketDepths, MidPoint, TickTypes, WhatToShow};
use crate::market_data::MarketDataType;
use crate::messages::{IncomingMessages, OutgoingMessages};
use crate::messages::{RequestMessage, ResponseMessage};
Expand Down Expand Up @@ -1100,7 +1100,7 @@ impl Client {
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, realtime::LastTicks>, Error> {
) -> Result<Subscription<'a, realtime::Trade>, Error> {
realtime::tick_by_tick_all_last(self, contract, number_of_ticks, ignore_size)
}

Expand All @@ -1115,7 +1115,7 @@ impl Client {
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, realtime::BidAskTicks>, Error> {
) -> Result<Subscription<'a, realtime::BidAsk>, Error> {
realtime::tick_by_tick_bid_ask(self, contract, number_of_ticks, ignore_size)
}

Expand All @@ -1130,7 +1130,7 @@ impl Client {
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, realtime::LastTicks>, Error> {
) -> Result<Subscription<'a, realtime::Trade>, Error> {
realtime::tick_by_tick_last(self, contract, number_of_ticks, ignore_size)
}

Expand All @@ -1145,7 +1145,7 @@ impl Client {
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, MidpointTicks>, Error> {
) -> Result<Subscription<'a, MidPoint>, Error> {
realtime::tick_by_tick_midpoint(self, contract, number_of_ticks, ignore_size)
}

Expand Down
12 changes: 11 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{num::ParseIntError, string::FromUtf8Error, sync::Arc};

use crate::messages::ResponseMessage;
use crate::messages::{ResponseMessage, CODE_INDEX, MESSAGE_INDEX};

#[derive(Debug, Clone)]
#[non_exhaustive]
Expand All @@ -25,6 +25,7 @@ pub enum Error {
EndOfStream,
UnexpectedResponse(ResponseMessage),
UnexpectedEndOfStream,
Message(i32, String),
}

impl std::error::Error for Error {}
Expand All @@ -51,6 +52,7 @@ impl std::fmt::Display for Error {

Error::Simple(ref err) => write!(f, "error occurred: {err}"),
Error::InvalidArgument(ref err) => write!(f, "InvalidArgument: {err}"),
Error::Message(code, message) => write!(f, "[{code}] {message}"),
}
}
}
Expand Down Expand Up @@ -79,6 +81,14 @@ impl From<time::error::Parse> for Error {
}
}

impl From<ResponseMessage> for Error {
fn from(err: ResponseMessage) -> Error {
let code = err.peek_int(CODE_INDEX).unwrap();
let message = err.peek_string(MESSAGE_INDEX);
Error::Message(code, message)
}
}

impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(err: std::sync::PoisonError<T>) -> Error {
Error::Poison(format!("Mutex poison error: {}", err))
Expand Down
49 changes: 17 additions & 32 deletions src/market_data/realtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ pub enum BarSize {
// Day,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum BidAskTicks {
BidAsk(BidAsk),
Notice(Notice),
}

/// Represents `BidAsk` tick by tick realtime tick.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct BidAsk {
/// The spread's date and time (either as a yyyymmss hh:mm:ss formatted string or as system time according to the request). Time zone is the TWS time zone chosen on login.
Expand All @@ -56,13 +51,13 @@ pub struct BidAsk {
pub bid_ask_attribute: BidAskAttribute,
}

impl DataStream<BidAskTicks> for BidAskTicks {
impl DataStream<BidAsk> for BidAsk {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::TickByTick];

fn decode(_client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::TickByTick => Ok(BidAskTicks::BidAsk(decoders::decode_bid_ask_tick(message)?)),
IncomingMessages::Error => Ok(BidAskTicks::Notice(Notice::from(message))),
IncomingMessages::TickByTick => decoders::decode_bid_ask_tick(message),
IncomingMessages::Error => Err(Error::from(message.clone())),
_ => Err(Error::UnexpectedResponse(message.clone())),
}
}
Expand All @@ -79,12 +74,7 @@ pub struct BidAskAttribute {
pub ask_past_high: bool,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum MidpointTicks {
Midpoint(MidPoint),
Notice(Notice),
}

/// Represents `MidPoint` tick by tick realtime tick.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct MidPoint {
/// The trade's date and time (either as a yyyymmss hh:mm:ss formatted string or as system time according to the request). Time zone is the TWS time zone chosen on login.
Expand All @@ -93,13 +83,13 @@ pub struct MidPoint {
pub mid_point: f64,
}

impl DataStream<MidpointTicks> for MidpointTicks {
impl DataStream<MidPoint> for MidPoint {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::TickByTick];

fn decode(_client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::TickByTick => Ok(MidpointTicks::Midpoint(decoders::decode_mid_point_tick(message)?)),
IncomingMessages::Error => Ok(MidpointTicks::Notice(Notice::from(message))),
IncomingMessages::TickByTick => decoders::decode_mid_point_tick(message),
IncomingMessages::Error => Err(Error::from(message.clone())),
_ => Err(Error::UnexpectedResponse(message.clone())),
}
}
Expand Down Expand Up @@ -144,15 +134,10 @@ impl DataStream<Bar> for Bar {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum LastTicks {
Trade(Trade),
Notice(Notice),
}

/// Represents `Last` or `AllLast` tick-by-tick real-time tick.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct Trade {
/// Tick type: "Last" or "AllLast"
/// Tick type: `Last` or `AllLast`
pub tick_type: String,
/// The trade's date and time (either as a yyyymmss hh:mm:ss formatted string or as system time according to the request). Time zone is the TWS time zone chosen on login.
pub time: OffsetDateTime,
Expand All @@ -168,13 +153,13 @@ pub struct Trade {
pub special_conditions: String,
}

impl DataStream<LastTicks> for LastTicks {
impl DataStream<Trade> for Trade {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::TickByTick];

fn decode(_client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::TickByTick => Ok(LastTicks::Trade(decoders::decode_trade_tick(message)?)),
IncomingMessages::Error => Ok(LastTicks::Notice(Notice::from(message))),
IncomingMessages::TickByTick => decoders::decode_trade_tick(message),
IncomingMessages::Error => Err(Error::from(message.clone())),
_ => Err(Error::UnexpectedResponse(message.clone())),
}
}
Expand Down Expand Up @@ -428,7 +413,7 @@ pub(crate) fn tick_by_tick_all_last<'a>(
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, LastTicks>, Error> {
) -> Result<Subscription<'a, Trade>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;

let server_version = client.server_version();
Expand Down Expand Up @@ -460,7 +445,7 @@ pub(crate) fn tick_by_tick_last<'a>(
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, LastTicks>, Error> {
) -> Result<Subscription<'a, Trade>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;

let server_version = client.server_version();
Expand All @@ -478,7 +463,7 @@ pub(crate) fn tick_by_tick_bid_ask<'a>(
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, BidAskTicks>, Error> {
) -> Result<Subscription<'a, BidAsk>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;

let server_version = client.server_version();
Expand All @@ -496,7 +481,7 @@ pub(crate) fn tick_by_tick_midpoint<'a>(
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, MidpointTicks>, Error> {
) -> Result<Subscription<'a, MidPoint>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;

let server_version = client.server_version();
Expand Down
24 changes: 9 additions & 15 deletions src/market_data/realtime/tests/subscription_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,21 @@ fn test_tick_by_tick_all_last() {
let trades = trades.expect("Failed to create tick-by-tick subscription");

// Test receiving data
let received_trades: Vec<LastTicks> = trades.iter().take(2).collect();
let received_trades: Vec<Trade> = trades.iter().take(2).collect();

assert_eq!(received_trades.len(), 2, "Should receive 2 trades");

// Verify first trade
if let LastTicks::Trade(trade) = &received_trades[0] {
assert_eq!(trade.price, 3895.25, "Wrong price for first trade");
assert_eq!(trade.size, 7, "Wrong size for first trade");
assert_eq!(trade.exchange, "NASDAQ", "Wrong exchange for first trade");
} else {
panic!("Expected trade, got {:?}", received_trades[0]);
}
let trade = &received_trades[0];
assert_eq!(trade.price, 3895.25, "Wrong price for first trade");
assert_eq!(trade.size, 7, "Wrong size for first trade");
assert_eq!(trade.exchange, "NASDAQ", "Wrong exchange for first trade");

// Verify second trade
if let LastTicks::Trade(trade) = &received_trades[1] {
assert_eq!(trade.price, 3895.50, "Wrong price for second trade");
assert_eq!(trade.size, 5, "Wrong size for second trade");
assert_eq!(trade.exchange, "NYSE", "Wrong exchange for second trade");
} else {
panic!("Expected trade, got {:?}", received_trades[1]);
}
let trade = &received_trades[1];
assert_eq!(trade.price, 3895.50, "Wrong price for second trade");
assert_eq!(trade.size, 5, "Wrong size for second trade");
assert_eq!(trade.exchange, "NYSE", "Wrong exchange for second trade");

// Verify request message
let request_messages = client.message_bus.request_messages();
Expand Down
12 changes: 4 additions & 8 deletions src/market_data/realtime/tests/tick_by_tick_last_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,15 @@ fn test_tick_by_tick_last() {

// Test receiving data
let trades = result.expect("Failed to receive tick-by-tick last data");
let received_trades: Vec<LastTicks> = trades.iter().take(1).collect();
let received_trades: Vec<Trade> = trades.iter().take(1).collect();

assert_eq!(received_trades.len(), 1, "Should receive 1 trade");

// Verify trade data
let trade = &received_trades[0];
if let LastTicks::Trade(trade) = trade {
assert_eq!(trade.price, 3895.25, "Wrong price");
assert_eq!(trade.size, 7, "Wrong size");
assert_eq!(trade.exchange, "NASDAQ", "Wrong exchange");
} else {
panic!("Expected trade, got {:?}", trade);
}
assert_eq!(trade.price, 3895.25, "Wrong price");
assert_eq!(trade.size, 7, "Wrong size");
assert_eq!(trade.exchange, "NASDAQ", "Wrong exchange");

// Verify request message uses "Last" instead of "AllLast"
let request_messages = client.message_bus.request_messages();
Expand Down
Loading

0 comments on commit 6a048d7

Please sign in to comment.