Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SingleAttestation refactor #6616

Open
wants to merge 12 commits into
base: unstable
Choose a base branch
from
76 changes: 51 additions & 25 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,11 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
//
// We do not queue future attestations for later processing.
verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?;
verify_attestation_propagation_slot_range::<T::SlotClock, T::EthSpec>(
&chain.slot_clock,
attestation.data().slot,
&chain.spec,
)?;

// Check the attestation's epoch matches its target.
if attestation.data().slot.epoch(T::EthSpec::slots_per_epoch())
Expand Down Expand Up @@ -538,7 +542,12 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
//
// Attestations must be for a known block. If the block is unknown, we simply drop the
// attestation and do not delay consideration for later.
let head_block = verify_head_block_is_known(chain, attestation, None)?;
let head_block = verify_head_block_is_known(
chain,
attestation.data().beacon_block_root,
attestation.data().slot,
None,
)?;

// Check the attestation target root is consistent with the head root.
//
Expand All @@ -547,7 +556,11 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
//
// Whilst this attestation *technically* could be used to add value to a block, it is
// invalid in the spirit of the protocol. Here we choose safety over profit.
verify_attestation_target_root::<T::EthSpec>(&head_block, attestation)?;
verify_attestation_target_root::<T::EthSpec>(
&head_block,
attestation.data().target.root,
attestation.data().slot,
)?;

