Skip to content

Commit

Permalink
Merge pull request #3276 from autonomys/split-produce-bundle
Browse files Browse the repository at this point in the history
Split produce_bundle into smaller methods
  • Loading branch information
teor2345 authored Dec 3, 2024
2 parents 54fb379 + 3685d2c commit cd44905
Showing 1 changed file with 204 additions and 80 deletions.
284 changes: 204 additions & 80 deletions domains/client/domain-operator/src/domain_bundle_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use sp_blockchain::HeaderBackend;
use sp_consensus_slots::Slot;
use sp_domains::core_api::DomainCoreApi;
use sp_domains::{
Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey,
OperatorSignature, SealedBundleHeader, SealedSingletonReceipt, SingletonReceipt,
Bundle, BundleHeader, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId,
OperatorPublicKey, OperatorSignature, ProofOfElection, SealedBundleHeader,
SealedSingletonReceipt, SingletonReceipt,
};
use sp_keystore::KeystorePtr;
use sp_messenger::MessengerApi;
Expand All @@ -22,6 +23,19 @@ use std::sync::Arc;
use subspace_runtime_primitives::Balance;
use tracing::info;

/// Type alias for block hash.
pub type BlockHashFor<Block> = <Block as BlockT>::Hash;

/// Type alias for block header.
pub type HeaderFor<Block> = <Block as BlockT>::Header;

/// Type alias for bundle header.
pub type BundleHeaderFor<Block, CBlock> =
BundleHeader<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;

/// Type alias for extrinsics.
pub type ExtrinsicFor<Block> = <Block as BlockT>::Extrinsic;

type OpaqueBundle<Block, CBlock> = sp_domains::OpaqueBundle<
NumberFor<CBlock>,
<CBlock as BlockT>::Hash,
Expand Down Expand Up @@ -170,18 +184,26 @@ where
})
}

pub async fn produce_bundle(
&mut self,
#[expect(clippy::type_complexity)]
pub fn claim_bundle_slot(
&self,
operator_id: OperatorId,
slot_info: OperatorSlotInfo,
) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
slot_info: &OperatorSlotInfo,
domain_best_number: NumberFor<Block>,
consensus_chain_best_hash: BlockHashFor<CBlock>,
) -> sp_blockchain::Result<
Option<(
NumberFor<Block>,
NumberFor<Block>,
ProofOfElection,
OperatorPublicKey,
)>,
> {
let OperatorSlotInfo {
slot,
proof_of_time,
} = slot_info;

let domain_best_number = self.client.info().best_number;
let consensus_chain_best_hash = self.consensus_client.info().best_hash;
let domain_best_number_onchain = self
.consensus_client
.runtime_api()
Expand Down Expand Up @@ -211,7 +233,7 @@ where
let skip_out_of_order_slot = self.skip_out_of_order_slot
&& self
.last_processed_slot
.map(|last_slot| last_slot >= slot)
.map(|last_slot| last_slot >= *slot)
.unwrap_or(false);

is_operator_lagging || skip_out_of_order_slot
Expand All @@ -227,98 +249,200 @@ where

if let Some((proof_of_election, operator_signing_key)) =
self.bundle_producer_election_solver.solve_challenge(
slot,
*slot,
consensus_chain_best_hash,
self.domain_id,
operator_id,
proof_of_time,
*proof_of_time,
)?
{
tracing::info!("📦 Claimed slot {slot}");

Ok(Some((
domain_best_number_onchain,
head_receipt_number,
proof_of_election,
operator_signing_key,
)))
} else {
Ok(None)
}
}

pub fn prepare_receipt(
&self,
slot_info: &OperatorSlotInfo,
domain_best_number_onchain: NumberFor<Block>,
head_receipt_number: NumberFor<Block>,
proof_of_election: &ProofOfElection,
operator_signing_key: &OperatorPublicKey,
) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
// When the receipt gap is greater than one, the operator needs to produce a receipt
// instead of a bundle
if domain_best_number_onchain.saturating_sub(head_receipt_number) > 1u32.into() {
info!(
?domain_best_number_onchain,
?head_receipt_number,
"🔖 Producing singleton receipt at slot {:?}",
slot_info.slot
);

let receipt = self
.domain_bundle_proposer
.load_next_receipt(domain_best_number_onchain, head_receipt_number)?;

// When the receipt gap is greater than one the operator need to produce receipt
// instead of bundle
if domain_best_number_onchain.saturating_sub(head_receipt_number) > 1u32.into() {
info!(
?domain_best_number_onchain,
?head_receipt_number,
"🔖 Producing singleton receipt at slot {:?}",
slot_info.slot
);

let singleton_receipt = SingletonReceipt {
proof_of_election,
receipt,
};
let singleton_receipt = SingletonReceipt {
proof_of_election: proof_of_election.clone(),
receipt,
};

let signature = {
let to_sign: BlockHashFor<Block> = singleton_receipt.hash();
self.sign(operator_signing_key, to_sign.as_ref())?
};

let signature = {
let to_sign: <Block as BlockT>::Hash = singleton_receipt.hash();
self.sign(&operator_signing_key, to_sign.as_ref())?
let sealed_singleton_receipt: SealedSingletonReceiptFor<Block, CBlock> =
SealedSingletonReceipt {
singleton_receipt,
signature,
};

let sealed_singleton_receipt: SealedSingletonReceiptFor<Block, CBlock> =
SealedSingletonReceipt {
singleton_receipt,
signature,
};
return Ok(Some(DomainProposal::Receipt(sealed_singleton_receipt)));
}
Ok(Some(DomainProposal::Receipt(sealed_singleton_receipt)))
} else {
Ok(None)
}
}

