diff --git a/Cargo.lock b/Cargo.lock index 05f7918d0..312cf7e80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4926,6 +4926,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "bincode", + "bytes", "chrono", "ethers-core", "ethers-providers", diff --git a/guests/eth-block/Cargo.lock b/guests/eth-block/Cargo.lock index 1a77541b6..1203d25e7 100644 --- a/guests/eth-block/Cargo.lock +++ b/guests/eth-block/Cargo.lock @@ -3741,6 +3741,7 @@ version = "0.1.0" dependencies = [ "alloy-sol-types", "anyhow", + "bytes", "chrono", "ethers-core", "ethers-providers", diff --git a/guests/op-block/Cargo.lock b/guests/op-block/Cargo.lock index a10f13769..2d894d2ad 100644 --- a/guests/op-block/Cargo.lock +++ b/guests/op-block/Cargo.lock @@ -3741,6 +3741,7 @@ version = "0.1.0" dependencies = [ "alloy-sol-types", "anyhow", + "bytes", "chrono", "ethers-core", "ethers-providers", diff --git a/guests/op-derive/Cargo.lock b/guests/op-derive/Cargo.lock index 0a73d81ab..c485993e5 100644 --- a/guests/op-derive/Cargo.lock +++ b/guests/op-derive/Cargo.lock @@ -3706,6 +3706,7 @@ version = "0.1.0" dependencies = [ "alloy-sol-types", "anyhow", + "bytes", "chrono", "ethers-core", "ethers-providers", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 82a1058dd..1a143850b 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] anyhow = { version = "1.0", default-features = false } alloy-sol-types = { version = "0.4", default-features = false, optional = true } +bytes = "1.5" ethers-core = { version = "2.0", features = ["optimism"] } hashbrown = { workspace = true } once_cell = { version = "1.18", default-features = false } diff --git a/lib/src/optimism/batcher_channel.rs b/lib/src/optimism/batcher_channel.rs index da11e705d..05838d1bd 100644 --- a/lib/src/optimism/batcher_channel.rs +++ b/lib/src/optimism/batcher_channel.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::VecDeque, io::Read}; +use std::{ + collections::{BTreeMap, VecDeque}, + io::Read, +}; -use anyhow::{ensure, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; +use bytes::Buf; use libflate::zlib::Decoder; use zeth_primitives::{ batch::Batch, - rlp::{Decodable, Header}, + rlp::Decodable, transactions::{ethereum::EthereumTxEssence, Transaction, TxEssence}, Address, BlockNumber, }; @@ -65,12 +69,15 @@ impl BatcherChannels { continue; } - for frame in Frame::process_l1_transaction(&tx.essence)? { + #[cfg(not(target_os = "zkvm"))] + log::debug!("received batcher tx: {}", tx.hash()); + + for frame in Frame::process_batcher_transaction(&tx.essence)? { #[cfg(not(target_os = "zkvm"))] log::debug!( "received frame: channel_id: {}, frame_number: {}, is_last: {}", frame.channel_id, - frame.frame_number, + frame.number, frame.is_last ); @@ -88,7 +95,7 @@ impl BatcherChannels { self.channels.remove(channel_index); } else { // Add frame to channel - channel.process_frame(frame); + channel.add_frame(frame).context("failed to add frame")?; } } else { // Create new channel. From the spec: @@ -103,14 +110,16 @@ impl BatcherChannels { // "After successfully inserting a new frame, the ChannelBank is pruned: channels // are dropped in FIFO order, until total_size <= MAX_CHANNEL_BANK_SIZE." { - while self.total_frame_data_len() as u64 > self.max_channel_bank_size { - let _dropped_channel = self.channels.pop_front().unwrap(); + let mut total_size = self.total_size(); + while total_size as u64 > self.max_channel_bank_size { + let dropped_channel = self.channels.pop_front().unwrap(); + total_size -= dropped_channel.size; #[cfg(not(target_os = "zkvm"))] log::debug!( - "dropped channel: {} (frames_data_len: {})", - _dropped_channel.id, - _dropped_channel.frames_data_len + "pruned channel: {} (channel_size: {})", + dropped_channel.id, + dropped_channel.size ); } } @@ -136,90 +145,103 @@ impl BatcherChannels { self.batches.pop_front() } - fn total_frame_data_len(&self) -> usize { - let mut out = 0; - for channel in &self.channels { - out += channel.frames_data_len; - } - out + fn total_size(&self) -> usize { + self.channels.iter().map(|c| c.size).sum() } - fn channel_index(&self, channel_id: u128) -> Option { + fn channel_index(&self, channel_id: ChannelId) -> Option { self.channels.iter().position(|c| c.id == channel_id) } } -#[derive(Debug)] +/// A [ChannelId] is a unique identifier for a [Channel]. +type ChannelId = u128; + +/// A [Channel] is a set of batches that are split into at least one, but possibly +/// multiple frames. Frames are allowed to be ingested in any order. +#[derive(Clone, Debug, Default)] struct Channel { - id: u128, + /// The channel ID. + id: ChannelId, + /// The number of the L1 block that opened this channel. open_l1_block: u64, - // From the spec: - // "the sum of all buffered frame data of the channel, with an additional frame-overhead of - // 200 bytes per frame." - frames_data_len: usize, - frames: Vec, - expected_frames_len: Option, + /// The number of the frame that closes this channel. + close_frame_number: Option, + /// All frames belonging to this channel by their frame number. + frames: BTreeMap, + /// The estimated memory size, used to drop the channel if we have too much data. + size: usize, } impl Channel { const FRAME_OVERHEAD: usize = 200; + /// Creates a new channel from the given frame. fn new(open_l1_block: u64, frame: Frame) -> Self { - let expected_frames_len = if frame.is_last { - Some(frame.frame_number as usize + 1) - } else { - None - }; - - Self { + let mut channel = Self { id: frame.channel_id, open_l1_block, - frames_data_len: Self::FRAME_OVERHEAD + frame.frame_data.len(), - frames: vec![frame], - expected_frames_len, - } + close_frame_number: None, + frames: BTreeMap::new(), + size: 0, + }; + + // cannot fail for an empty channel + channel.add_frame(frame).unwrap(); + + channel } - fn contains(&self, frame_number: u16) -> bool { - self.frames - .iter() - .any(|existing_frame| existing_frame.frame_number == frame_number) + /// Returns true if the channel is closed, i.e. the closing frame has been received. + fn is_closed(&self) -> bool { + self.close_frame_number.is_some() } - fn process_frame(&mut self, frame: Frame) { + /// Returns true if the channel is ready to be read. + fn is_ready(&self) -> bool { // From the spec: - // "Duplicate frames (by frame number) for frames that have not been pruned from the - // channel-bank are dropped." - if self.contains(frame.frame_number) { - #[cfg(not(target_os = "zkvm"))] - log::debug!( - "channel {} dropping duplicate frame {}", - self.id, - frame.frame_number - ); + // "A channel is ready if: + // - The channel is closed + // - The channel has a contiguous sequence of frames until the closing frame" + matches!(self.close_frame_number, Some(n) if n as usize == self.frames.len() - 1) + } - return; + fn add_frame(&mut self, frame: Frame) -> Result<()> { + ensure!( + frame.channel_id == self.id, + "frame channel_id does not match channel id" + ); + if frame.is_last && self.is_closed() { + bail!("channel is already closed"); + } + ensure!( + !self.frames.contains_key(&frame.number), + "duplicate frame number" + ); + if let Some(close_frame_number) = self.close_frame_number { + ensure!( + frame.number < close_frame_number, + "frame number >= close_frame_number" + ); } // From the spec: - // "Duplicate closes (new frame is_last == 1, but the channel has already seen a closing - // frame and has not yet been pruned from the channel-bank) are dropped." + // "If a frame is closing any existing higher-numbered frames are removed from the + // channel." if frame.is_last { - if self.expected_frames_len.is_some() { - #[cfg(not(target_os = "zkvm"))] - log::debug!( - "channel {} dropping duplicate close-frame {}", - self.id, - frame.frame_number - ); - - return; - } - self.expected_frames_len = Some(frame.frame_number as usize + 1); + // mark channel as closed + self.close_frame_number = Some(frame.number); + // prune frames with a number higher than the closing frame and update size + self.frames + .split_off(&frame.number) + .values() + .for_each(|pruned| self.size -= Self::FRAME_OVERHEAD + pruned.data.len()); } - self.frames_data_len += Self::FRAME_OVERHEAD + frame.frame_data.len(); - self.frames.push(frame); + self.size += Self::FRAME_OVERHEAD + frame.data.len(); + self.frames.insert(frame.number, frame); + + Ok(()) } fn read_batches(&self, l1_block_number: BlockNumber) -> Result> { @@ -228,10 +250,8 @@ impl Channel { let mut batches = Vec::new(); while !channel_data.is_empty() { - let batch_data = Header::decode_bytes(&mut channel_data, false) - .context("failed to decode batch data")?; - - let mut batch = Batch::decode(&mut &batch_data[..])?; + let mut batch = + Batch::decode(&mut channel_data).context("failed to decode batch data")?; batch.inclusion_block_number = l1_block_number; batches.push(batch); @@ -240,19 +260,11 @@ impl Channel { Ok(batches) } - fn is_ready(&self) -> bool { - self.expected_frames_len == Some(self.frames.len()) - } - fn decompress(&self) -> Result> { let compressed = { let mut buf = Vec::new(); - - let mut sorted_frames: Vec<&Frame> = self.frames.iter().collect(); - sorted_frames.sort_by_key(|f| f.frame_number); - - for frame in sorted_frames { - buf.extend(&frame.frame_data) + for frame in self.frames.values() { + buf.extend(&frame.data) } buf @@ -274,55 +286,246 @@ impl Channel { } } +/// A [Frame] is a chunk of data belonging to a [Channel]. Batcher transactions carry one +/// or multiple frames. The reason to split a channel into frames is that a channel might +/// too large to include in a single batcher transaction. #[derive(Debug, Default, Clone)] struct Frame { - pub channel_id: u128, - pub frame_number: u16, - pub frame_data: Vec, + /// The channel ID this frame belongs to. + pub channel_id: ChannelId, + /// The index of this frame within the channel. + pub number: u16, + /// A sequence of bytes belonging to the channel. + pub data: Vec, + /// Whether this is the last frame of the channel. pub is_last: bool, } impl Frame { const HEADER_SIZE: usize = 22; - - pub fn process_l1_transaction(tx_essence: &EthereumTxEssence) -> Result> { - let (version, mut frame_data) = tx_essence.data().split_first().context("invalid data")?; + const MAX_FRAME_DATA_LENGTH: u32 = 1_000_000; + + /// Processes a batcher transaction and returns the list of contained frames. + pub fn process_batcher_transaction(tx_essence: &EthereumTxEssence) -> Result> { + let (version, mut rollup_payload) = tx_essence + .data() + .split_first() + .context("empty transaction data")?; ensure!(version == &0, "Invalid version: {}", version); let mut frames = Vec::new(); - while !frame_data.is_empty() { - let frame = Frame::parse(frame_data)?; - frame_data = { - let bytes_read = Self::HEADER_SIZE + frame.frame_data.len() + 1; - &frame_data[bytes_read..] - }; + while !rollup_payload.is_empty() { + // From the spec: + // "If any one frame fails to parse, the all frames in the transaction are rejected." + let frame = Frame::decode(&mut rollup_payload).context("invalid frame")?; frames.push(frame); } Ok(frames) } - fn parse(data: &[u8]) -> Result { - ensure!(Self::HEADER_SIZE < data.len(), "Insufficient frame data"); - - let channel_id = u128::from_be_bytes(data[0..16].try_into()?); - let frame_number = u16::from_be_bytes(data[16..18].try_into()?); - let frame_data_len = u32::from_be_bytes(data[18..22].try_into()?); + /// Decodes a [Frame] from the given buffer, advancing the buffer's position. + fn decode(buf: &mut &[u8]) -> Result { + ensure!(buf.remaining() > Self::HEADER_SIZE, "input too short"); - let frame_data_end = Self::HEADER_SIZE + frame_data_len as usize; - ensure!(frame_data_end <= data.len(), "frame_data_end too large"); - ensure!(data[frame_data_end] <= 1, "Invalid byte at frame_data_end"); + let channel_id = buf.get_u128(); + let frame_number = buf.get_u16(); + // From the spec: + // "frame_data_length is the length of frame_data in bytes. It is capped to 1,000,000." + let frame_data_length = buf.get_u32(); + ensure!( + frame_data_length <= Self::MAX_FRAME_DATA_LENGTH, + "frame_data_length too large" + ); + + let frame_data = buf + .get(..frame_data_length as usize) + .context("input too short")?; + buf.advance(frame_data_length as usize); - let frame_data = data[22..frame_data_end].to_vec(); - let is_last = data[frame_data_end] != 0; + // From the spec: + // "is_last is a single byte with a value of 1 if the frame is the last in the channel, + // 0 if there are frames in the channel. Any other value makes the frame invalid." + ensure!(buf.has_remaining(), "input too short"); + let is_last = match buf.get_u8() { + 0 => false, + 1 => true, + _ => bail!("invalid is_last value"), + }; - let frame = Self { + Ok(Self { channel_id, - frame_number, - frame_data, + number: frame_number, + data: frame_data.to_vec(), is_last, - }; + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // test vectors from https://github.com/ethereum-optimism/optimism/blob/711f33b4366f6cd268a265e7ed8ccb37085d86a2/op-node/rollup/derive/channel_test.go + mod channel { + use super::*; + + const CHANNEL_ID: ChannelId = 0xff; + + fn new_channel() -> Channel { + Channel { + id: CHANNEL_ID, + ..Default::default() + } + } + + #[test] + fn frame_validity() { + // wrong channel + { + let frame = Frame { + channel_id: 0xee, + ..Default::default() + }; + + let mut channel = new_channel(); + channel.add_frame(frame).unwrap_err(); + assert_eq!(channel.size, 0); + } + + // double close + { + let frame_a = Frame { + channel_id: CHANNEL_ID, + number: 2, + data: b"four".to_vec(), + is_last: true, + }; + let frame_b = Frame { + channel_id: CHANNEL_ID, + number: 1, + is_last: true, + ..Default::default() + }; + + let mut channel = new_channel(); + channel.add_frame(frame_a).unwrap(); + assert_eq!(channel.size, 204); + channel.add_frame(frame_b).unwrap_err(); + assert_eq!(channel.size, 204); + } + + // duplicate frame + { + let frame_a = Frame { + channel_id: CHANNEL_ID, + number: 2, + data: b"four".to_vec(), + ..Default::default() + }; + let frame_b = Frame { + channel_id: CHANNEL_ID, + number: 2, + data: b"seven__".to_vec(), + ..Default::default() + }; + + let mut channel = new_channel(); + channel.add_frame(frame_a).unwrap(); + assert_eq!(channel.size, 204); + channel.add_frame(frame_b).unwrap_err(); + assert_eq!(channel.size, 204); + } - Ok(frame) + // duplicate closing frame + { + let frame_a = Frame { + channel_id: CHANNEL_ID, + number: 2, + data: b"four".to_vec(), + is_last: true, + }; + let frame_b = Frame { + channel_id: CHANNEL_ID, + number: 2, + data: b"seven__".to_vec(), + is_last: true, + }; + + let mut channel = new_channel(); + channel.add_frame(frame_a).unwrap(); + assert_eq!(channel.size, 204); + channel.add_frame(frame_b).unwrap_err(); + assert_eq!(channel.size, 204); + } + + // frame past closing + { + let frame_a = Frame { + channel_id: CHANNEL_ID, + number: 2, + data: b"four".to_vec(), + is_last: true, + }; + let frame_b = Frame { + channel_id: CHANNEL_ID, + number: 10, + data: b"seven__".to_vec(), + ..Default::default() + }; + + let mut channel = new_channel(); + channel.add_frame(frame_a).unwrap(); + assert_eq!(channel.size, 204); + channel.add_frame(frame_b).unwrap_err(); + assert_eq!(channel.size, 204); + } + + // prune after close frame + { + let frame_a = Frame { + channel_id: CHANNEL_ID, + number: 10, + data: b"seven__".to_vec(), + is_last: false, + }; + let frame_b = Frame { + channel_id: CHANNEL_ID, + number: 2, + data: b"four".to_vec(), + is_last: true, + }; + + let mut channel = new_channel(); + channel.add_frame(frame_a).unwrap(); + assert_eq!(channel.size, 207); + channel.add_frame(frame_b).unwrap(); + assert_eq!(channel.size, 204); + } + + // multiple valid frames + { + let frame_a = Frame { + channel_id: CHANNEL_ID, + number: 1, + data: b"seven__".to_vec(), + is_last: true, + }; + let frame_b = Frame { + channel_id: CHANNEL_ID, + number: 0, + data: b"four".to_vec(), + ..Default::default() + }; + + let mut channel = new_channel(); + channel.add_frame(frame_a).unwrap(); + assert_eq!(channel.size, 207); + assert_eq!(channel.is_ready(), false); + channel.add_frame(frame_b).unwrap(); + assert_eq!(channel.size, 411); + assert_eq!(channel.is_ready(), true); + } + } } } diff --git a/primitives/src/batch.rs b/primitives/src/batch.rs index 3ba271149..1028ba4b4 100644 --- a/primitives/src/batch.rs +++ b/primitives/src/batch.rs @@ -17,11 +17,20 @@ use std::cmp::Ordering; use alloy_primitives::{BlockNumber, Bytes, B256}; use alloy_rlp::{Decodable, Encodable}; use alloy_rlp_derive::{RlpDecodable, RlpEncodable}; -use bytes::Buf; +use serde::{Deserialize, Serialize}; pub type RawTransaction = Bytes; -#[derive(Debug, Clone, Eq, PartialEq, RlpEncodable, RlpDecodable)] +/// A batch contains information to build one Optimism block. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Batch { + pub inclusion_block_number: BlockNumber, + pub essence: BatchEssence, +} + +/// Represents the core details of a [Batch], specifically the portion that is derived +/// from the batcher transactions. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RlpEncodable, RlpDecodable)] pub struct BatchEssence { pub parent_hash: B256, pub epoch_num: u64, @@ -30,12 +39,6 @@ pub struct BatchEssence { pub transactions: Vec, } -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct Batch { - pub inclusion_block_number: BlockNumber, - pub essence: BatchEssence, -} - impl PartialOrd for Batch { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -72,28 +75,64 @@ impl Batch { impl Encodable for Batch { #[inline] fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { - out.put_u8(0); + // wrap the RLP-essence inside a bytes payload + alloy_rlp::Header { + list: false, + payload_length: self.essence.length() + 1, + } + .encode(out); + out.put_u8(0x00); self.essence.encode(out); } #[inline] fn length(&self) -> usize { - self.essence.length() + 1 + let bytes_length = self.essence.length() + 1; + alloy_rlp::length_of_length(bytes_length) + bytes_length } } impl Decodable for Batch { fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { - match buf.first() { - Some(0) => { - buf.advance(1); - Ok(Self { - inclusion_block_number: 0, - essence: BatchEssence::decode(buf)?, - }) - } + let bytes = alloy_rlp::Header::decode_bytes(buf, false)?; + match bytes.split_first() { + Some((0, mut payload)) => Ok(Self { + inclusion_block_number: 0, + essence: BatchEssence::decode(&mut payload)?, + }), Some(_) => Err(alloy_rlp::Error::Custom("invalid version")), None => Err(alloy_rlp::Error::InputTooShort), } } } + +#[cfg(test)] +mod tests { + use hex_literal::hex; + use serde_json::json; + + use super::*; + + #[test] + fn rlp_roundtrip() { + let expected = hex!("b85000f84da0dbf6a80fef073de06add9b0d14026d6e5a86c85f6d102c36d3d8e9cf89c2afd3840109d8fea0438335a20d98863a4c0c97999eb2481921ccd28553eac6f913af7c12aec0410884647f5ea9c0"); + let batch: Batch = serde_json::from_value(json!({ + "inclusion_block_number": 0, + "essence": { + "parent_hash": "0xdbf6a80fef073de06add9b0d14026d6e5a86c85f6d102c36d3d8e9cf89c2afd3", + "epoch_num": 17422590, + "epoch_hash": "0x438335a20d98863a4c0c97999eb2481921ccd28553eac6f913af7c12aec04108", + "timestamp": 1686068905, + "transactions": [] + } + })) + .unwrap(); + + let encoded = alloy_rlp::encode(&batch); + assert_eq!(encoded.len(), batch.length()); + assert_eq!(encoded, expected); + + let decoded = Batch::decode(&mut &encoded[..]).unwrap(); + assert_eq!(batch, decoded); + } +}