// Ensure that the attestation has participants.
if attestation.is_aggregation_bits_zero() {
Expand Down Expand Up @@ -807,7 +820,11 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> {
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
//
// We do not queue future attestations for later processing.
verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?;
verify_attestation_propagation_slot_range::<T::SlotClock, T::EthSpec>(
&chain.slot_clock,
attestation.data().slot,
&chain.spec,
)?;

// Check to ensure that the attestation is "unaggregated". I.e., it has exactly one
// aggregation bit set.
Expand All @@ -823,11 +840,19 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> {
// attestation and do not delay consideration for later.
//
// Enforce a maximum skip distance for unaggregated attestations.
let head_block =
verify_head_block_is_known(chain, attestation, chain.config.import_max_skip_slots)?;
let head_block = verify_head_block_is_known(
chain,
attestation.data().beacon_block_root,
attestation.data().slot,
chain.config.import_max_skip_slots,
)?;

// Check the attestation target root is consistent with the head root.
verify_attestation_target_root::<T::EthSpec>(&head_block, attestation)?;
verify_attestation_target_root::<T::EthSpec>(
&head_block,
attestation.data().target.root,
attestation.data().slot,
)?;

Ok(())
}
Expand Down Expand Up @@ -1072,36 +1097,37 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
/// Case (1) is the exact thing we're trying to detect. However case (2) is a little different, but
/// it's still fine to reject here because there's no need for us to handle attestations that are
/// already finalized.
fn verify_head_block_is_known<T: BeaconChainTypes>(
pub fn verify_head_block_is_known<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
attestation: AttestationRef<T::EthSpec>,
attestation_beacon_block_root: Hash256,
attestation_slot: Slot,
max_skip_slots: Option<u64>,
) -> Result<ProtoBlock, Error> {
let block_opt = chain
.canonical_head
.fork_choice_read_lock()
.get_block(&attestation.data().beacon_block_root)
.get_block(&attestation_beacon_block_root)
.or_else(|| {
chain
.early_attester_cache
.get_proto_block(attestation.data().beacon_block_root)
.get_proto_block(attestation_beacon_block_root)
});

if let Some(block) = block_opt {
// Reject any block that exceeds our limit on skipped slots.
if let Some(max_skip_slots) = max_skip_slots {
if attestation.data().slot > block.slot + max_skip_slots {
if attestation_slot > block.slot + max_skip_slots {
return Err(Error::TooManySkippedSlots {
head_block_slot: block.slot,
attestation_slot: attestation.data().slot,
attestation_slot,
});
}
}

Ok(block)
} else if chain.is_pre_finalization_block(attestation.data().beacon_block_root)? {
} else if chain.is_pre_finalization_block(attestation_beacon_block_root)? {
Err(Error::HeadBlockFinalized {
beacon_block_root: attestation.data().beacon_block_root,
beacon_block_root: attestation_beacon_block_root,
})
} else {
// The block is either:
Expand All @@ -1111,21 +1137,20 @@ fn verify_head_block_is_known<T: BeaconChainTypes>(
// 2) A post-finalization block that we don't know about yet. We'll queue
// the attestation until the block becomes available (or we time out).
Err(Error::UnknownHeadBlock {
beacon_block_root: attestation.data().beacon_block_root,
beacon_block_root: attestation_beacon_block_root,
})
}
}

/// Verify that the `attestation` is within the acceptable gossip propagation range, with reference
/// Verify that the `attestation_slot` is within the acceptable gossip propagation range, with reference
/// to the current slot of the `chain`.
///
/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
pub fn verify_propagation_slot_range<S: SlotClock, E: EthSpec>(
pub fn verify_attestation_propagation_slot_range<S: SlotClock, E: EthSpec>(
slot_clock: &S,
attestation: AttestationRef<E>,
attestation_slot: Slot,
spec: &ChainSpec,
) -> Result<(), Error> {
let attestation_slot = attestation.data().slot;
let latest_permissible_slot = slot_clock
.now_with_future_tolerance(spec.maximum_gossip_clock_disparity())
.ok_or(BeaconChainError::UnableToReadSlot)?;
Expand Down Expand Up @@ -1203,11 +1228,12 @@ pub fn verify_attestation_signature<T: BeaconChainTypes>(
/// `attestation.data.beacon_block_root`.
pub fn verify_attestation_target_root<E: EthSpec>(
head_block: &ProtoBlock,
attestation: AttestationRef<E>,
attestation_target_root: Hash256,
attestation_slot: Slot,
) -> Result<(), Error> {
// Check the attestation target root.
let head_block_epoch = head_block.slot.epoch(E::slots_per_epoch());
let attestation_epoch = attestation.data().slot.epoch(E::slots_per_epoch());
let attestation_epoch = attestation_slot.epoch(E::slots_per_epoch());
if head_block_epoch > attestation_epoch {
// The epoch references an invalid head block from a future epoch.
//
Expand All @@ -1220,7 +1246,7 @@ pub fn verify_attestation_target_root<E: EthSpec>(
// Reference:
// https://github.com/ethereum/eth2.0-specs/pull/2001#issuecomment-699246659
return Err(Error::InvalidTargetRoot {
attestation: attestation.data().target.root,
attestation: attestation_target_root,
// It is not clear what root we should expect in this case, since the attestation is
// fundamentally invalid.
expected: None,
Expand All @@ -1239,9 +1265,9 @@ pub fn verify_attestation_target_root<E: EthSpec>(
};

// Reject any attestation with an invalid target root.
if target_root != attestation.data().target.root {
if target_root != attestation_target_root {
return Err(Error::InvalidTargetRoot {
attestation: attestation.data().target.root,
attestation: attestation_target_root,
expected: Some(target_root),
});
}
Expand Down
58 changes: 53 additions & 5 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_B
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::single_attestation_verification::SingleAttestationVerification;
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
Expand All @@ -72,6 +73,7 @@ use crate::{
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
BeaconSnapshot, CachedHead,
};
use attestation::SingleAttestation;
use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes};
use execution_layer::{
BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer,
Expand Down Expand Up @@ -2042,6 +2044,35 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}

/// Accepts some `SingleAttestation` from the network and attempts to verify it, returning `Ok(_)` if
/// it is valid to be (re)broadcast on the gossip network.
pub fn verify_single_attestation_for_gossip(
&self,
single_attestation: &SingleAttestation,
subnet_id: Option<SubnetId>,
) -> Result<SingleAttestationVerification, AttestationError> {
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS);
let _timer =
metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES);

let verify_attestation =
SingleAttestationVerification::verify(single_attestation, subnet_id, self)?;

// This method is called for API and gossip attestations, so this covers all unaggregated attestation events
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_single_attestation_subscribers() {
// TODO(single-attestation) we should also emit the old attestation event?
event_handler.register(EventKind::SingleAttestation(Box::new(
verify_attestation.single_attestation.clone(),
)));
}
}

metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES);

Ok(verify_attestation)
}

/// Performs the same validation as `Self::verify_aggregated_attestation_for_gossip`, but for
/// multiple attestations using batch BLS verification. Batch verification can provide
/// significant CPU-time savings compared to individual verification.
Expand Down Expand Up @@ -2169,6 +2200,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

/// Accepts a `SingleATtestation` object and attempts to verify it in the context of fork
/// choice. If it is valid it is applied to `self.fork_choice`.
pub fn apply_single_attestation_to_fork_choice(
&self,
single_attestation: &SingleAttestation,
) -> Result<(), Error> {
self.canonical_head
.fork_choice_write_lock()
.on_attestation(
self.slot()?,
single_attestation.data.clone(),
vec![single_attestation.attester_index as u64],
AttestationFromBlock::False,
)
.map_err(Into::into)
}

/// Accepts some attestation-type object and attempts to verify it in the context of fork
/// choice. If it is valid it is applied to `self.fork_choice`.
///
Expand All @@ -2178,13 +2226,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// - `VerifiedAggregatedAttestation`
pub fn apply_attestation_to_fork_choice(
&self,
verified: &impl VerifiedAttestation<T>,
attestation_data: AttestationData,
attesting_indices: Vec<u64>,
) -> Result<(), Error> {
self.canonical_head
.fork_choice_write_lock()
.on_attestation(
self.slot()?,
verified.indexed_attestation().to_ref(),
attestation_data,
attesting_indices,
AttestationFromBlock::False,
)
.map_err(Into::into)
Expand All @@ -2200,12 +2250,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// and no error is returned.
pub fn add_to_naive_aggregation_pool(
&self,
unaggregated_attestation: &impl VerifiedAttestation<T>,
attestation: AttestationRef<T::EthSpec>,
) -> Result<(), AttestationError> {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_AGG_POOL);

let attestation = unaggregated_attestation.attestation();

match self.naive_aggregation_pool.write().insert(attestation) {
Ok(outcome) => trace!(
self.log,
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,8 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

match fork_choice.on_attestation(
current_slot,
indexed_attestation,
indexed_attestation.data().clone(),
indexed_attestation.attesting_indices_to_vec(),
AttestationFromBlock::True,
) {
Ok(()) => Ok(()),
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16;

pub struct ServerSentEventHandler<E: EthSpec> {
attestation_tx: Sender<EventKind<E>>,
single_attestation_tx: Sender<EventKind<E>>,
block_tx: Sender<EventKind<E>>,
blob_sidecar_tx: Sender<EventKind<E>>,
finalized_tx: Sender<EventKind<E>>,
Expand Down Expand Up @@ -37,6 +38,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {

pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
let (attestation_tx, _) = broadcast::channel(capacity);
let (single_attestation_tx, _) = broadcast::channel(capacity);
let (block_tx, _) = broadcast::channel(capacity);
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
let (finalized_tx, _) = broadcast::channel(capacity);
Expand All @@ -56,6 +58,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {

Self {
attestation_tx,
single_attestation_tx,
block_tx,
blob_sidecar_tx,
finalized_tx,
Expand Down Expand Up @@ -154,6 +157,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.block_gossip_tx
.send(kind)
.map(|count| log_count("block gossip", count)),
EventKind::SingleAttestation(_) => self
.single_attestation_tx
.send(kind)
.map(|count| log_count("single_attestation", count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
Expand All @@ -164,6 +171,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.attestation_tx.subscribe()
}

pub fn subscribe_single_attestation(&self) -> Receiver<EventKind<E>> {
self.single_attestation_tx.subscribe()
}

pub fn subscribe_block(&self) -> Receiver<EventKind<E>> {
self.block_tx.subscribe()
}
Expand Down Expand Up @@ -232,6 +243,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.attestation_tx.receiver_count() > 0
}

pub fn has_single_attestation_subscribers(&self) -> bool {
self.single_attestation_tx.receiver_count() > 0
}

pub fn has_block_subscribers(&self) -> bool {
self.block_tx.receiver_count() > 0
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod pre_finalization_cache;
pub mod proposer_prep_service;
pub mod schema_change;
pub mod shuffling_cache;
pub mod single_attestation_verification;
pub mod state_advance_timer;
pub mod sync_committee_rewards;
pub mod sync_committee_verification;
Expand Down
Loading
Loading