Skip to content

Commit

Permalink
workers: temporarily subscribe to coinbase heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
sehyunc committed Oct 21, 2023
1 parent ad543ce commit e63c032
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 0 deletions.
1 change: 1 addition & 0 deletions workers/price-reporter/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub async fn connect_exchange(
config: &PriceReporterManagerConfig,
exchange: Exchange,
) -> Result<Box<dyn ExchangeConnection>, ExchangeConnectionError> {
dbg!("Connecting to exchange: ", exchange);
let base_token = base_token.clone();
let quote_token = quote_token.clone();

Expand Down
49 changes: 49 additions & 0 deletions workers/price-reporter/src/exchange/coinbase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,37 @@ impl CoinbaseConnection {
}

/// Construct the websocket subscription message with HMAC authentication
fn construct_hearthbeats_subscribe_message(
base_token: &Token,
quote_token: &Token,
api_key: &str,
api_secret: &str,
) -> String {
let base_ticker = base_token.get_exchange_ticker(Exchange::Coinbase);
let quote_ticker = quote_token.get_exchange_ticker(Exchange::Coinbase);
let product_ids = format!("{}-{}", base_ticker, quote_ticker);

// Authenticate the request with the API key
let channel = "heartbeats";
let timestamp = get_current_time_seconds().to_string();
let signature_bytes = HMAC::mac(
format!("{}{}{}", timestamp, channel, product_ids),
api_secret,
);

let signature = hex::encode(signature_bytes);
json!({
"type": "subscribe",
"product_ids": [ product_ids ],
"channel": channel,
"api_key": api_key,
"timestamp": timestamp,
"signature": signature,
})
.to_string()
}

/// Construct the websocket subscription message to the `heartbeats` with HMAC authentication
fn construct_subscribe_message(
base_token: &Token,
quote_token: &Token,
Expand Down Expand Up @@ -211,12 +242,30 @@ impl ExchangeConnection for CoinbaseConnection {
let authenticated_subscribe_msg =
Self::construct_subscribe_message(&base_token, &quote_token, &api_key, &api_secret);

let authenticated_hearthbeats_subscribe_msg = Self::construct_hearthbeats_subscribe_message(
&base_token,
&quote_token,
&api_key,
&api_secret,
);

// Setup the topic subscription
writer
.send(Message::Text(authenticated_subscribe_msg))
.await
.map_err(|err| ExchangeConnectionError::ConnectionHangup(err.to_string()))?;

// Setup the topic subscription to `heartbeats` channel
dbg!("Subscribing to heartbeats");
writer
.send(Message::Text(authenticated_hearthbeats_subscribe_msg))
.await
.map_err(|err| {
ExchangeConnectionError::ConnectionHangup(
"Error sending heartbeat subscription message".to_string() + &err.to_string(),
)
})?;

// Map the stream of Coinbase messages to one of midpoint prices
let mapped_stream = read.filter_map(move |message| {
let mut order_book = CoinbaseOrderBookData::default();
Expand Down
3 changes: 3 additions & 0 deletions workers/price-reporter/src/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ impl ConnectionMuxer {
// Restart the connection
log::error!("Error streaming from {exchange}: {e}, restarting connection...");
let new_conn = self.retry_connection(exchange).await.unwrap();
dbg!("Reconnected to exchange: ", exchange);
stream_map.insert(exchange, new_conn);
dbg!("Stream map length: ", stream_map.len());
}

}
Expand Down Expand Up @@ -472,6 +474,7 @@ impl ConnectionMuxer {
&mut self,
exchange: Exchange,
) -> Result<Box<dyn ExchangeConnection>, ExchangeConnectionError> {
dbg!("Retrying connection to exchange: ", exchange);
// Increment the retry count and filter out old requests
let now = Instant::now();
let retry_timestamps = self.exchange_retries.entry(exchange).or_default();
Expand Down

0 comments on commit e63c032

Please sign in to comment.