diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index c3dea3dbb40..e0d99a7a903 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -60,9 +60,9 @@ use std::borrow::Cow; use strum::AsRefStr; use tree_hash::TreeHash; use types::{ - Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec, - CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof, - SignedAggregateAndProof, Slot, SubnetId, + attestation::SingleAttestation, Attestation, AttestationRef, BeaconCommittee, + BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256, + IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId, }; pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; @@ -317,12 +317,23 @@ pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> { attestation: AttestationRef<'a, T::EthSpec>, indexed_attestation: IndexedAttestation, subnet_id: SubnetId, + validator_index: usize, } impl VerifiedUnaggregatedAttestation<'_, T> { pub fn into_indexed_attestation(self) -> IndexedAttestation { self.indexed_attestation } + + pub fn single_attestation(&self) -> SingleAttestation { + // TODO(single-attestation) unwrap + SingleAttestation { + committee_index: self.attestation.committee_index().unwrap_or(0) as usize, + attester_index: self.validator_index, + data: self.attestation.data().clone(), + signature: self.attestation.signature().clone(), + } + } } /// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive @@ -1035,6 +1046,7 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { attestation, indexed_attestation, subnet_id, + validator_index: validator_index as usize, }) } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 80766d57b33..bafa9115722 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2034,9 +2034,18 @@ impl BeaconChain { // This method is called for API and gossip attestations, so this covers all unaggregated attestation events if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_attestation_subscribers() { - event_handler.register(EventKind::Attestation(Box::new( - v.attestation().clone_as_attestation(), - ))); + let current_fork = self + .spec + .fork_name_at_slot::(v.attestation().data().slot); + if current_fork.electra_enabled() { + event_handler.register(EventKind::SingleAttestation(Box::new( + v.single_attestation(), + ))); + } else { + event_handler.register(EventKind::Attestation(Box::new( + v.attestation().clone_as_attestation(), + ))); + } } } metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 267d56220c9..f8ee14425b0 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -90,6 +90,10 @@ impl ServerSentEventHandler { .attestation_tx .send(kind) .map(|count| log_count("attestation", count)), + EventKind::SingleAttestation(_) => self + .attestation_tx + .send(kind) + .map(|count| log_count("attestation", count)), EventKind::Block(_) => self .block_tx .send(kind) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fe05f55a01a..b257ae18f85 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -86,7 +86,7 @@ use types::{ ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, - SyncCommitteeMessage, SyncContributionData, + SyncCommitteeMessage, SyncContributionData, attestation::SingleAttestation }; use validator::pubkey_to_validator_index; use version::{ @@ -1831,21 +1831,26 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); + let beacon_pool_path_v2 = eth_v2 + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()); + // POST beacon/pool/attestations - let post_beacon_pool_attestations = beacon_pool_path_any + let post_beacon_pool_attestations = beacon_pool_path .clone() .and(warp::path("attestations")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) - .and(reprocess_send_filter) + .and(reprocess_send_filter.clone()) .and(log_filter.clone()) .then( // V1 and V2 are identical except V2 has a consensus version header in the request. // We only require this header for SSZ deserialization, which isn't supported for // this endpoint presently. - |_endpoint_version: EndpointVersion, - task_spawner: TaskSpawner, + |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, network_tx: UnboundedSender>, @@ -1865,6 +1870,38 @@ pub fn serve( }, ); + let post_beacon_pool_attestations_v2 = beacon_pool_path_v2 + .clone() + .and(warp::path("attestations")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .and(reprocess_send_filter) + .and(log_filter.clone()) + .then( + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |task_spawner: TaskSpawner, + chain: Arc>, + attestations: Vec, + network_tx: UnboundedSender>, + reprocess_tx: Option>, + log: Logger| async move { + let result = crate::publish_attestations::publish_single_attestations( + task_spawner, + chain, + attestations, + network_tx, + reprocess_tx, + log, + ) + .await + .map(|()| warp::reply::json(&())); + convert_rejection(result).await + }, + ); + // GET beacon/pool/attestations?committee_index,slot let get_beacon_pool_attestations = beacon_pool_path_any .clone() @@ -4732,6 +4769,7 @@ pub fn serve( .uor(post_beacon_blocks_v2) .uor(post_beacon_blinded_blocks_v2) .uor(post_beacon_pool_attestations) + .uor(post_beacon_pool_attestations_v2) .uor(post_beacon_pool_attester_slashings) .uor(post_beacon_pool_proposer_slashings) .uor(post_beacon_pool_voluntary_exits) diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 00654765325..5decbebbebf 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -50,7 +50,7 @@ use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use types::Attestation; +use types::{attestation::SingleAttestation, Attestation, EthSpec}; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] @@ -82,15 +82,43 @@ fn verify_and_publish_attestation( .verify_unaggregated_attestation_for_gossip(attestation, None) .map_err(Error::Validation)?; - // Publish. - network_tx - .send(NetworkMessage::Publish { - messages: vec![PubsubMessage::Attestation(Box::new(( - attestation.subnet_id(), - attestation.attestation().clone_as_attestation(), - )))], - }) - .map_err(|_| Error::Publication)?; + match attestation.attestation() { + types::AttestationRef::Base(_) => { + // Publish. + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::Attestation(Box::new(( + attestation.subnet_id(), + attestation.attestation().clone_as_attestation(), + )))], + }) + .map_err(|_| Error::Publication)?; + } + types::AttestationRef::Electra(attn) => { + chain + .with_committee_cache( + attn.data.target.root, + attn.data.slot.epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let committees = + committee_cache.get_beacon_committees_at_slot(attn.data.slot)?; + + let single_attestation = attn.to_single_attestation(&committees)?; + + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::SingleAttestation(Box::new(( + attestation.subnet_id(), + single_attestation, + )))], + }) + .map_err(|_| BeaconChainError::UnableToPublish)?; + Ok(()) + }, + ) + .map_err(|_| Error::Publication)?; + } + } // Notify the validator monitor. chain @@ -129,6 +157,48 @@ fn verify_and_publish_attestation( } } +pub async fn publish_single_attestations( + task_spawner: TaskSpawner, + chain: Arc>, + single_attestations: Vec, + network_tx: UnboundedSender>, + reprocess_send: Option>, + log: Logger, +) -> Result<(), warp::Rejection> { + let mut attestations = vec![]; + for single_attestation in single_attestations { + let attestation = chain.with_committee_cache( + single_attestation.data.target.root, + single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let committees = + committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?; + + let attestation = single_attestation.to_attestation::(&committees)?; + + Ok(attestation) + }, + ); + + if let Ok(attestation) = attestation { + attestations.push(attestation); + } + } + + publish_attestations( + task_spawner, + chain, + attestations, + network_tx, + reprocess_send, + log, + ) + .await +} + pub async fn publish_attestations( task_spawner: TaskSpawner, chain: Arc>, diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 9f68278e284..ad95281d957 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -6,6 +6,7 @@ use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; +use types::attestation::SingleAttestation; use types::{ Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, @@ -29,6 +30,8 @@ pub enum PubsubMessage { AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. Attestation(Box<(SubnetId, Attestation)>), + /// Gossipsub message providing notification of a `SingleAttestation`` with its shard id. + SingleAttestation(Box<(SubnetId, SingleAttestation)>), /// Gossipsub message providing notification of a voluntary exit. VoluntaryExit(Box), /// Gossipsub message providing notification of a new proposer slashing. @@ -128,6 +131,9 @@ impl PubsubMessage { PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) } + PubsubMessage::SingleAttestation(attestation_data) => { + GossipKind::Attestation(attestation_data.0) + } PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit, PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing, PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing, @@ -411,6 +417,7 @@ impl PubsubMessage { PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), + PubsubMessage::SingleAttestation(data) => data.1.as_ssz_bytes(), PubsubMessage::SignedContributionAndProof(data) => data.as_ssz_bytes(), PubsubMessage::SyncCommitteeMessage(data) => data.1.as_ssz_bytes(), PubsubMessage::BlsToExecutionChange(data) => data.as_ssz_bytes(), @@ -455,6 +462,14 @@ impl std::fmt::Display for PubsubMessage { data.1.data().slot, data.1.committee_index(), ), + PubsubMessage::SingleAttestation(data) => write!( + f, + "SingleAttestation: subnet_id: {}, attestation_slot: {}, committee_index: {:?}, attester_index: {:?}", + *data.0, + data.1.data.slot, + data.1.committee_index, + data.1.attester_index, + ), PubsubMessage::VoluntaryExit(_data) => write!(f, "Voluntary Exit"), PubsubMessage::ProposerSlashing(_data) => write!(f, "Proposer Slashing"), PubsubMessage::AttesterSlashing(_data) => write!(f, "Attester Slashing"), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index d81d964e7cf..cd32ac9ad26 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,6 +1,7 @@ use crate::sync::manager::BlockProcessType; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; +use attestation::SingleAttestation; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; @@ -28,7 +29,7 @@ use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PubsubMessage, }; use rand::prelude::SliceRandom; -use slog::{debug, error, trace, warn, Logger}; +use slog::{debug, error, info, trace, warn, Logger}; use slot_clock::ManualSlotClock; use std::path::PathBuf; use std::sync::Arc; @@ -84,6 +85,49 @@ impl NetworkBeaconProcessor { .map_err(Into::into) } + /// Create a new `Work` event for some `SingleAttestation`. + pub fn send_single_attestation( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + single_attestation: SingleAttestation, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + ) -> Result<(), Error> { + info!(self.log, "SENDING A SINGLE ATTESTATION"); + let result = self.chain.with_committee_cache( + single_attestation.data.target.root, + single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let committees = + committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?; + + let attestation = single_attestation.to_attestation(&committees)?; + + Ok(self.send_unaggregated_attestation( + message_id.clone(), + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + )) + }, + ); + + match result { + Ok(result) => result, + Err(e) => { + warn!(self.log, "Failed to send SingleAttestation"; "error" => ?e); + Ok(()) + } + } + } + /// Create a new `Work` event for some unaggregated attestation. pub fn send_unaggregated_attestation( self: &Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 0a99b6af0cf..d3da341e1c8 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -398,6 +398,17 @@ impl Router { timestamp_now(), ), ), + PubsubMessage::SingleAttestation(subnet_attestation) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor.send_single_attestation( + message_id, + peer_id, + subnet_attestation.1, + subnet_attestation.0, + should_process, + timestamp_now(), + ), + ), PubsubMessage::BeaconBlock(block) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_gossip_beacon_block( message_id, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c187399ebd7..b123dd51a4e 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -5,6 +5,7 @@ use crate::{ Error as ServerError, CONSENSUS_BLOCK_VALUE_HEADER, CONSENSUS_VERSION_HEADER, EXECUTION_PAYLOAD_BLINDED_HEADER, EXECUTION_PAYLOAD_VALUE_HEADER, }; +use attestation::SingleAttestation; use lighthouse_network::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStatus}; use mediatype::{names, MediaType, MediaTypeList}; use reqwest::header::HeaderMap; @@ -1110,6 +1111,7 @@ impl ForkVersionDeserialize for SseExtendedPayloadAttributes { #[serde(bound = "E: EthSpec", untagged)] pub enum EventKind { Attestation(Box>), + SingleAttestation(Box), Block(SseBlock), BlobSidecar(SseBlobSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), @@ -1136,6 +1138,7 @@ impl EventKind { EventKind::Block(_) => "block", EventKind::BlobSidecar(_) => "blob_sidecar", EventKind::Attestation(_) => "attestation", + EventKind::SingleAttestation(_) => "single_attestation", EventKind::VoluntaryExit(_) => "voluntary_exit", EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", EventKind::ChainReorg(_) => "chain_reorg", diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 190964736fe..59536cc6bf8 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,6 +1,6 @@ use crate::slot_data::SlotData; use crate::{test_utils::TestRandom, Hash256, Slot}; -use crate::{Checkpoint, ForkVersionDeserialize}; +use crate::{BeaconCommittee, Checkpoint, ForkVersionDeserialize}; use derivative::Derivative; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; @@ -24,6 +24,7 @@ pub enum Error { IncorrectStateVariant, InvalidCommitteeLength, InvalidCommitteeIndex, + InvalidAggregationBit, } impl From for Error { @@ -350,6 +351,70 @@ impl AttestationElectra { Ok(()) } } + + pub fn get_aggregation_bits(&self) -> Vec { + self.aggregation_bits + .iter() + .enumerate() + .filter_map(|(index, bit)| if bit { Some(index as u64) } else { None }) + .collect() + } + + pub fn to_single_attestation( + &self, + committees: &[BeaconCommittee], + ) -> Result { + let committee_indices = self.get_committee_indices(); + + if committee_indices.len() != 1 { + return Err(Error::InvalidCommitteeLength); + } + + let aggregation_bits = self.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + return Err(Error::InvalidAggregationBit); + } + + let committee_index = *committee_indices + .first() + .ok_or(Error::InvalidCommitteeIndex)?; + + let aggregation_bit = *aggregation_bits + .first() + .ok_or(Error::InvalidAggregationBit)?; + + let beacon_committee = committees + .get(committee_index as usize) + .ok_or(Error::InvalidCommitteeIndex)?; + + let attester_indices = beacon_committee + .committee + .iter() + .enumerate() + .filter_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .collect::>(); + + if attester_indices.len() != 1 { + return Err(Error::InvalidAggregationBit); + }; + + let attester_index = *attester_indices + .first() + .ok_or(Error::InvalidAggregationBit)?; + + Ok(SingleAttestation { + committee_index: committee_index as usize, + attester_index, + data: self.data.clone(), + signature: self.signature.clone(), + }) + } } impl AttestationBase { @@ -428,6 +493,71 @@ impl SlotData for AttestationRef<'_, E> { } } +#[derive( + Debug, + Clone, + Serialize, + Deserialize, + Decode, + Encode, + TestRandom, + Derivative, + arbitrary::Arbitrary, + TreeHash, + PartialEq, +)] +pub struct SingleAttestation { + pub committee_index: usize, + pub attester_index: usize, + pub data: AttestationData, + pub signature: AggregateSignature, +} + +impl SingleAttestation { + pub fn to_attestation( + &self, + committees: &[BeaconCommittee], + ) -> Result, Error> { + let beacon_committee = committees + .get(self.committee_index) + .ok_or(Error::InvalidAggregationBit)?; + let aggregation_bits = beacon_committee + .committee + .iter() + .enumerate() + .filter_map(|(i, &validator_index)| { + if self.attester_index == validator_index { + return Some(i); + } + None + }) + .collect::>(); + + if aggregation_bits.len() != 1 { + return Err(Error::InvalidAggregationBit); + } + + let aggregation_bit = aggregation_bits.first().unwrap(); + + let mut committee_bits: BitVector = BitVector::default(); + committee_bits + .set(self.committee_index, true) + .map_err(|_| Error::InvalidCommitteeIndex)?; + + let mut aggregation_bits = BitList::with_capacity(beacon_committee.committee.len()) + .map_err(|_| Error::InvalidCommitteeLength)?; + + aggregation_bits.set(*aggregation_bit, true)?; + + Ok(Attestation::Electra(AttestationElectra { + aggregation_bits, + committee_bits, + data: self.data.clone(), + signature: self.signature.clone(), + })) + } +} + #[derive(Debug, Clone, Encode, Decode, PartialEq)] #[ssz(enum_behaviour = "union")] pub enum AttestationOnDisk {