diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 420c83081c7..36ca527b8b4 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -165,7 +165,7 @@ impl RpcBlock { let inner = if !custody_columns.is_empty() { RpcBlockInner::BlockAndCustodyColumns( block, - RuntimeVariableList::new(custody_columns, spec.number_of_columns)?, + RuntimeVariableList::new(custody_columns, spec.number_of_columns as usize)?, ) } else { RpcBlockInner::Block(block) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 72806a74d27..117b7deed16 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -116,21 +116,13 @@ impl DataAvailabilityChecker { spec: Arc, log: Logger, ) -> Result { - let custody_subnet_count = if import_all_data_columns { - spec.data_column_sidecar_subnet_count as usize - } else { - spec.custody_requirement as usize - }; - - let subnet_sampling_size = - std::cmp::max(custody_subnet_count, spec.samples_per_slot as usize); - let sampling_column_count = - subnet_sampling_size.saturating_mul(spec.data_columns_per_subnet()); + let custody_group_count = spec.custody_group_count(import_all_data_columns); + let sampling_size = spec.sampling_size(custody_group_count); let inner = DataAvailabilityCheckerInner::new( OVERFLOW_LRU_CAPACITY, store, - sampling_column_count, + sampling_size as usize, spec.clone(), )?; Ok(Self { @@ -147,7 +139,7 @@ impl DataAvailabilityChecker { } pub(crate) fn is_supernode(&self) -> bool { - self.get_sampling_column_count() == self.spec.number_of_columns + self.get_sampling_column_count() == self.spec.number_of_columns as usize } /// Checks if the block root is currenlty in the availability cache awaiting import because @@ -424,7 +416,7 @@ impl DataAvailabilityChecker { .map(CustodyDataColumn::into_inner) .collect::>(); let all_data_columns = - RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns); + RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns as usize); // verify kzg for all data columns at once if !all_data_columns.is_empty() { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 40361574aff..4bbe7b27e37 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -546,7 +546,7 @@ impl DataAvailabilityCheckerInner { // If we're sampling all columns, it means we must be custodying all columns. let custody_column_count = self.sampling_column_count(); - let total_column_count = self.spec.number_of_columns; + let total_column_count = self.spec.number_of_columns as usize; let received_column_count = pending_components.verified_data_columns.len(); if pending_components.reconstruction_started { @@ -555,7 +555,7 @@ impl DataAvailabilityCheckerInner { if custody_column_count != total_column_count { return ReconstructColumnsDecision::No("not required for full node"); } - if received_column_count == self.spec.number_of_columns { + if received_column_count == total_column_count { return ReconstructColumnsDecision::No("all columns received"); } if received_column_count < total_column_count / 2 { diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 6cfd26786aa..1bd17485ab6 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -423,7 +423,7 @@ fn verify_data_column_sidecar( data_column: &DataColumnSidecar, spec: &ChainSpec, ) -> Result<(), GossipDataColumnError> { - if data_column.index >= spec.number_of_columns as u64 { + if data_column.index >= spec.number_of_columns { return Err(GossipDataColumnError::InvalidColumnIndex(data_column.index)); } if data_column.kzg_commitments.is_empty() { @@ -611,7 +611,7 @@ fn verify_index_matches_subnet( spec: &ChainSpec, ) -> Result<(), GossipDataColumnError> { let expected_subnet: u64 = - DataColumnSubnetId::from_column_index::(data_column.index as usize, spec).into(); + DataColumnSubnetId::from_column_index(data_column.index, spec).into(); if expected_subnet != subnet { return Err(GossipDataColumnError::InvalidSubnetId { received: subnet, diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index 1680c0298d1..280dceb82e1 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -192,7 +192,7 @@ fn build_data_column_sidecars( blob_cells_and_proofs_vec: Vec, spec: &ChainSpec, ) -> Result, String> { - let number_of_columns = spec.number_of_columns; + let number_of_columns = spec.number_of_columns as usize; let mut columns = vec![Vec::with_capacity(E::max_blobs_per_block()); number_of_columns]; let mut column_kzg_proofs = vec![Vec::with_capacity(E::max_blobs_per_block()); number_of_columns]; diff --git a/beacon_node/beacon_chain/src/observed_data_sidecars.rs b/beacon_node/beacon_chain/src/observed_data_sidecars.rs index 53f8c71f54e..92eccce0f50 100644 --- a/beacon_node/beacon_chain/src/observed_data_sidecars.rs +++ b/beacon_node/beacon_chain/src/observed_data_sidecars.rs @@ -59,7 +59,7 @@ impl ObservableDataSidecar for DataColumnSidecar { } fn max_num_of_items(spec: &ChainSpec) -> usize { - spec.number_of_columns + spec.number_of_columns as usize } } diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index ce29480ffdb..9cf7e3f3682 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -25,8 +25,8 @@ pub const ETH2_ENR_KEY: &str = "eth2"; pub const ATTESTATION_BITFIELD_ENR_KEY: &str = "attnets"; /// The ENR field specifying the sync committee subnet bitfield. pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets"; -/// The ENR field specifying the peerdas custody subnet count. -pub const PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY: &str = "csc"; +/// The ENR field specifying the peerdas custody group count. +pub const PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY: &str = "cgc"; /// Extension trait for ENR's within Eth2. pub trait Eth2Enr { @@ -38,8 +38,8 @@ pub trait Eth2Enr { &self, ) -> Result, &'static str>; - /// The peerdas custody subnet count associated with the ENR. - fn custody_subnet_count(&self, spec: &ChainSpec) -> Result; + /// The peerdas custody group count associated with the ENR. + fn custody_group_count(&self, spec: &ChainSpec) -> Result; fn eth2(&self) -> Result; } @@ -67,16 +67,16 @@ impl Eth2Enr for Enr { .map_err(|_| "Could not decode the ENR syncnets bitfield") } - fn custody_subnet_count(&self, spec: &ChainSpec) -> Result { - let csc = self - .get_decodable::(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) - .ok_or("ENR custody subnet count non-existent")? - .map_err(|_| "Could not decode the ENR custody subnet count")?; + fn custody_group_count(&self, spec: &ChainSpec) -> Result { + let cgc = self + .get_decodable::(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) + .ok_or("ENR custody group count non-existent")? + .map_err(|_| "Could not decode the ENR custody group count")?; - if csc >= spec.custody_requirement && csc <= spec.data_column_sidecar_subnet_count { - Ok(csc) + if cgc >= spec.custody_requirement && cgc <= spec.number_of_custody_groups { + Ok(cgc) } else { - Err("Invalid custody subnet count in ENR") + Err("Invalid custody group count in ENR") } } @@ -253,14 +253,14 @@ pub fn build_enr( &bitfield.as_ssz_bytes().into(), ); - // only set `csc` if PeerDAS fork epoch has been scheduled + // only set `cgc` if PeerDAS fork epoch has been scheduled if spec.is_peer_das_scheduled() { - let custody_subnet_count = if config.subscribe_all_data_column_subnets { - spec.data_column_sidecar_subnet_count + let custody_group_count = if config.subscribe_all_data_column_subnets { + spec.number_of_custody_groups } else { spec.custody_requirement }; - builder.add_value(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, &custody_subnet_count); + builder.add_value(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count); } builder @@ -287,11 +287,11 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool { && (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4()) && (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6()) // we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and - // PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will + // PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will // likely only be true for non-validating nodes. && local_enr.get_decodable::(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY) && local_enr.get_decodable::(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY) - && local_enr.get_decodable::(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) + && local_enr.get_decodable::(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) } /// Loads enr from the given directory @@ -348,7 +348,7 @@ mod test { } #[test] - fn custody_subnet_count_default() { + fn custody_group_count_default() { let config = NetworkConfig { subscribe_all_data_column_subnets: false, ..NetworkConfig::default() @@ -358,13 +358,13 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_subnet_count::(&spec).unwrap(), + enr.custody_group_count::(&spec).unwrap(), spec.custody_requirement, ); } #[test] - fn custody_subnet_count_all() { + fn custody_group_count_all() { let config = NetworkConfig { subscribe_all_data_column_subnets: true, ..NetworkConfig::default() @@ -373,8 +373,8 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_subnet_count::(&spec).unwrap(), - spec.data_column_sidecar_subnet_count, + enr.custody_group_count::(&spec).unwrap(), + spec.number_of_custody_groups, ); } diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index 02ff0cc3ca4..5da6e3a477c 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -4,7 +4,8 @@ use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}; use itertools::Itertools; use slog::trace; use std::ops::Deref; -use types::{ChainSpec, DataColumnSubnetId}; +use types::data_column_custody_group::compute_subnets_for_node; +use types::ChainSpec; /// Returns the predicate for a given subnet. pub fn subnet_predicate( @@ -37,13 +38,9 @@ where .as_ref() .map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)), Subnet::DataColumn(s) => { - if let Ok(custody_subnet_count) = enr.custody_subnet_count::(&spec) { - DataColumnSubnetId::compute_custody_subnets::( - enr.node_id().raw(), - custody_subnet_count, - &spec, - ) - .map_or(false, |mut subnets| subnets.contains(s)) + if let Ok(custody_group_count) = enr.custody_group_count::(&spec) { + compute_subnets_for_node(enr.node_id().raw(), custody_group_count, &spec) + .contains(s) } else { false } diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index cb9c007b91a..b36cb8075d1 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -93,11 +93,11 @@ pub static PEERS_PER_CLIENT: LazyLock> = LazyLock::new(|| { ) }); -pub static PEERS_PER_CUSTODY_SUBNET_COUNT: LazyLock> = LazyLock::new(|| { +pub static PEERS_PER_CUSTODY_GROUP_COUNT: LazyLock> = LazyLock::new(|| { try_create_int_gauge_vec( - "peers_per_custody_subnet_count", - "The current count of peers by custody subnet count", - &["custody_subnet_count"], + "peers_per_custody_group_count", + "The current count of peers by custody group count", + &["custody_group_count"], ) }); diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 4df2566dacb..b6e8bb06ec6 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -34,6 +34,9 @@ pub use peerdb::sync_status::{SyncInfo, SyncStatus}; use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::net::IpAddr; use strum::IntoEnumIterator; +use types::data_column_custody_group::{ + compute_subnets_from_custody_group, get_custody_groups, CustodyIndex, +}; pub mod config; mod network_behaviour; @@ -99,6 +102,8 @@ pub struct PeerManager { /// discovery queries for subnet peers if we disconnect from existing sync /// committee subnet peers. sync_committee_subnets: HashMap, + /// A mapping of all custody groups to column subnets to avoid re-computation. + subnets_by_custody_group: HashMap>, /// The heartbeat interval to perform routine maintenance. heartbeat: tokio::time::Interval, /// Keeps track of whether the discovery service is enabled or not. @@ -158,6 +163,16 @@ impl PeerManager { // Set up the peer manager heartbeat interval let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL)); + // Compute subnets for all custody groups + let subnets_by_custody_group = (0..network_globals.spec.number_of_custody_groups) + .map(|custody_index| { + let subnets = + compute_subnets_from_custody_group(custody_index, &network_globals.spec) + .collect(); + (custody_index, subnets) + }) + .collect::>>(); + Ok(PeerManager { network_globals, events: SmallVec::new(), @@ -168,6 +183,7 @@ impl PeerManager { target_peers: target_peer_count, temporary_banned_peers: LRUTimeCache::new(PEER_RECONNECTION_TIMEOUT), sync_committee_subnets: Default::default(), + subnets_by_custody_group, heartbeat, discovery_enabled, metrics_enabled, @@ -709,22 +725,39 @@ impl PeerManager { "peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number()); } - let custody_subnet_count_opt = meta_data.custody_subnet_count().copied().ok(); + let custody_group_count_opt = meta_data.custody_group_count().copied().ok(); peer_info.set_meta_data(meta_data); if self.network_globals.spec.is_peer_das_scheduled() { // Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to // prioritize PeerDAS peers. - if let Some(custody_subnet_count) = custody_subnet_count_opt { - match self.compute_peer_custody_subnets(peer_id, custody_subnet_count) { - Ok(custody_subnets) => { + if let Some(custody_group_count) = custody_group_count_opt { + match self.compute_peer_custody_groups(peer_id, custody_group_count) { + Ok(custody_groups) => { + let custody_subnets = custody_groups + .into_iter() + .flat_map(|custody_index| { + self.subnets_by_custody_group + .get(&custody_index) + .cloned() + .unwrap_or_else(|| { + warn!( + self.log, + "Custody group not found in subnet mapping"; + "custody_index" => custody_index, + "peer_id" => %peer_id + ); + vec![] + }) + }) + .collect(); peer_info.set_custody_subnets(custody_subnets); } Err(err) => { - debug!(self.log, "Unable to compute peer custody subnets from metadata"; + debug!(self.log, "Unable to compute peer custody groups from metadata"; "info" => "Sending goodbye to peer", "peer_id" => %peer_id, - "custody_subnet_count" => custody_subnet_count, + "custody_group_count" => custody_group_count, "error" => ?err, ); invalid_meta_data = true; @@ -1308,7 +1341,7 @@ impl PeerManager { let mut peers_connected = 0; let mut clients_per_peer = HashMap::new(); let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new(); - let mut peers_per_custody_subnet_count: HashMap = HashMap::new(); + let mut peers_per_custody_group_count: HashMap = HashMap::new(); for (_, peer_info) in self.network_globals.peers.read().connected_peers() { peers_connected += 1; @@ -1341,8 +1374,8 @@ impl PeerManager { .or_default() += 1; if let Some(MetaData::V3(meta_data)) = peer_info.meta_data() { - *peers_per_custody_subnet_count - .entry(meta_data.custody_subnet_count) + *peers_per_custody_group_count + .entry(meta_data.custody_group_count) .or_default() += 1; } } @@ -1350,11 +1383,11 @@ impl PeerManager { // PEERS_CONNECTED metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected); - // CUSTODY_SUBNET_COUNT - for (custody_subnet_count, peer_count) in peers_per_custody_subnet_count.into_iter() { + // CUSTODY_GROUP_COUNT + for (custody_group_count, peer_count) in peers_per_custody_group_count.into_iter() { metrics::set_gauge_vec( - &metrics::PEERS_PER_CUSTODY_SUBNET_COUNT, - &[&custody_subnet_count.to_string()], + &metrics::PEERS_PER_CUSTODY_GROUP_COUNT, + &[&custody_group_count.to_string()], peer_count, ) } @@ -1383,43 +1416,23 @@ impl PeerManager { } } - fn compute_peer_custody_subnets( + fn compute_peer_custody_groups( &self, peer_id: &PeerId, - custody_subnet_count: u64, - ) -> Result, String> { + custody_group_count: u64, + ) -> Result, String> { // If we don't have a node id, we cannot compute the custody duties anyway let node_id = peer_id_to_node_id(peer_id)?; let spec = &self.network_globals.spec; - if !(spec.custody_requirement..=spec.data_column_sidecar_subnet_count) - .contains(&custody_subnet_count) + if !(spec.custody_requirement..=spec.number_of_custody_groups) + .contains(&custody_group_count) { - return Err("Invalid custody subnet count in metadata: out of range".to_string()); + return Err("Invalid custody group count in metadata: out of range".to_string()); } - let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( - node_id.raw(), - custody_subnet_count, - spec, - ) - .map(|subnets| subnets.collect()) - .unwrap_or_else(|e| { - // This is an unreachable scenario unless there's a bug, as we've validated the csc - // just above. - error!( - self.log, - "Computing peer custody subnets failed unexpectedly"; - "info" => "Falling back to default custody requirement subnets", - "peer_id" => %peer_id, - "custody_subnet_count" => custody_subnet_count, - "error" => ?e - ); - DataColumnSubnetId::compute_custody_requirement_subnets::(node_id.raw(), spec) - .collect() - }); - - Ok(custody_subnets) + let custody_groups = get_custody_groups(node_id.raw(), custody_group_count, spec); + Ok(custody_groups) } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index d2effd4d037..a3e5158b52f 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,4 +1,4 @@ -use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; +use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY; use crate::discovery::{peer_id_to_node_id, CombinedKey}; use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId}; use itertools::Itertools; @@ -13,6 +13,7 @@ use std::{ fmt::Formatter, }; use sync_status::SyncStatus; +use types::data_column_custody_group::compute_subnets_for_node; use types::{ChainSpec, DataColumnSubnetId, EthSpec}; pub mod client; @@ -695,8 +696,8 @@ impl PeerDB { if supernode { enr.insert( - PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, - &spec.data_column_sidecar_subnet_count, + PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, + &spec.number_of_custody_groups, &enr_key, ) .expect("u64 can be encoded"); @@ -714,19 +715,14 @@ impl PeerDB { if supernode { let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); let all_subnets = (0..spec.data_column_sidecar_subnet_count) - .map(|csc| csc.into()) + .map(|subnet_id| subnet_id.into()) .collect(); peer_info.set_custody_subnets(all_subnets); } else { let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id"); - let subnets = DataColumnSubnetId::compute_custody_subnets::( - node_id.raw(), - spec.custody_requirement, - spec, - ) - .expect("should compute custody subnets") - .collect(); + let subnets = + compute_subnets_for_node(node_id.raw(), spec.custody_requirement, spec).collect(); peer_info.set_custody_subnets(subnets); } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index ee8c27f474c..14117d0b942 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -89,7 +89,7 @@ impl PeerInfo { } /// Returns if the peer is subscribed to a given `Subnet` from the metadata attnets/syncnets field. - /// Also returns true if the peer is assigned to custody a given data column `Subnet` computed from the metadata `custody_column_count` field or ENR `csc` field. + /// Also returns true if the peer is assigned to custody a given data column `Subnet` computed from the metadata `custody_group_count` field or ENR `cgc` field. pub fn on_subnet_metadata(&self, subnet: &Subnet) -> bool { if let Some(meta_data) = &self.meta_data { match subnet { @@ -101,7 +101,9 @@ impl PeerInfo { .syncnets() .map_or(false, |s| s.get(**id as usize).unwrap_or(false)) } - Subnet::DataColumn(column) => return self.custody_subnets.contains(column), + Subnet::DataColumn(subnet_id) => { + return self.is_assigned_to_custody_subnet(subnet_id) + } } } false diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 5d86936d41d..536241a483d 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -1111,7 +1111,7 @@ mod tests { seq_number: 1, attnets: EnrAttestationBitfield::::default(), syncnets: EnrSyncCommitteeBitfield::::default(), - custody_subnet_count: 1, + custody_group_count: 1, }) } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index bb8bfb0e206..71bea66e168 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -137,7 +137,7 @@ pub struct MetaData { #[superstruct(only(V2, V3))] pub syncnets: EnrSyncCommitteeBitfield, #[superstruct(only(V3))] - pub custody_subnet_count: u64, + pub custody_group_count: u64, } impl MetaData { @@ -180,13 +180,13 @@ impl MetaData { seq_number: metadata.seq_number, attnets: metadata.attnets.clone(), syncnets: Default::default(), - custody_subnet_count: spec.custody_requirement, + custody_group_count: spec.custody_requirement, }), MetaData::V2(metadata) => MetaData::V3(MetaDataV3 { seq_number: metadata.seq_number, attnets: metadata.attnets.clone(), syncnets: metadata.syncnets.clone(), - custody_subnet_count: spec.custody_requirement, + custody_group_count: spec.custody_requirement, }), md @ MetaData::V3(_) => md.clone(), } @@ -362,7 +362,7 @@ impl DataColumnsByRangeRequest { DataColumnsByRangeRequest { start_slot: 0, count: 0, - columns: vec![0; spec.number_of_columns], + columns: vec![0; spec.number_of_columns as usize], } .as_ssz_bytes() .len() diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index afcbfce1732..ad068ae3fcf 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -198,15 +198,12 @@ impl Network { )?; // Construct the metadata - let custody_subnet_count = ctx.chain_spec.is_peer_das_scheduled().then(|| { - if config.subscribe_all_data_column_subnets { - ctx.chain_spec.data_column_sidecar_subnet_count - } else { - ctx.chain_spec.custody_requirement - } + let custody_group_count = ctx.chain_spec.is_peer_das_scheduled().then(|| { + ctx.chain_spec + .custody_group_count(config.subscribe_all_data_column_subnets) }); let meta_data = - utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log); + utils::load_or_build_metadata(&config.network_dir, custody_group_count, &log); let seq_number = *meta_data.seq_number(); let globals = NetworkGlobals::new( enr, diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 490928c08c3..7c80912361f 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -164,8 +164,8 @@ pub fn strip_peer_id(addr: &mut Multiaddr) { /// Load metadata from persisted file. Return default metadata if loading fails. pub fn load_or_build_metadata( - network_dir: &std::path::Path, - custody_subnet_count: Option, + network_dir: &Path, + custody_group_count_opt: Option, log: &slog::Logger, ) -> MetaData { // We load a V2 metadata version by default (regardless of current fork) @@ -216,12 +216,12 @@ pub fn load_or_build_metadata( }; // Wrap the MetaData - let meta_data = if let Some(custody_count) = custody_subnet_count { + let meta_data = if let Some(custody_group_count) = custody_group_count_opt { MetaData::V3(MetaDataV3 { attnets: meta_data.attnets, seq_number: meta_data.seq_number, syncnets: meta_data.syncnets, - custody_subnet_count: custody_count, + custody_group_count, }) } else { MetaData::V2(meta_data) @@ -283,8 +283,8 @@ pub(crate) fn save_metadata_to_disk( ) { let _ = std::fs::create_dir_all(dir); // We always store the metadata v2 to disk because - // custody_subnet_count parameter doesn't need to be persisted across runs. - // custody_subnet_count is what the user sets it for the current run. + // custody_group_count parameter doesn't need to be persisted across runs. + // custody_group_count is what the user sets it for the current run. // This is to prevent ugly branching logic when reading the metadata from disk. let metadata_bytes = metadata.metadata_v2().as_ssz_bytes(); match File::create(dir.join(METADATA_FILENAME)).and_then(|mut f| f.write_all(&metadata_bytes)) { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 92583b7b5d0..1117477e4ba 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -3,10 +3,12 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId}; -use itertools::Itertools; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; +use types::data_column_custody_group::{ + compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups, +}; use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; pub struct NetworkGlobals { @@ -27,8 +29,8 @@ pub struct NetworkGlobals { /// The current state of the backfill sync. pub backfill_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. - pub sampling_subnets: Vec, - pub sampling_columns: Vec, + pub sampling_subnets: HashSet, + pub sampling_columns: HashSet, /// Network-related configuration. Immutable after initialization. pub config: Arc, /// Ethereum chain configuration. Immutable after initialization. @@ -48,30 +50,27 @@ impl NetworkGlobals { let (sampling_subnets, sampling_columns) = if spec.is_peer_das_scheduled() { let node_id = enr.node_id().raw(); - let custody_subnet_count = local_metadata - .custody_subnet_count() + let custody_group_count = local_metadata + .custody_group_count() .copied() - .expect("custody subnet count must be set if PeerDAS is scheduled"); + .expect("custody group count must be set if PeerDAS is scheduled"); - let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot); + let sampling_size = spec.sampling_size(custody_group_count); + let custody_groups = get_custody_groups(node_id, sampling_size, &spec); - let sampling_subnets = DataColumnSubnetId::compute_custody_subnets::( - node_id, - subnet_sampling_size, - &spec, - ) - .expect("sampling subnet count must be valid") - .collect::>(); + let sampling_subnets = custody_groups + .iter() + .flat_map(|custody_index| compute_subnets_from_custody_group(*custody_index, &spec)) + .collect::>(); - let sampling_columns = sampling_subnets + let sampling_columns = custody_groups .iter() - .flat_map(|subnet| subnet.columns::(&spec)) - .sorted() - .collect(); + .flat_map(|custody_index| compute_columns_for_custody_group(*custody_index, &spec)) + .collect::>(); (sampling_subnets, sampling_columns) } else { - (vec![], vec![]) + (HashSet::new(), HashSet::new()) }; NetworkGlobals { @@ -159,8 +158,8 @@ impl NetworkGlobals { pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec { self.peers .read() - .good_custody_subnet_peer(DataColumnSubnetId::from_column_index::( - column_index as usize, + .good_custody_subnet_peer(DataColumnSubnetId::from_column_index( + column_index, &self.spec, )) .cloned() @@ -178,7 +177,7 @@ impl NetworkGlobals { seq_number: 0, attnets: Default::default(), syncnets: Default::default(), - custody_subnet_count: spec.custody_requirement, + custody_group_count: spec.custody_requirement, }); Self::new_test_globals_with_metadata(trusted_peers, metadata, log, config, spec) } @@ -209,9 +208,9 @@ mod test { let mut spec = E::default_spec(); spec.eip7594_fork_epoch = Some(Epoch::new(0)); - let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2; - let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot); - let metadata = get_metadata(custody_subnet_count); + let custody_group_count = spec.number_of_custody_groups / 2; + let subnet_sampling_size = spec.sampling_size(custody_group_count); + let metadata = get_metadata(custody_group_count); let config = Arc::new(NetworkConfig::default()); let globals = NetworkGlobals::::new_test_globals_with_metadata( @@ -233,9 +232,9 @@ mod test { let mut spec = E::default_spec(); spec.eip7594_fork_epoch = Some(Epoch::new(0)); - let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2; - let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot); - let metadata = get_metadata(custody_subnet_count); + let custody_group_count = spec.number_of_custody_groups / 2; + let subnet_sampling_size = spec.sampling_size(custody_group_count); + let metadata = get_metadata(custody_group_count); let config = Arc::new(NetworkConfig::default()); let globals = NetworkGlobals::::new_test_globals_with_metadata( @@ -251,12 +250,12 @@ mod test { ); } - fn get_metadata(custody_subnet_count: u64) -> MetaData { + fn get_metadata(custody_group_count: u64) -> MetaData { MetaData::V3(MetaDataV3 { seq_number: 0, attnets: Default::default(), syncnets: Default::default(), - custody_subnet_count, + custody_group_count, }) } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index d81d964e7cf..2d15d39c6fc 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1122,10 +1122,8 @@ impl NetworkBeaconProcessor { messages: columns .into_iter() .map(|d| { - let subnet = DataColumnSubnetId::from_column_index::( - d.index as usize, - &chain.spec, - ); + let subnet = + DataColumnSubnetId::from_column_index(d.index, &chain.spec); PubsubMessage::DataColumnSidecar(Box::new((subnet, d))) }) .collect(), @@ -1139,7 +1137,8 @@ impl NetworkBeaconProcessor { let blob_publication_batch_interval = chain.config.blob_publication_batch_interval; let blob_publication_batches = chain.config.blob_publication_batches; - let batch_size = chain.spec.number_of_columns / blob_publication_batches; + let number_of_columns = chain.spec.number_of_columns as usize; + let batch_size = number_of_columns / blob_publication_batches; let mut publish_count = 0usize; for batch in data_columns_to_publish.chunks(batch_size) { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b6b7b315f3f..814994de57e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -36,11 +36,12 @@ use requests::{ }; use slog::{debug, error, warn}; use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; +use types::data_column_custody_group::{compute_columns_for_custody_group, CustodyIndex}; use types::{ BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock, Slot, @@ -459,7 +460,7 @@ impl SyncNetworkContext { fn make_columns_by_range_requests( &self, request: BlocksByRangeRequest, - custody_indexes: &Vec, + custody_indexes: &HashSet, ) -> Result, RpcRequestSendError> { let mut peer_id_to_request_map = HashMap::new(); diff --git a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml index 1eca01bbeef..a38922db163 100644 --- a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml @@ -138,7 +138,8 @@ MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 16384 BLOB_SIDECAR_SUBNET_COUNT: 6 # DAS -CUSTODY_REQUIREMENT: 4 -DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 NUMBER_OF_COLUMNS: 128 -SAMPLES_PER_SLOT: 8 \ No newline at end of file +NUMBER_OF_CUSTODY_GROUPS: 128 +DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 +SAMPLES_PER_SLOT: 8 +CUSTODY_REQUIREMENT: 4 diff --git a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml index 500555a2694..c10d31b0378 100644 --- a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml @@ -121,7 +121,8 @@ MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 16384 BLOB_SIDECAR_SUBNET_COUNT: 6 # DAS -CUSTODY_REQUIREMENT: 4 -DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 NUMBER_OF_COLUMNS: 128 -SAMPLES_PER_SLOT: 8 \ No newline at end of file +NUMBER_OF_CUSTODY_GROUPS: 128 +DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 +SAMPLES_PER_SLOT: 8 +CUSTODY_REQUIREMENT: 4 diff --git a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml index d67d77d3bea..50a8ad8b4e5 100644 --- a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml @@ -125,7 +125,8 @@ MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096 BLOB_SIDECAR_SUBNET_COUNT: 6 # DAS -CUSTODY_REQUIREMENT: 4 -DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 NUMBER_OF_COLUMNS: 128 -SAMPLES_PER_SLOT: 8 \ No newline at end of file +NUMBER_OF_CUSTODY_GROUPS: 128 +DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 +SAMPLES_PER_SLOT: 8 +CUSTODY_REQUIREMENT: 4 diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index 18591fecdcd..27060806ae1 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -147,7 +147,8 @@ MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096 BLOB_SIDECAR_SUBNET_COUNT: 6 # DAS -CUSTODY_REQUIREMENT: 4 -DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 NUMBER_OF_COLUMNS: 128 -SAMPLES_PER_SLOT: 8 \ No newline at end of file +NUMBER_OF_CUSTODY_GROUPS: 128 +DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 +SAMPLES_PER_SLOT: 8 +CUSTODY_REQUIREMENT: 4 diff --git a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml index b08a6180bf0..251a85bf51d 100644 --- a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml @@ -121,7 +121,8 @@ MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096 BLOB_SIDECAR_SUBNET_COUNT: 6 # DAS -CUSTODY_REQUIREMENT: 4 -DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 NUMBER_OF_COLUMNS: 128 -SAMPLES_PER_SLOT: 8 \ No newline at end of file +NUMBER_OF_CUSTODY_GROUPS: 128 +DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 +SAMPLES_PER_SLOT: 8 +CUSTODY_REQUIREMENT: 4 diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 0b33a76ff19..c5e09b86a45 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -196,10 +196,11 @@ pub struct ChainSpec { * DAS params */ pub eip7594_fork_epoch: Option, - pub custody_requirement: u64, + pub number_of_columns: u64, + pub number_of_custody_groups: u64, pub data_column_sidecar_subnet_count: u64, - pub number_of_columns: usize, pub samples_per_slot: u64, + pub custody_requirement: u64, /* * Networking @@ -605,10 +606,24 @@ impl ChainSpec { } } - pub fn data_columns_per_subnet(&self) -> usize { - self.number_of_columns - .safe_div(self.data_column_sidecar_subnet_count as usize) - .expect("Subnet count must be greater than 0") + /// Returns the number of column sidecars to sample per slot. + pub fn sampling_size(&self, custody_group_count: u64) -> u64 { + let columns_per_custody_group = self + .number_of_columns + .safe_div(self.number_of_custody_groups) + .expect("Custody group count must be greater than 0"); + + let custody_column_count = columns_per_custody_group * custody_group_count; + + std::cmp::max(custody_column_count, self.samples_per_slot) + } + + pub fn custody_group_count(&self, is_supernode: bool) -> u64 { + if is_supernode { + self.number_of_custody_groups + } else { + self.custody_requirement + } } /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. @@ -808,10 +823,11 @@ impl ChainSpec { * DAS params */ eip7594_fork_epoch: None, - custody_requirement: 4, - data_column_sidecar_subnet_count: 128, number_of_columns: 128, + number_of_custody_groups: 128, + data_column_sidecar_subnet_count: 128, samples_per_slot: 8, + custody_requirement: 4, /* * Network specific @@ -1127,10 +1143,12 @@ impl ChainSpec { * DAS params */ eip7594_fork_epoch: None, - custody_requirement: 4, - data_column_sidecar_subnet_count: 128, number_of_columns: 128, + number_of_custody_groups: 128, + data_column_sidecar_subnet_count: 128, samples_per_slot: 8, + custody_requirement: 4, + /* * Network specific */ @@ -1360,18 +1378,21 @@ pub struct Config { #[serde(with = "serde_utils::quoted_u64")] max_per_epoch_activation_exit_churn_limit: u64, - #[serde(default = "default_custody_requirement")] + #[serde(default = "default_number_of_columns")] #[serde(with = "serde_utils::quoted_u64")] - custody_requirement: u64, + number_of_columns: u64, + #[serde(default = "default_number_of_custody_groups")] + #[serde(with = "serde_utils::quoted_u64")] + number_of_custody_groups: u64, #[serde(default = "default_data_column_sidecar_subnet_count")] #[serde(with = "serde_utils::quoted_u64")] data_column_sidecar_subnet_count: u64, - #[serde(default = "default_number_of_columns")] - #[serde(with = "serde_utils::quoted_u64")] - number_of_columns: u64, #[serde(default = "default_samples_per_slot")] #[serde(with = "serde_utils::quoted_u64")] samples_per_slot: u64, + #[serde(default = "default_custody_requirement")] + #[serde(with = "serde_utils::quoted_u64")] + custody_requirement: u64, } fn default_bellatrix_fork_version() -> [u8; 4] { @@ -1510,6 +1531,10 @@ const fn default_number_of_columns() -> u64 { 128 } +const fn default_number_of_custody_groups() -> u64 { + 128 +} + const fn default_samples_per_slot() -> u64 { 8 } @@ -1704,10 +1729,11 @@ impl Config { max_per_epoch_activation_exit_churn_limit: spec .max_per_epoch_activation_exit_churn_limit, - custody_requirement: spec.custody_requirement, + number_of_columns: spec.number_of_columns, + number_of_custody_groups: spec.number_of_custody_groups, data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count, - number_of_columns: spec.number_of_columns as u64, samples_per_slot: spec.samples_per_slot, + custody_requirement: spec.custody_requirement, } } @@ -1777,10 +1803,11 @@ impl Config { min_per_epoch_churn_limit_electra, max_per_epoch_activation_exit_churn_limit, - custody_requirement, - data_column_sidecar_subnet_count, number_of_columns, + number_of_custody_groups, + data_column_sidecar_subnet_count, samples_per_slot, + custody_requirement, } = self; if preset_base != E::spec_name().to_string().as_str() { @@ -1854,10 +1881,11 @@ impl Config { max_request_data_column_sidecars, ), - custody_requirement, + number_of_columns, + number_of_custody_groups, data_column_sidecar_subnet_count, - number_of_columns: number_of_columns as usize, samples_per_slot, + custody_requirement, ..chain_spec.clone() }) diff --git a/consensus/types/src/data_column_custody_group.rs b/consensus/types/src/data_column_custody_group.rs new file mode 100644 index 00000000000..ecd8ef80b34 --- /dev/null +++ b/consensus/types/src/data_column_custody_group.rs @@ -0,0 +1,113 @@ +use crate::{ChainSpec, ColumnIndex, DataColumnSubnetId}; +use alloy_primitives::U256; +use itertools::Itertools; +use maplit::hashset; +use std::collections::HashSet; + +pub type CustodyIndex = u64; + +/// The `get_custody_groups` function is used to determine the custody groups that a node is +/// assigned to. +/// +/// spec: https://github.com/ethereum/consensus-specs/blob/8e0d0d48e81d6c7c5a8253ab61340f5ea5bac66a/specs/fulu/das-core.md#get_custody_groups +pub fn get_custody_groups( + raw_node_id: [u8; 32], + custody_group_count: u64, + spec: &ChainSpec, +) -> HashSet { + // assert custody_group_count <= NUMBER_OF_CUSTODY_GROUPS + let mut custody_groups: HashSet = hashset![]; + let mut current_id = U256::from_be_slice(&raw_node_id); + while custody_groups.len() < custody_group_count as usize { + let mut node_id_bytes = [0u8; 32]; + node_id_bytes.copy_from_slice(current_id.as_le_slice()); + let hash = ethereum_hashing::hash_fixed(&node_id_bytes); + let hash_prefix: [u8; 8] = hash[0..8] + .try_into() + .expect("hash_fixed produces a 32 byte array"); + let hash_prefix_u64 = u64::from_le_bytes(hash_prefix); + let custody_group = hash_prefix_u64 % spec.number_of_custody_groups; + custody_groups.insert(custody_group); + + if current_id == U256::MAX { + current_id = U256::ZERO + } + current_id += U256::from(1u64) + } + + // assert len(custody_groups) == len(set(custody_groups)) + custody_groups +} + +/// Returns the columns that are associated with a given custody group. +/// +/// spec: https://github.com/ethereum/consensus-specs/blob/8e0d0d48e81d6c7c5a8253ab61340f5ea5bac66a/specs/fulu/das-core.md#compute_columns_for_custody_group +pub fn compute_columns_for_custody_group( + custody_group: CustodyIndex, + spec: &ChainSpec, +) -> impl Iterator { + // assert custody_group < NUMBER_OF_CUSTODY_GROUPS + let number_of_custody_groups = spec.number_of_custody_groups; + let columns_per_group = spec.number_of_columns / number_of_custody_groups; + (0..columns_per_group).map(move |i| number_of_custody_groups * i + custody_group) +} + +pub fn compute_subnets_for_node( + raw_node_id: [u8; 32], + custody_group_count: u64, + spec: &ChainSpec, +) -> impl Iterator + '_ { + get_custody_groups(raw_node_id, custody_group_count, spec) + .into_iter() + .flat_map(|custody_group| compute_subnets_from_custody_group(custody_group, spec)) +} + +/// Returns the subnets that are associated with a given custody group. +pub fn compute_subnets_from_custody_group( + custody_group: CustodyIndex, + spec: &ChainSpec, +) -> impl Iterator + '_ { + compute_columns_for_custody_group(custody_group, spec) + .map(|column_index| DataColumnSubnetId::from_column_index(column_index, spec)) + .unique() +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_compute_columns_for_custody_group() { + let number_of_custody_groups = 64; + let spec = ChainSpec { + number_of_columns: 128, + number_of_custody_groups, + ..ChainSpec::mainnet() + }; + let columns_per_custody_group = spec.number_of_columns / number_of_custody_groups; + + for custody_group in 0..number_of_custody_groups { + let columns = + compute_columns_for_custody_group(custody_group, &spec).collect::>(); + assert_eq!(columns.len(), columns_per_custody_group as usize); + } + } + + #[test] + fn test_compute_subnets_from_custody_group() { + let spec = ChainSpec { + number_of_columns: 256, + number_of_custody_groups: 64, + data_column_sidecar_subnet_count: 128, + ..ChainSpec::mainnet() + }; + let subnets_per_custody_group = + spec.data_column_sidecar_subnet_count / spec.number_of_custody_groups; + + for custody_group in 0..spec.number_of_custody_groups { + let subnets = + compute_subnets_from_custody_group(custody_group, &spec).collect::>(); + assert_eq!(subnets.len(), subnets_per_custody_group as usize); + } + } +} diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index df61d711c19..5b3eef24ccc 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -1,11 +1,8 @@ //! Identifies each data column subnet by an integer identifier. use crate::data_column_sidecar::ColumnIndex; -use crate::{ChainSpec, EthSpec}; -use alloy_primitives::U256; -use itertools::Itertools; +use crate::ChainSpec; use safe_arith::{ArithError, SafeArith}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; use std::fmt::{self, Display}; use std::ops::{Deref, DerefMut}; @@ -18,76 +15,14 @@ impl DataColumnSubnetId { id.into() } - pub fn from_column_index(column_index: usize, spec: &ChainSpec) -> Self { - (column_index - .safe_rem(spec.data_column_sidecar_subnet_count as usize) + pub fn from_column_index(column_index: ColumnIndex, spec: &ChainSpec) -> Self { + column_index + .safe_rem(spec.data_column_sidecar_subnet_count) .expect( "data_column_sidecar_subnet_count should never be zero if this function is called", - ) as u64) + ) .into() } - - #[allow(clippy::arithmetic_side_effects)] - pub fn columns(&self, spec: &ChainSpec) -> impl Iterator { - let subnet = self.0; - let data_column_sidecar_subnet = spec.data_column_sidecar_subnet_count; - let columns_per_subnet = spec.data_columns_per_subnet() as u64; - (0..columns_per_subnet).map(move |i| data_column_sidecar_subnet * i + subnet) - } - - /// Compute required subnets to subscribe to given the node id. - #[allow(clippy::arithmetic_side_effects)] - pub fn compute_custody_subnets( - raw_node_id: [u8; 32], - custody_subnet_count: u64, - spec: &ChainSpec, - ) -> Result, Error> { - if custody_subnet_count > spec.data_column_sidecar_subnet_count { - return Err(Error::InvalidCustodySubnetCount(custody_subnet_count)); - } - - let mut subnets: HashSet = HashSet::new(); - let mut current_id = U256::from_be_slice(&raw_node_id); - while (subnets.len() as u64) < custody_subnet_count { - let mut node_id_bytes = [0u8; 32]; - node_id_bytes.copy_from_slice(current_id.as_le_slice()); - let hash = ethereum_hashing::hash_fixed(&node_id_bytes); - let hash_prefix: [u8; 8] = hash[0..8] - .try_into() - .expect("hash_fixed produces a 32 byte array"); - let hash_prefix_u64 = u64::from_le_bytes(hash_prefix); - let subnet = hash_prefix_u64 % spec.data_column_sidecar_subnet_count; - - if !subnets.contains(&subnet) { - subnets.insert(subnet); - } - - if current_id == U256::MAX { - current_id = U256::ZERO - } - current_id += U256::from(1u64) - } - Ok(subnets.into_iter().map(DataColumnSubnetId::new)) - } - - /// Compute the custody subnets for a given node id with the default `custody_requirement`. - /// This operation should be infallable, and empty iterator is returned if it fails unexpectedly. - pub fn compute_custody_requirement_subnets( - node_id: [u8; 32], - spec: &ChainSpec, - ) -> impl Iterator { - Self::compute_custody_subnets::(node_id, spec.custody_requirement, spec) - .expect("should compute default custody subnets") - } - - pub fn compute_custody_columns( - raw_node_id: [u8; 32], - custody_subnet_count: u64, - spec: &ChainSpec, - ) -> Result, Error> { - Self::compute_custody_subnets::(raw_node_id, custody_subnet_count, spec) - .map(|subnet| subnet.flat_map(|subnet| subnet.columns::(spec)).sorted()) - } } impl Display for DataColumnSubnetId { @@ -139,88 +74,3 @@ impl From for Error { Error::ArithError(e) } } - -#[cfg(test)] -mod test { - use crate::data_column_subnet_id::DataColumnSubnetId; - use crate::MainnetEthSpec; - use crate::Uint256; - use crate::{EthSpec, GnosisEthSpec, MinimalEthSpec}; - - type E = MainnetEthSpec; - - #[test] - fn test_compute_subnets_for_data_column() { - let spec = E::default_spec(); - let node_ids = [ - "0", - "88752428858350697756262172400162263450541348766581994718383409852729519486397", - "18732750322395381632951253735273868184515463718109267674920115648614659369468", - "27726842142488109545414954493849224833670205008410190955613662332153332462900", - "39755236029158558527862903296867805548949739810920318269566095185775868999998", - "31899136003441886988955119620035330314647133604576220223892254902004850516297", - "58579998103852084482416614330746509727562027284701078483890722833654510444626", - "28248042035542126088870192155378394518950310811868093527036637864276176517397", - "60930578857433095740782970114409273483106482059893286066493409689627770333527", - "103822458477361691467064888613019442068586830412598673713899771287914656699997", - ] - .into_iter() - .map(|v| Uint256::from_str_radix(v, 10).unwrap().to_be_bytes::<32>()) - .collect::>(); - - let custody_requirement = 4; - for node_id in node_ids { - let computed_subnets = DataColumnSubnetId::compute_custody_subnets::( - node_id, - custody_requirement, - &spec, - ) - .unwrap(); - let computed_subnets: Vec<_> = computed_subnets.collect(); - - // the number of subnets is equal to the custody requirement - assert_eq!(computed_subnets.len() as u64, custody_requirement); - - let subnet_count = spec.data_column_sidecar_subnet_count; - for subnet in computed_subnets { - let columns: Vec<_> = subnet.columns::(&spec).collect(); - // the number of columns is equal to the specified number of columns per subnet - assert_eq!(columns.len(), spec.data_columns_per_subnet()); - - for pair in columns.windows(2) { - // each successive column index is offset by the number of subnets - assert_eq!(pair[1] - pair[0], subnet_count); - } - } - } - } - - #[test] - fn test_compute_custody_requirement_subnets_never_panics() { - let node_id = [1u8; 32]; - test_compute_custody_requirement_subnets_with_spec::(node_id); - test_compute_custody_requirement_subnets_with_spec::(node_id); - test_compute_custody_requirement_subnets_with_spec::(node_id); - } - - fn test_compute_custody_requirement_subnets_with_spec(node_id: [u8; 32]) { - let _ = DataColumnSubnetId::compute_custody_requirement_subnets::( - node_id, - &E::default_spec(), - ); - } - - #[test] - fn test_columns_subnet_conversion() { - let spec = E::default_spec(); - for subnet in 0..spec.data_column_sidecar_subnet_count { - let subnet_id = DataColumnSubnetId::new(subnet); - for column_index in subnet_id.columns::(&spec) { - assert_eq!( - subnet_id, - DataColumnSubnetId::from_column_index::(column_index as usize, &spec) - ); - } - } - } -} diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index dd304c6296c..a81822ae81a 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -104,6 +104,7 @@ pub mod slot_data; pub mod sqlite; pub mod blob_sidecar; +pub mod data_column_custody_group; pub mod data_column_sidecar; pub mod data_column_subnet_id; pub mod light_client_header;