diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 01343ff3b15..08f163dcc0a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4344,27 +4344,6 @@ impl BeaconChain { .op_pool .get_bls_to_execution_changes(&state, &self.spec); - // Iterate through the naive aggregation pool and ensure all the attestations from there - // are included in the operation pool. - let unagg_import_timer = - metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES); - for attestation in self.naive_aggregation_pool.read().iter() { - let import = |attestation: &Attestation| { - let attesting_indices = get_attesting_indices_from_state(&state, attestation)?; - self.op_pool - .insert_attestation(attestation.clone(), attesting_indices) - }; - if let Err(e) = import(attestation) { - // Don't stop block production if there's an error, just create a log. - error!( - self.log, - "Attestation did not transfer to op pool"; - "reason" => ?e - ); - } - } - drop(unagg_import_timer); - // Override the beacon node's graffiti with graffiti from the validator, if present. 
let graffiti = match validator_graffiti { Some(graffiti) => graffiti, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 27bcc4d8a13..68b41a6c87d 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1608,6 +1608,20 @@ pub fn serve( format!("Naive aggregation pool: {:?}", e), )); } + + if let Err(e) = chain.add_to_block_inclusion_pool(attestation) { + error!(log, + "Failure adding verified attestation to the operation pool"; + "error" => ?e, + "request_index" => index, + "committee_index" => committee_index, + "slot" => slot, + ); + failures.push(api_types::Failure::new( + index, + format!("Operation pool: {:?}", e), + )); + } } if num_already_known > 0 { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index cde4da9ffcc..8de91566ea3 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -372,6 +372,16 @@ impl NetworkBeaconProcessor { ) } + if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_attestation) { + debug!( + self.log, + "Attestation invalid for op pool"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root, + ); + } + metrics::inc_counter( &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, ); diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs index 6b9c9a9e31f..b03f316ea8f 100644 --- a/beacon_node/operation_pool/src/attestation_storage.rs +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -49,7 +49,8 @@ pub struct AttestationMap { #[derive(Debug, Default, PartialEq)] pub struct AttestationDataMap { pub aggregate_attestations: HashMap>>, - pub unaggregate_attestations: HashMap>>, + pub unaggregate_attestations: + HashMap>>, } impl SplitAttestation { @@ 
-202,12 +203,12 @@ impl AttestationMap { self.checkpoint_map .retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1); } - + pub fn get_attestation_map( &self, checkpoint_key: &CheckpointKey, ) -> Option<&AttestationDataMap> { - self.checkpoint_map.get(checkpoint_key) + self.checkpoint_map.get(checkpoint_key) } /// Statistics about all attestations stored in the map. diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 598026316d0..41b9a22d01d 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -12,7 +12,7 @@ mod sync_aggregate_id; pub use crate::bls_to_execution_changes::ReceivedPreCapella; pub use attestation::{earliest_attestation_validators, AttMaxCover}; -use attestation_storage::{CompactIndexedAttestation, CompactAttestationData, AttestationDataMap}; +use attestation_storage::{AttestationDataMap, CompactAttestationData, CompactIndexedAttestation}; pub use attestation_storage::{AttestationRef, SplitAttestation}; use bron_kerbosch::bron_kerbosch; pub use max_cover::MaxCover; @@ -241,7 +241,7 @@ impl OperationPool { AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) }) } - + #[allow(clippy::too_many_arguments)] fn get_clique_aggregate_attestations_for_epoch<'a>( &'a self, @@ -251,72 +251,76 @@ impl OperationPool { mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, num_valid: &mut i64, spec: &'a ChainSpec, - ) -> Vec<(&CompactAttestationData, CompactIndexedAttestation)> { + ) -> Vec<(&CompactAttestationData, CompactIndexedAttestation)> { let mut cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation)> = vec![]; - if let Some(AttestationDataMap { aggregate_attestations, unaggregate_attestations }) - = all_attestations.get_attestation_map(checkpoint_key) { + if let Some(AttestationDataMap { + aggregate_attestations, + unaggregate_attestations, + }) = 
all_attestations.get_attestation_map(checkpoint_key) + { for (data, aggregates) in aggregate_attestations { if data.slot + spec.min_attestation_inclusion_delay <= state.slot() - && state.slot() <= data.slot + T::slots_per_epoch() { + && state.slot() <= data.slot + T::slots_per_epoch() + { let aggregates: Vec<&CompactIndexedAttestation> = aggregates .iter() - .filter(|indexed| validity_filter(&AttestationRef { + .filter(|indexed| { + validity_filter(&AttestationRef { checkpoint: checkpoint_key, data: &data, - indexed - })) + indexed, + }) + }) .collect(); *num_valid += aggregates.len() as i64; let cliques = bron_kerbosch(&aggregates, is_compatible); - + // This assumes that the values from bron_kerbosch are valid indices of // aggregates. - let mut clique_aggregates = cliques - .iter() - .map(|clique| { - let mut res_att = aggregates[clique[0]].clone(); - for ind in clique.iter().skip(1) { - res_att.aggregate(&aggregates[*ind]); - } - res_att - }); + let mut clique_aggregates = cliques.iter().map(|clique| { + let mut res_att = aggregates[clique[0]].clone(); + for ind in clique.iter().skip(1) { + res_att.aggregate(&aggregates[*ind]); + } + res_att + }); if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) { - for attestation in unaggregate_attestations - .iter() - .filter(|indexed| validity_filter(&AttestationRef { + for attestation in unaggregate_attestations.iter().filter(|indexed| { + validity_filter(&AttestationRef { checkpoint: checkpoint_key, data: &data, - indexed - })) - { + indexed, + }) + }) { *num_valid += 1; for mut clique_aggregate in &mut clique_aggregates { - if !clique_aggregate.attesting_indices.contains(&attestation.attesting_indices[0]) { + if !clique_aggregate + .attesting_indices + .contains(&attestation.attesting_indices[0]) + { clique_aggregate.aggregate(attestation); } } } } - cliqued_atts.extend( - clique_aggregates - .map(|indexed| (data, indexed)) - ); + cliqued_atts.extend(clique_aggregates.map(|indexed| (data, 
indexed))); } } for (data, attestations) in unaggregate_attestations { if data.slot + spec.min_attestation_inclusion_delay <= state.slot() - && state.slot() <= data.slot + T::slots_per_epoch() { + && state.slot() <= data.slot + T::slots_per_epoch() + { if !aggregate_attestations.contains_key(&data) { - let mut valid_attestations = attestations - .iter() - .filter(|indexed| validity_filter(&AttestationRef { + let mut valid_attestations = attestations.iter().filter(|indexed| { + validity_filter(&AttestationRef { checkpoint: checkpoint_key, data: &data, - indexed - })); + indexed, + }) + }); if let Some(first) = valid_attestations.next() { let mut agg = first.clone(); @@ -364,50 +368,51 @@ impl OperationPool { let mut num_prev_valid = 0_i64; let mut num_curr_valid = 0_i64; - let prev_cliqued_atts = if prev_epoch_key != curr_epoch_key { + let prev_cliqued_atts = if prev_epoch_key != curr_epoch_key { self.get_clique_aggregate_attestations_for_epoch( - &prev_epoch_key, - &*all_attestations, - state, - prev_epoch_validity_filter, - &mut num_prev_valid, - spec + &prev_epoch_key, + &*all_attestations, + state, + prev_epoch_validity_filter, + &mut num_prev_valid, + spec, ) } else { vec![] }; - let prev_epoch_cliqued_atts: Vec> = prev_cliqued_atts.iter() - .map(|(data, indexed)| AttestationRef { - checkpoint: &prev_epoch_key, - data, - indexed, - }) - .filter_map(|att| { - AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) - }) - .collect(); + let prev_epoch_cliqued_atts: Vec> = prev_cliqued_atts + .iter() + .map(|(data, indexed)| AttestationRef { + checkpoint: &prev_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) + .collect(); let curr_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch( - &curr_epoch_key, - &*all_attestations, - state, - curr_epoch_validity_filter, - &mut num_curr_valid, - spec + &curr_epoch_key, + &*all_attestations, + state, + 
curr_epoch_validity_filter, + &mut num_curr_valid, + spec, ); - let curr_epoch_cliqued_atts: Vec> = curr_cliqued_atts.iter() - .map(|(data, indexed)| AttestationRef { - checkpoint: &prev_epoch_key, - data, - indexed, - }) - .filter_map(|att| { - AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) - }) - .collect(); - + let curr_epoch_cliqued_atts: Vec> = curr_cliqued_atts + .iter() + .map(|(data, indexed)| AttestationRef { + checkpoint: &curr_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) + .collect(); let prev_epoch_limit = if let BeaconState::Base(base_state) = state { std::cmp::min( @@ -426,7 +431,11 @@ impl OperationPool { if prev_epoch_key == curr_epoch_key { vec![] } else { - maximum_cover(prev_epoch_cliqued_atts, prev_epoch_limit, "prev_epoch_attestations") + maximum_cover( + prev_epoch_cliqued_atts, + prev_epoch_limit, + "prev_epoch_attestations", + ) } }, move || { @@ -822,13 +831,15 @@ impl OperationPool { } } -fn is_compatible(x: &&CompactIndexedAttestation, y: &&CompactIndexedAttestation) -> bool { +fn is_compatible( + x: &&CompactIndexedAttestation, + y: &&CompactIndexedAttestation, +) -> bool { let x_attester_set: HashSet<_> = x.attesting_indices.iter().collect(); let y_attester_set: HashSet<_> = y.attesting_indices.iter().collect(); x_attester_set.is_disjoint(&y_attester_set) } - /// Filter up to a maximum number of operations out of an iterator. fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>( operations: I,