#[expect(clippy::too_many_arguments)]
pub async fn prepare_bundle(
&mut self,
operator_id: OperatorId,
slot_info: &OperatorSlotInfo,
consensus_chain_best_hash: BlockHashFor<CBlock>,
domain_best_number_onchain: NumberFor<Block>,
head_receipt_number: NumberFor<Block>,
proof_of_election: ProofOfElection,
// TODO: remove when skip_empty_bundle_production is split out
domain_best_number: NumberFor<Block>,
// TODO: remove Option when skip_empty_bundle_production is split out
) -> sp_blockchain::Result<Option<(BundleHeaderFor<Block, CBlock>, Vec<ExtrinsicFor<Block>>)>>
{
let tx_range = self
.consensus_client
.runtime_api()
.domain_tx_range(consensus_chain_best_hash, self.domain_id)
.map_err(|error| {
sp_blockchain::Error::Application(Box::from(format!(
"Error getting tx range: {error}"
)))
})?;

let receipt = self
.domain_bundle_proposer
.load_next_receipt(domain_best_number_onchain, head_receipt_number)?;

let tx_range = self
let (bundle_header, extrinsics) = self
.domain_bundle_proposer
.propose_bundle_at(proof_of_election.clone(), tx_range, operator_id, receipt)
.await?;

// if there are no extrinsics and no receipts to confirm, skip the bundle
if self.skip_empty_bundle_production
&& extrinsics.is_empty()
&& !self
.consensus_client
.runtime_api()
.domain_tx_range(consensus_chain_best_hash, self.domain_id)
.map_err(|error| {
sp_blockchain::Error::Application(Box::from(format!(
"Error getting tx range: {error}"
)))
})?;
let (bundle_header, extrinsics) = self
.domain_bundle_proposer
.propose_bundle_at(proof_of_election, tx_range, operator_id, receipt)
.await?;

// if there are no extrinsics and no receipts to confirm, skip the bundle
if self.skip_empty_bundle_production
&& extrinsics.is_empty()
&& !self
.consensus_client
.runtime_api()
.non_empty_er_exists(consensus_chain_best_hash, self.domain_id)?
{
tracing::warn!(
?domain_best_number,
"Skipping empty bundle production on slot {slot}"
);
return Ok(None);
}

self.last_processed_slot.replace(slot);

info!("🔖 Producing bundle at slot {:?}", slot_info.slot);
.non_empty_er_exists(consensus_chain_best_hash, self.domain_id)?
{
tracing::warn!(
?domain_best_number,
"Skipping empty bundle production on slot {}",
slot_info.slot,
);
return Ok(None);
}

let signature = {
let to_sign = bundle_header.hash();
self.sign(&operator_signing_key, to_sign.as_ref())?
};
self.last_processed_slot.replace(slot_info.slot);

let bundle = Bundle {
sealed_header: SealedBundleHeader::new(bundle_header, signature),
extrinsics,
};
Ok(Some((bundle_header, extrinsics)))
}

// TODO: Re-enable the bundle gossip over X-Net when the compact bundle is supported.
// if let Err(e) = self.bundle_sender.unbounded_send(signed_bundle.clone()) {
// tracing::error!(error = ?e, "Failed to send transaction bundle");
// }
pub fn seal_bundle(
&self,
bundle_header: BundleHeaderFor<Block, CBlock>,
operator_signing_key: &OperatorPublicKey,
extrinsics: Vec<ExtrinsicFor<Block>>,
) -> sp_blockchain::Result<DomainProposal<Block, CBlock>> {
let signature = {
let to_sign = bundle_header.hash();
self.sign(operator_signing_key, to_sign.as_ref())?
};

Ok(Some(DomainProposal::Bundle(bundle.into_opaque_bundle())))
} else {
Ok(None)
let bundle = Bundle {
sealed_header: SealedBundleHeader::new(bundle_header, signature),
extrinsics,
};

// TODO: Re-enable the bundle gossip over X-Net when the compact bundle is supported.
// if let Err(e) = self.bundle_sender.unbounded_send(signed_bundle.clone()) {
// tracing::error!(error = ?e, "Failed to send transaction bundle");
// }

Ok(DomainProposal::Bundle(bundle.into_opaque_bundle()))
}

pub async fn produce_bundle(
&mut self,
operator_id: OperatorId,
slot_info: OperatorSlotInfo,
) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
let domain_best_number = self.client.info().best_number;
let consensus_chain_best_hash = self.consensus_client.info().best_hash;

let Some((
domain_best_number_onchain,
head_receipt_number,
proof_of_election,
operator_signing_key,
)) = self.claim_bundle_slot(
operator_id,
&slot_info,
domain_best_number,
consensus_chain_best_hash,
)?
else {
return Ok(None);
};

if let Some(receipt) = self.prepare_receipt(
&slot_info,
domain_best_number_onchain,
head_receipt_number,
&proof_of_election,
&operator_signing_key,
)? {
return Ok(Some(receipt));
}

let Some((bundle_header, extrinsics)) = self
.prepare_bundle(
operator_id,
&slot_info,
consensus_chain_best_hash,
domain_best_number_onchain,
head_receipt_number,
proof_of_election,
domain_best_number,
)
.await?
else {
return Ok(None);
};

info!("🔖 Producing bundle at slot {:?}", slot_info.slot);

let bundle = self.seal_bundle(bundle_header, &operator_signing_key, extrinsics)?;

Ok(Some(bundle))
}
}

0 comments on commit cd44905

Please sign in to comment.