Skip to content

Commit

Permalink
feat: add replica nodes for op server block fetch (#400)
Browse files Browse the repository at this point in the history
* feat: support server replicas

* reduce poller sleep time

* remove extra logs
  • Loading branch information
ncitron committed Oct 23, 2024
1 parent b635cdb commit 1d1b50b
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions opstack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ typenum.workspace = true
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
reqwest.workspace = true
url.workspace = true
futures.workspace = true

# consensus
alloy.workspace = true
Expand Down
13 changes: 12 additions & 1 deletion opstack/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use clap::Parser;
use eyre::Result;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use url::Url;

use helios_opstack::{
config::{Network, NetworkConfig},
Expand All @@ -19,8 +20,16 @@ async fn main() -> Result<()> {
let unsafe_signer = config.chain.unsafe_signer;
let server_addr = cli.server_address;
let gossip_addr = cli.gossip_address;
let replica_urls = cli.replica_urls.unwrap_or_default();

start_server(server_addr, gossip_addr, chain_id, unsafe_signer).await?;
start_server(
server_addr,
gossip_addr,
chain_id,
unsafe_signer,
replica_urls,
)
.await?;

Ok(())
}
Expand All @@ -46,4 +55,6 @@ struct Cli {
server_address: SocketAddr,
#[clap(short, long, default_value = "0.0.0.0:9876")]
gossip_address: SocketAddr,
#[clap(short, long, value_delimiter = ',')]
replica_urls: Option<Vec<Url>>,
}
29 changes: 25 additions & 4 deletions opstack/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,33 @@ use alloy::primitives::Address;
use axum::{extract::State, routing::get, Json, Router};
use eyre::Result;
use tokio::{
sync::{mpsc::Receiver, RwLock},
sync::{
mpsc::{channel, Receiver},
RwLock,
},
time::sleep,
};
use url::Url;

use crate::{types::ExecutionPayload, SequencerCommitment};

use self::net::{block_handler::BlockHandler, gossip::GossipService};

pub mod net;
mod poller;

pub async fn start_server(
server_addr: SocketAddr,
gossip_addr: SocketAddr,
chain_id: u64,
signer: Address,
replica_urls: Vec<Url>,
) -> Result<()> {
let state = Arc::new(RwLock::new(ServerState::new(
gossip_addr,
chain_id,
signer,
replica_urls,
)?));

let state_copy = state.clone();
Expand All @@ -36,6 +43,7 @@ pub async fn start_server(

let router = Router::new()
.route("/latest", get(latest_handler))
.route("/chain_id", get(chain_id_handler))
.with_state(state);

let listener = tokio::net::TcpListener::bind(server_addr).await?;
Expand All @@ -50,25 +58,38 @@ async fn latest_handler(
Json(state.read().await.latest_commitment.clone().map(|v| v.0))
}

async fn chain_id_handler(State(state): State<Arc<RwLock<ServerState>>>) -> Json<u64> {
Json(state.read().await.chain_id)
}

struct ServerState {
chain_id: u64,
commitment_recv: Receiver<SequencerCommitment>,
latest_commitment: Option<(SequencerCommitment, u64)>,
}

impl ServerState {
pub fn new(addr: SocketAddr, chain_id: u64, signer: Address) -> Result<Self> {
let (handler, commitment_recv) = BlockHandler::new(signer, chain_id);
pub fn new(
addr: SocketAddr,
chain_id: u64,
signer: Address,
replica_urls: Vec<Url>,
) -> Result<Self> {
let (send, commitment_recv) = channel(256);
poller::start(replica_urls, signer, chain_id, send.clone());
let handler = BlockHandler::new(signer, chain_id, send);
let gossip = GossipService::new(addr, chain_id, handler);
gossip.start()?;

Ok(Self {
chain_id,
commitment_recv,
latest_commitment: None,
})
}

pub fn update(&mut self) {
if let Ok(commitment) = self.commitment_recv.try_recv() {
while let Ok(commitment) = self.commitment_recv.try_recv() {
if let Ok(payload) = ExecutionPayload::try_from(&commitment) {
if self.is_latest_commitment(payload.block_number) {
tracing::info!("new commitment for block: {}", payload.block_number);
Expand Down
13 changes: 5 additions & 8 deletions opstack/src/server/net/block_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy::primitives::Address;
use libp2p::gossipsub::{IdentTopic, Message, MessageAcceptance, TopicHash};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::Sender;

use crate::SequencerCommitment;

Expand All @@ -12,16 +12,13 @@ pub struct BlockHandler {
}

impl BlockHandler {
pub fn new(signer: Address, chain_id: u64) -> (Self, Receiver<SequencerCommitment>) {
let (send, recv) = channel(256);
let handler = Self {
pub fn new(signer: Address, chain_id: u64, sender: Sender<SequencerCommitment>) -> Self {
Self {
chain_id,
signer,
commitment_sender: send,
commitment_sender: sender,
blocks_v3_topic: IdentTopic::new(format!("/optimism/{}/2/blocks", chain_id)),
};

(handler, recv)
}
}

pub fn topics(&self) -> Vec<TopicHash> {
Expand Down
75 changes: 75 additions & 0 deletions opstack/src/server/poller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::time::Duration;

use alloy::primitives::Address;
use eyre::Result;
use futures::future::join_all;
use reqwest::{Client, ClientBuilder};
use tokio::{sync::mpsc::Sender, time::sleep};
use tracing::warn;
use url::Url;

use crate::SequencerCommitment;

pub fn start(urls: Vec<Url>, signer: Address, chain_id: u64, sender: Sender<SequencerCommitment>) {
tokio::spawn(async move {
let client = ClientBuilder::new()
.timeout(Duration::from_millis(500))
.build()
.unwrap();

let mut final_urls = Vec::new();
for url in urls {
if let Ok(replica_chain_id) = get_chain_id(&client, &url).await {
if chain_id == replica_chain_id {
final_urls.push(url);
} else {
warn!("received bad chain id from {}", url);
}
} else {
warn!("received no chain id from {}", url);
}
}

loop {
join_all(
final_urls
.iter()
.map(|url| get_commitment(&client, url, sender.clone(), signer, chain_id)),
)
.await;
sleep(Duration::from_millis(500)).await;
}
});
}

async fn get_commitment(
client: &Client,
url: &Url,
sender: Sender<SequencerCommitment>,
signer: Address,
chain_id: u64,
) -> Result<()> {
let commitment = client
.get(url.join("latest")?)
.send()
.await?
.json::<SequencerCommitment>()
.await?;

if commitment.verify(signer, chain_id).is_ok() {
sender.send(commitment).await?;
}

Ok(())
}

async fn get_chain_id(client: &Client, url: &Url) -> Result<u64> {
let chain_id = client
.get(url.join("chain_id")?)
.send()
.await?
.json::<u64>()
.await?;

Ok(chain_id)
}

0 comments on commit 1d1b50b

Please sign in to comment.