Skip to content

Commit

Permalink
confgi
Browse files Browse the repository at this point in the history
  • Loading branch information
dndll committed Mar 25, 2024
1 parent f32adbe commit 8698e0f
Show file tree
Hide file tree
Showing 19 changed files with 483 additions and 437 deletions.
33 changes: 0 additions & 33 deletions Dockerfile

This file was deleted.

1 change: 1 addition & 0 deletions bin/operator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Config {
pub rpc: near_light_client_rpc::Config,
pub protocol: near_light_client_protocol::config::Config,
pub succinct: crate::succinct::Config,
pub engine: crate::engine::Config,
}

impl Configurable for Config {}
Expand Down
78 changes: 52 additions & 26 deletions bin/operator/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{anyhow, Result};
use futures::FutureExt;
use hashbrown::{hash_map::DefaultHashBuilder, HashMap};
use near_light_client_rpc::{prelude::Itertools, TransactionOrReceiptId};
use near_light_clientx::VERIFY_AMT;
use near_light_clientx::config::bps_from_network;
use priority_queue::PriorityQueue;
use serde::{Deserialize, Serialize};
pub use types::RegistryInfo;
Expand All @@ -19,22 +19,43 @@ use crate::succinct::{

mod types;

// TODO: decide if we can try to identity hash based on ids, they're already
// hashed
// Collision <> receipt & tx?
// TODO[Optimisation]: decide if we can try to identity hash based on ids,
// they're already hashed, perhaps a collision would be if receipt_id ++ tx_id
// are the same, unlikely
type Queue = PriorityQueue<TransactionOrReceiptIdNewtype, PriorityWeight, DefaultHashBuilder>;

#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct Config {
drain_interval: u64,
sync_interval: u64,
cleanup_interval: u64,
persist_interval: u64,
}

impl Default for Config {
fn default() -> Self {
Config {
drain_interval: 1,
sync_interval: 60 * 30,
cleanup_interval: 60,
persist_interval: 30,
}
}
}

pub struct Engine {
registry: HashMap<usize, RegistryInfo>,
succinct_client: Arc<succinct::Client>,
// TODO: persist me
proving_queue: Queue,
batches: HashMap<u32, Option<ProofId>>,
request_info: HashMap<ProofId, Option<ProofStatus>>,
config: Config,
verify_amt: usize,
}

impl Engine {
pub fn new(succinct_client: Arc<succinct::Client>) -> Self {
pub fn new(config: &super::Config, succinct_client: Arc<succinct::Client>) -> Self {
log::info!("starting queue manager");

let state = PersistedState::try_from("state.json");
Expand All @@ -54,6 +75,8 @@ impl Engine {
.map(|s| s.batches.clone())
.unwrap_or_default(),
request_info: state.map(|s| s.request_info).unwrap_or_default(),
config: config.engine.clone(),
verify_amt: bps_from_network(&config.rpc.network),
}
}

Expand All @@ -70,43 +93,44 @@ impl Engine {
} else {
1
};
log::debug!("adding to {:?} with weight: {weight}", tx);
log::debug!("enqueuing {:?} with weight: {weight}", tx);
self.proving_queue.push(tx.into(), weight);
Ok(())
}

fn make_batch(&mut self) -> Option<(u32, Vec<TransactionOrReceiptId>)> {
if self.proving_queue.len() >= VERIFY_AMT {
let id = self.batches.len() as u32;
let mut txs = vec![];
for _ in 0..VERIFY_AMT {
let (req, _) = self.proving_queue.pop().unwrap();
txs.push(req.0);
}
self.batches.insert(id, None);
Some((id, txs))
} else {
None
if self.proving_queue.len() < self.verify_amt {
return None;
}
let id = self.batches.len() as u32;
let mut txs = vec![];
for _ in 0..self.verify_amt {
let (req, _) = self.proving_queue.pop()?;
txs.push(req.0);
}
self.batches.insert(id, None);
Some((id, txs))
}
}

impl Actor for Engine {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_secs(1), |_, ctx| {
ctx.run_interval(Duration::from_secs(self.config.drain_interval), |_, ctx| {
ctx.address().do_send(Drain)
});
ctx.run_interval(Duration::from_secs(60 * 30), |_, ctx| {
ctx.run_interval(Duration::from_secs(self.config.sync_interval), |_, ctx| {
ctx.address().do_send(Sync)
});
ctx.run_interval(Duration::from_secs(60), |_, ctx| {
ctx.address().do_send(Cleanup)
});
ctx.run_interval(Duration::from_secs(60), |_, ctx| {
ctx.address().do_send(Persist)
});
ctx.run_interval(
Duration::from_secs(self.config.cleanup_interval),
|_, ctx| ctx.address().do_send(Cleanup),
);
ctx.run_interval(
Duration::from_secs(self.config.persist_interval),
|_, ctx| ctx.address().do_send(Persist),
);
}
}

Expand Down Expand Up @@ -311,6 +335,8 @@ mod tests {
use super::*;
use crate::succinct::tests::mocks;

const VERIFY_AMT: usize = 64;

async fn manager() -> Engine {
let client = mocks().await;
Engine::new(Arc::new(client))
Expand Down
48 changes: 48 additions & 0 deletions bin/operator/src/engine/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,51 @@ pub struct RegistryInfo {
// Their weight in the shared queue
pub weight: PriorityWeight,
}

#[cfg(test)]
mod tests {

use std::str::FromStr;

use near_light_client_protocol::near_account_id::AccountId;
use test_utils::CryptoHash;

use super::*;

#[test]
fn test_transaction_or_receipt_id_eq() {
let transaction1 = TransactionOrReceiptIdNewtype(TransactionOrReceiptId::Transaction {
transaction_hash: CryptoHash::default(),
sender_id: AccountId::from_str("sender1").unwrap(),
});
let transaction2 = TransactionOrReceiptIdNewtype(TransactionOrReceiptId::Transaction {
transaction_hash: CryptoHash::default(),
sender_id: AccountId::from_str("sender1").unwrap(),
});
assert!(transaction1 == transaction2);

let receipt1 = TransactionOrReceiptIdNewtype(TransactionOrReceiptId::Receipt {
receipt_id: CryptoHash::default(),
receiver_id: AccountId::from_str("receiver1").unwrap(),
});
let receipt2 = TransactionOrReceiptIdNewtype(TransactionOrReceiptId::Receipt {
receipt_id: CryptoHash::default(),
receiver_id: AccountId::from_str("receiver1").unwrap(),
});
assert!(receipt1 == receipt2);

let transaction3 = TransactionOrReceiptIdNewtype(TransactionOrReceiptId::Transaction {
transaction_hash: CryptoHash::default(),
sender_id: AccountId::from_str("sender2").unwrap(),
});
assert!(transaction1 != transaction3);

let receipt3 = TransactionOrReceiptIdNewtype(TransactionOrReceiptId::Receipt {
receipt_id: CryptoHash::default(),
receiver_id: AccountId::from_str("receiver2").unwrap(),
});
assert!(receipt1 != receipt3);

assert!(transaction1 != receipt1);
}
}
2 changes: 1 addition & 1 deletion bin/operator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub async fn main() -> anyhow::Result<()> {

let client = Arc::new(SuccinctClient::new(&config).await?);

let engine = Engine::new(client.clone()).start();
let engine = Engine::new(&config, client.clone()).start();

let server_handle = RpcServer::new(client, engine.clone()).run(&config).await?;

Expand Down
Loading

0 comments on commit 8698e0f

Please sign in to comment.