Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added a basic test to run the oracle #26

Merged
merged 9 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 209 additions & 123 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ reth-revm = { git = "https://github.com/paradigmxyz/reth" }
reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth" }
reth-tracing = { git = "https://github.com/paradigmxyz/reth" }


loocapro marked this conversation as resolved.
Show resolved Hide resolved
# alloy
alloy-eips = { version = "0.5.4", default-features = false }
alloy-consensus = { version = "0.5.4", default-features = false }
Expand Down Expand Up @@ -69,3 +70,6 @@ eyre = "0.6"
# testing
reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" }



loocapro marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 7 additions & 4 deletions oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ reth-node-ethereum.workspace = true
reth-primitives.workspace = true
reth-tracing.workspace = true
reth.workspace = true
reth-tokio-util = { git = "https://github.com/paradigmxyz/reth" }
shekhirin marked this conversation as resolved.
Show resolved Hide resolved

# alloy
alloy-primitives.workspace = true
Expand All @@ -34,17 +35,19 @@ enr = "0.12"
# async
futures.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.23"
tokio-tungstenite = { version = "0.23", features = ["native-tls"] }
tokio.workspace = true
rand = "0.8.5"

thiserror = "1"
shekhirin marked this conversation as resolved.
Show resolved Hide resolved

# misc
clap = "4"
eyre.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror = "1"
uuid = "1.10.0"

[dev-dependencies]
reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true
160 changes: 139 additions & 21 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use clap::Parser;
use cli_ext::OracleExt;
use exex::ExEx;
use futures::FutureExt;
use futures::{FutureExt, Stream};
use network::{proto::OracleProtoHandler, OracleNetwork};
use offchain_data::DataFeederStream;
use offchain_data::{DataFeederError, DataFeederStream, DataFeeds};
use oracle::Oracle;
use reth::chainspec::EthereumChainSpecParser;
use reth_exex::ExExContext;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;

mod cli_ext;
Expand All @@ -17,6 +19,35 @@ mod oracle;

const ORACLE_EXEX_ID: &str = "exex-oracle";

/// Helper function to start the oracle stack.
async fn start<Node: FullNodeComponents, D>(
ctx: ExExContext<Node>,
tcp_port: u16,
udp_port: u16,
data_feeder: D,
) -> eyre::Result<(Oracle<Node, D>, Node::Network)>
where
Node::Network: NetworkProtocols,
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
// Define the oracle subprotocol
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
// Add it to the network as a subprotocol
let net = ctx.network().clone();
net.add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

// The instance of the execution extension that will handle chain events
let exex = ExEx::new(ctx);

// The instance of the oracle network that will handle discovery and gossiping of data
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;

// The oracle instance that will orchestrate the network, the execution extensions,
// the off-chain data stream, and the gossiping
let oracle = Oracle::new(exex, network, data_feeder, to_peers);
Ok((oracle, net.clone()))
}

fn main() -> eyre::Result<()> {
reth::cli::Cli::<EthereumChainSpecParser, OracleExt>::parse().run(|builder, args| async move {
let tcp_port = args.tcp_port;
Expand All @@ -36,26 +67,8 @@ fn main() -> eyre::Result<()> {
// Source: https://github.com/vados-cosmonic/wasmCloud/commit/440e8c377f6b02f45eacb02692e4d2fabd53a0ec
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
// define the oracle subprotocol
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
// add it to the network as a subprotocol
ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

// the instance of the execution extension that will handle chain events
let exex = ExEx::new(ctx);

// the instance of the oracle network that will handle discovery and
// gossiping of data
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;
// the off-chain data feed stream
let data_feed = DataFeederStream::new(args.binance_symbols).await?;

// the oracle instance that will orchestrate the network, the execution
// extensions, the offchain data stream and the
// gossiping the oracle will always sign and
// broadcast data via the channel until a peer is
// subcribed to it
let oracle = Oracle::new(exex, network, data_feed, to_peers);
let (oracle, _net) = start(ctx, tcp_port, udp_port, data_feed).await?;
Ok(oracle)
})
})
Expand All @@ -67,3 +80,108 @@ fn main() -> eyre::Result<()> {
handle.wait_for_node_exit().await
})
}

