Skip to content

Commit

Permalink
op-pool maintains unaggregated_attestations and calculates bron kerbo…
Browse files Browse the repository at this point in the history
…sh from a static graph
  • Loading branch information
GeemoCandama committed Jul 28, 2023
1 parent 979faf0 commit 023f290
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 54 deletions.
92 changes: 67 additions & 25 deletions beacon_node/operation_pool/src/attestation_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct CompactAttestationData {
pub target_root: Hash256,
}

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct CompactIndexedAttestation<T: EthSpec> {
pub attesting_indices: Vec<u64>,
pub aggregation_bits: BitList<T::MaxValidatorsPerCommittee>,
Expand Down Expand Up @@ -48,7 +48,8 @@ pub struct AttestationMap<T: EthSpec> {

#[derive(Debug, Default, PartialEq)]
pub struct AttestationDataMap<T: EthSpec> {
attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
pub aggregate_attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
pub unaggregate_attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
}

impl<T: EthSpec> SplitAttestation<T> {
Expand Down Expand Up @@ -155,25 +156,25 @@ impl<T: EthSpec> AttestationMap<T> {
.checkpoint_map
.entry(checkpoint)
.or_insert_with(AttestationDataMap::default);
let attestations = attestation_map
.attestations
.entry(data)
.or_insert_with(Vec::new);

// Greedily aggregate the attestation with all existing attestations.
// NOTE: this is sub-optimal and in future we will remove this in favour of max-clique
// aggregation.
let mut aggregated = false;
let attestations = if indexed.attesting_indices.len() > 1 {
attestation_map
.aggregate_attestations
.entry(data)
.or_insert_with(Vec::new)
} else {
attestation_map
.unaggregate_attestations
.entry(data)
.or_insert_with(Vec::new)
};
let mut observed = false;
for existing_attestation in attestations.iter_mut() {
if existing_attestation.signers_disjoint_from(&indexed) {
existing_attestation.aggregate(&indexed);
aggregated = true;
} else if *existing_attestation == indexed {
aggregated = true;
if *existing_attestation == indexed {
observed = true;
}
}

if !aggregated {
if !observed {
attestations.push(indexed);
}
}
Expand Down Expand Up @@ -201,6 +202,13 @@ impl<T: EthSpec> AttestationMap<T> {
self.checkpoint_map
.retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1);
}

pub fn get_attestation_map(
&self,
checkpoint_key: &CheckpointKey,
) -> Option<&AttestationDataMap<T>> {
self.checkpoint_map.get(checkpoint_key)
}

/// Statistics about all attestations stored in the map.
pub fn stats(&self) -> AttestationStats {
Expand All @@ -222,24 +230,58 @@ impl<T: EthSpec> AttestationDataMap<T> {
&'a self,
checkpoint_key: &'a CheckpointKey,
) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a {
self.attestations.iter().flat_map(|(data, vec_indexed)| {
vec_indexed.iter().map(|indexed| AttestationRef {
checkpoint: checkpoint_key,
data,
indexed,
})
})
let aggregates = self
.aggregate_attestations
.iter()
.flat_map(|(data, vec_indexed)| {
vec_indexed.iter().map(|indexed| AttestationRef {
checkpoint: checkpoint_key,
data,
indexed,
})
});

let unaggregates = self
.aggregate_attestations
.iter()
.flat_map(|(data, vec_indexed)| {
vec_indexed.iter().map(|indexed| AttestationRef {
checkpoint: checkpoint_key,
data,
indexed,
})
});

aggregates.chain(unaggregates)
}

pub fn stats(&self) -> AttestationStats {
let mut stats = AttestationStats::default();
let mut data_to_num_attestations: HashMap<&CompactAttestationData, usize> = HashMap::new();

for aggregates in self.attestations.values() {
for (data, aggregates) in self.aggregate_attestations.iter() {
stats.num_attestations += aggregates.len();
stats.num_attestation_data += 1;
stats.max_aggregates_per_data =
std::cmp::max(stats.max_aggregates_per_data, aggregates.len());

data_to_num_attestations.insert(data, aggregates.len());
}

for (data, unaggregates) in self.unaggregate_attestations.iter() {
stats.num_attestations += unaggregates.len();
if let Some(aggregates_num) = data_to_num_attestations.get(data) {
stats.max_aggregates_per_data = std::cmp::max(
stats.max_aggregates_per_data,
aggregates_num + unaggregates.len(),
);
} else {
stats.num_attestation_data += 1;
stats.max_aggregates_per_data =
std::cmp::max(stats.max_aggregates_per_data, unaggregates.len());
}
}

stats
}
}
11 changes: 5 additions & 6 deletions beacon_node/operation_pool/src/bron_kerbosch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/// Entry point for the Bron-Kerbosh algorithm. Takes a vector of `vertices` of type
/// `T : Compatible<T>`. Returns all the maximal cliques (as a matrix of indices) for the graph
/// `G = (V,E)` where `V` is `vertices` and `E` encodes the `is_compatible` relationship.
pub fn bron_kerbosh<T, F: Fn(&T, &T) -> bool>(
pub fn bron_kerbosch<T, F: Fn(&T, &T) -> bool>(
vertices: &Vec<T>,
is_compatible: F,
) -> Vec<Vec<usize>> {
Expand All @@ -25,7 +24,7 @@ pub fn bron_kerbosh<T, F: Fn(&T, &T) -> bool>(
.filter(|j| neighbourhoods[vi].contains(&ordering[*j]))
.map(|j| ordering[j])
.collect();
bron_kerbosh_aux(r, p, x, &neighbourhoods, &mut publish_clique)
bron_kerbosch_aux(r, p, x, &neighbourhoods, &mut publish_clique)
}

cliques
Expand Down Expand Up @@ -77,7 +76,7 @@ fn degeneracy_order(num_vertices: usize, neighbourhoods: &[Vec<usize>]) -> Vec<u
/// * `x` - a set of vertices that have been explored and shouldn't be added to r
/// * `neighbourhoods` - a data structure to hold the neighbourhoods of each vertex
/// * `publish_clique` - a callback function to call whenever a clique has been produced
fn bron_kerbosh_aux<F>(
fn bron_kerbosch_aux<F>(
r: Vec<usize>,
p: Vec<usize>,
x: Vec<usize>,
Expand Down Expand Up @@ -115,7 +114,7 @@ fn bron_kerbosh_aux<F>(
nx.retain(|e| n.contains(e));

// recursive call
bron_kerbosh_aux(nr, np, nx, neighbourhoods, publish_clique);
bron_kerbosch_aux(nr, np, nx, neighbourhoods, publish_clique);

// p \ { v }, x U { v }
mp.remove(mp.iter().position(|x| *x == v).unwrap());
Expand Down Expand Up @@ -164,6 +163,6 @@ mod tests {
edges.contains(&(*first, *second)) || edges.contains(&(*first, *second))
};

println!("{:?}", bron_kerbosh(&vertices, is_compatible));
println!("{:?}", bron_kerbosch(&vertices, is_compatible));
}
}
167 changes: 144 additions & 23 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ mod sync_aggregate_id;

pub use crate::bls_to_execution_changes::ReceivedPreCapella;
pub use attestation::{earliest_attestation_validators, AttMaxCover};
use attestation_storage::{CompactIndexedAttestation, CompactAttestationData, AttestationDataMap};
pub use attestation_storage::{AttestationRef, SplitAttestation};
use bron_kerbosch::bron_kerbosch;
pub use max_cover::MaxCover;
pub use persistence::{
PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14,
Expand Down Expand Up @@ -239,6 +241,96 @@ impl<T: EthSpec> OperationPool<T> {
AttMaxCover::new(att, state, reward_cache, total_active_balance, spec)
})
}

#[allow(clippy::too_many_arguments)]
fn get_clique_aggregate_attestations_for_epoch<'a>(
&'a self,
checkpoint_key: &'a CheckpointKey,
all_attestations: &'a AttestationMap<T>,
state: &'a BeaconState<T>,
mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
num_valid: &mut i64,
spec: &'a ChainSpec,
) -> Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> {
let mut cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> = vec![];
if let Some(AttestationDataMap { aggregate_attestations, unaggregate_attestations })
= all_attestations.get_attestation_map(checkpoint_key) {
for (data, aggregates) in aggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch() {
let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}))
.collect();
*num_valid += aggregates.len() as i64;

let cliques = bron_kerbosch(&aggregates, is_compatible);

// This assumes that the values from bron_kerbosch are valid indices of
// aggregates.
let mut clique_aggregates = cliques
.iter()
.map(|clique| {
let mut res_att = aggregates[clique[0]].clone();
for ind in clique.iter().skip(1) {
res_att.aggregate(&aggregates[*ind]);
}
res_att
});

if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) {
for attestation in unaggregate_attestations
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}))
{
*num_valid += 1;
for mut clique_aggregate in &mut clique_aggregates {
if !clique_aggregate.attesting_indices.contains(&attestation.attesting_indices[0]) {
clique_aggregate.aggregate(attestation);
}
}
}
}

cliqued_atts.extend(
clique_aggregates
.map(|indexed| (data, indexed))
);
}
}
for (data, attestations) in unaggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch() {
if !aggregate_attestations.contains_key(&data) {
let mut valid_attestations = attestations
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}));