#[cfg(test)]
mod tests {
use crate::{offchain_data::binance::ticker::Ticker, start};
use futures::{Stream, StreamExt};
use reth_exex_test_utils::test_exex_context;
use reth_network::{NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers};
use reth_network_api::PeerId;
use reth_tokio_util::EventStream;
use reth_tracing::tracing::info;
use tokio_stream::wrappers::BroadcastStream;

async fn wait_for_session(mut events: EventStream<NetworkEvent>) -> PeerId {
while let Some(event) = events.next().await {
if let NetworkEvent::SessionEstablished { peer_id, .. } = event {
info!("Session established with {}", peer_id);
return peer_id;
}
info!("Unexpected event: {:?}", event);
}

unreachable!()
}

use crate::offchain_data::{DataFeederError, DataFeeds};
use futures::stream::{self};
use std::pin::Pin;

fn mock_stream() -> Pin<Box<dyn Stream<Item = Result<DataFeeds, DataFeederError>> + Send>> {
let ticker = Ticker {
event_type: "24hrTicker".to_string(),
event_time: 1698323450000,
symbol: "BTCUSDT".to_string(),
price_change: "100.00".to_string(),
price_change_percent: "2.5".to_string(),
weighted_avg_price: "40200.00".to_string(),
prev_close_price: "40000.00".to_string(),
last_price: "40100.00".to_string(),
last_quantity: "0.5".to_string(),
best_bid_price: "40095.00".to_string(),
best_bid_quantity: "1.0".to_string(),
best_ask_price: "40105.00".to_string(),
best_ask_quantity: "1.0".to_string(),
open_price: "39900.00".to_string(),
high_price: "40500.00".to_string(),
low_price: "39800.00".to_string(),
volume: "1500".to_string(),
quote_volume: "60000000".to_string(),
open_time: 1698237050000,
close_time: 1698323450000,
first_trade_id: 1,
last_trade_id: 2000,
num_trades: 2000,
};

// Wrap the Ticker in DataFeeds::Binance
let data_feed = DataFeeds::Binance(ticker);

// Create a stream that sends a single item and then ends, boxed and pinned
Box::pin(stream::once(async { Ok(data_feed) }))
}

#[tokio::test]
async fn e2e_oracles() {
reth_tracing::init_test_tracing();

// spawn first instance
let (ctx_1, _handle) = test_exex_context().await.unwrap();
let data_feed1 = mock_stream();
let (oracle_1, network_1) = start(ctx_1, 30303, 30304, data_feed1).await.unwrap();
let mut broadcast_stream_1 = BroadcastStream::new(oracle_1.signed_ticks().subscribe());
let signer_1 = oracle_1.signer().address();
tokio::spawn(oracle_1);
let net_1_events = network_1.event_listener();

// spawn second instance
let (ctx_2, _handle) = test_exex_context().await.unwrap();
let data_feed2 = mock_stream();
let (oracle_2, network_2) = start(ctx_2, 30305, 30306, data_feed2).await.unwrap();
let mut broadcast_stream_2 = BroadcastStream::new(oracle_2.signed_ticks().subscribe());
let signer_2 = oracle_2.signer().address();
tokio::spawn(oracle_2);
let net_2_events = network_2.event_listener();

// expect peers connected
let (peer_2, addr_2) = (network_2.peer_id(), network_2.local_addr());
network_1.add_peer(*peer_2, addr_2);
let expected_peer_2 = wait_for_session(net_1_events).await;
assert_eq!(expected_peer_2, *peer_2);

let (peer_1, addr_1) = (network_1.peer_id(), network_1.local_addr());
network_2.add_peer(*peer_1, addr_1);
let expected_peer_1 = wait_for_session(net_2_events).await;
assert_eq!(expected_peer_1, *peer_1);

// expect signed data
let signed_ticker_1 = broadcast_stream_1.next().await.unwrap().unwrap();
assert_eq!(signed_ticker_1.ticker.symbol, "BTCUSDT");
assert_eq!(signed_ticker_1.signer, signer_1);

let signed_ticker_2 = broadcast_stream_2.next().await.unwrap().unwrap();
assert_eq!(signed_ticker_2.ticker.symbol, "BTCUSDT");
assert_eq!(signed_ticker_2.signer, signer_2);
Comment on lines +178 to +185
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so far this tests that both instances received an item from the stream, but it doesn't test if they actually exchange messages as peers after connecting. Can we do that as well?

}
}
2 changes: 1 addition & 1 deletion oracle/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Future for OracleNetwork {
"Established connection, will start gossiping"
);
}
None => return Poll::Ready(Ok(())),
None => return Poll::Pending,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions oracle/src/network/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use reth_eth_wire::{
use reth_network::protocol::{ConnectionHandler, OnNotSupported};
use reth_network_api::Direction;
use reth_network_peers::PeerId;
use reth_tracing::tracing::trace;
use std::{
collections::HashMap,
pin::Pin,
Expand Down Expand Up @@ -59,6 +60,8 @@ impl Stream for OracleConnection {
}

if let Poll::Ready(Some(Ok(tick))) = this.signed_ticks.poll_next_unpin(cx) {
let signer = tick.signer;
trace!(target: "oracle::conn", ?signer, "Received signed tick data.");
return Poll::Ready(Some(
OracleProtoMessage::signed_ticker(Box::new(tick)).encoded(),
));
Expand Down Expand Up @@ -142,6 +145,7 @@ impl ConnectionHandler for OracleConnHandler {
.events
.send(ProtocolEvent::Established { direction, peer_id, to_connection: tx })
.ok();
trace!(target: "oracle::conn", "Connection established.");
OracleConnection {
conn,
initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping),
Expand Down
4 changes: 2 additions & 2 deletions oracle/src/offchain_data/binance/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::{ready, Stream, StreamExt};
use reth_tracing::tracing::error;
use reth_tracing::tracing::{error, trace};
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Stream for BinanceDataFeeder {
return Poll::Pending;
}
};

trace!(target: "oracle::binance", ?msg, "Received message");
Poll::Ready(Some(Ok(msg.data)))
}
Some(Err(e)) => {
Expand Down
42 changes: 32 additions & 10 deletions oracle/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{
exex::ExEx,
network::{proto::data::SignedTicker, OracleNetwork},
offchain_data::{DataFeederStream, DataFeeds},
offchain_data::{DataFeederError, DataFeeds},
};
use alloy_rlp::{BytesMut, Encodable};
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use futures::{FutureExt, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::{error, info};
use reth_tracing::tracing::{error, info, trace};
use std::{
future::Future,
pin::Pin,
Expand All @@ -17,32 +17,53 @@ use std::{

/// The Oracle struct is a long running task that orchestrates discovery of new peers,
/// decoding data from chain events (ExEx) and gossiping it to peers.
pub(crate) struct Oracle<Node: FullNodeComponents> {
pub(crate) struct Oracle<Node: FullNodeComponents, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
/// The network task for this node.
/// It is composed by a discovery task and a sub protocol RLPx task.
network: OracleNetwork,
/// The execution extension task for this node.
exex: ExEx<Node>,
/// The offchain data feed stream.
data_feed: DataFeederStream,
data_feed: D,
/// The signer to sign the data feed.
signer: PrivateKeySigner,
/// Half of the broadcast channel to send data to connected peers.
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
}

impl<Node: FullNodeComponents> Oracle<Node> {
impl<Node: FullNodeComponents, D> Oracle<Node, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
pub(crate) fn new(
exex: ExEx<Node>,
network: OracleNetwork,
data_feed: DataFeederStream,
data_feed: D,
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
) -> Self {
Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_peers }
}

/// Returns the signer used by the oracle.
#[allow(dead_code)]
pub(crate) fn signer(&self) -> &PrivateKeySigner {
&self.signer
}

/// Returns the signed ticker broadcast channel.
#[allow(dead_code)]
pub(crate) fn signed_ticks(&self) -> &tokio::sync::broadcast::Sender<SignedTicker> {
&self.to_peers
}
}

impl<Node: FullNodeComponents> Future for Oracle<Node> {
impl<Node: FullNodeComponents, D> Future for Oracle<Node, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static + std::marker::Unpin,
{
type Output = eyre::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -74,8 +95,9 @@ impl<Node: FullNodeComponents> Future for Oracle<Node> {
let signature = this.signer.sign_message_sync(&buffer)?;
let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address());

if let Err(err) = this.to_peers.send(signed_ticker.clone()) {
error!(?err, "Failed to send ticker to gossip, no peers connected");
if this.to_peers.send(signed_ticker.clone()).is_ok() {
let signer = signed_ticker.signer;
trace!(target: "oracle", ?signer, "Sent signed ticker");
}
}
Some(Err(e)) => {
Expand Down
Loading