if let Some(first) = valid_attestations.next() {
let mut agg = first.clone();
for attestation in valid_attestations {
agg.aggregate(&attestation);
}
cliqued_atts.push((data, agg));
}
}
}
}
}
cliqued_atts
}

/// Get a list of attestations for inclusion in a block.
///
Expand Down Expand Up @@ -272,28 +364,50 @@ impl<T: EthSpec> OperationPool<T> {
let mut num_prev_valid = 0_i64;
let mut num_curr_valid = 0_i64;

let prev_epoch_att = self
.get_valid_attestations_for_epoch(
&prev_epoch_key,
&*all_attestations,
state,
&reward_cache,
total_active_balance,
prev_epoch_validity_filter,
spec,
let prev_cliqued_atts = if prev_epoch_key != curr_epoch_key {
self.get_clique_aggregate_attestations_for_epoch(
&prev_epoch_key,
&*all_attestations,
state,
prev_epoch_validity_filter,
&mut num_prev_valid,
spec
)
.inspect(|_| num_prev_valid += 1);
let curr_epoch_att = self
.get_valid_attestations_for_epoch(
&curr_epoch_key,
&*all_attestations,
state,
&reward_cache,
total_active_balance,
curr_epoch_validity_filter,
spec,
)
.inspect(|_| num_curr_valid += 1);
} else {
vec![]
};

let prev_epoch_cliqued_atts: Vec<AttMaxCover<T>> = prev_cliqued_atts.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();

let curr_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch(
&curr_epoch_key,
&*all_attestations,
state,
curr_epoch_validity_filter,
&mut num_curr_valid,
spec
);

let curr_epoch_cliqued_atts: Vec<AttMaxCover<T>> = curr_cliqued_atts.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();


let prev_epoch_limit = if let BeaconState::Base(base_state) = state {
std::cmp::min(
Expand All @@ -312,13 +426,13 @@ impl<T: EthSpec> OperationPool<T> {
if prev_epoch_key == curr_epoch_key {
vec![]
} else {
maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations")
maximum_cover(prev_epoch_cliqued_atts, prev_epoch_limit, "prev_epoch_attestations")
}
},
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME);
maximum_cover(
curr_epoch_att,
curr_epoch_cliqued_atts,
T::MaxAttestations::to_usize(),
"curr_epoch_attestations",
)
Expand Down Expand Up @@ -708,6 +822,13 @@ impl<T: EthSpec> OperationPool<T> {
}
}

fn is_compatible<T: EthSpec>(x: &&CompactIndexedAttestation<T>, y: &&CompactIndexedAttestation<T>) -> bool {
let x_attester_set: HashSet<_> = x.attesting_indices.iter().collect();
let y_attester_set: HashSet<_> = y.attesting_indices.iter().collect();
x_attester_set.is_disjoint(&y_attester_set)
}


/// Filter up to a maximum number of operations out of an iterator.
fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>(
operations: I,
Expand Down

0 comments on commit 023f290

Please sign in to comment.