Merge pull request #3275 from autonomys/remove-internal-proving-benchmark

Remove internal proving benchmark
nazar-pc authored Dec 3, 2024
2 parents 7d92855 + c539999 commit 54fb379
Showing 4 changed files with 16 additions and 159 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -2,7 +2,7 @@

[![Latest Release](https://img.shields.io/github/v/release/autonomys/subspace?display_name=tag&style=flat-square)](https://github.com/autonomys/subspace/releases)
[![Downloads Latest](https://img.shields.io/github/downloads/autonomys/subspace/latest/total?style=flat-square)](https://github.com/autonomys/subspace/releases/latest)
[![Rust](https://img.shields.io/github/actions/workflow/status/autonomys/subspace/rust.yml?branch=main)](https://github.com/autonomys/subspace/actions/workflows/rust.yaml)
[![Rust](https://img.shields.io/github/actions/workflow/status/autonomys/subspace/rust.yml?branch=main)](https://github.com/autonomys/subspace/actions/workflows/rust.yml)
[![Rust Docs](https://img.shields.io/github/actions/workflow/status/autonomys/subspace/rustdoc.yml?branch=main)](https://autonomys.github.io/subspace)

This is a mono repository for [Subspace Network](https://subspace.network/) implementation, primarily containing

18 changes: 4 additions & 14 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs
@@ -2,7 +2,7 @@
use crate::commands::shared::DiskFarm;
use anyhow::anyhow;
use async_lock::{Mutex as AsyncMutex, Semaphore};
use async_lock::Mutex as AsyncMutex;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::Parser;
@@ -36,7 +36,6 @@ use subspace_farmer::utils::{
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_kzg::Kzg;
use subspace_proof_of_space::Table;
use tokio::sync::Barrier;
use tracing::{error, info, info_span, warn, Instrument};

const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
@@ -59,8 +58,7 @@ pub(super) struct FarmerArgs {
/// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that
/// farmer will make sure to not exceed (and will pre-allocated all the space on startup to
/// ensure it will not run out of space in runtime). Optionally, `record-chunks-mode` can be
/// set to `ConcurrentChunks` or `WholeSector` in order to avoid internal benchmarking during
/// startup.
/// set to `ConcurrentChunks` (default) or `WholeSector`.
disk_farms: Vec<DiskFarm>,
/// Address for farming rewards
#[arg(long, value_parser = parse_ss58_reward_address)]
@@ -256,9 +254,6 @@
let farms = {
let node_client = node_client.clone();
let info_mutex = &AsyncMutex::new(());
let faster_read_sector_record_chunks_mode_barrier =
Arc::new(Barrier::new(disk_farms.len()));
let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1));
let registry = &Mutex::new(registry);

let mut farms = Vec::with_capacity(disk_farms.len());
@@ -272,10 +267,6 @@
let erasure_coding = erasure_coding.clone();
let plotter = Arc::clone(&plotter);
let global_mutex = Arc::clone(&global_mutex);
let faster_read_sector_record_chunks_mode_barrier =
Arc::clone(&faster_read_sector_record_chunks_mode_barrier);
let faster_read_sector_record_chunks_mode_concurrency =
Arc::clone(&faster_read_sector_record_chunks_mode_concurrency);

async move {
let farm_fut = SingleDiskFarm::new::<_, PosTable>(
@@ -297,9 +288,8 @@
max_plotting_sectors_per_farm,
disable_farm_locking,
read_sector_record_chunks_mode: disk_farm
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
.read_sector_record_chunks_mode
.unwrap_or(ReadSectorRecordChunksMode::ConcurrentChunks),
registry: Some(registry),
create,
},
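
The call-site change above drops the barrier and semaphore plumbing and resolves the per-farm option directly: a farm that does not set `record-chunks-mode` now simply gets `ConcurrentChunks` instead of triggering a startup benchmark. A minimal sketch of that fallback; the enum below is a local stand-in for `subspace_farmer_components::reading::ReadSectorRecordChunksMode` (same two variants), defined here only so the example compiles on its own:

```rust
// Local stand-in for the farmer's ReadSectorRecordChunksMode, which has the
// same two variants; defined here only to keep the sketch self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadSectorRecordChunksMode {
    ConcurrentChunks,
    WholeSector,
}

fn main() {
    // A farm spec may or may not set `record-chunks-mode` explicitly.
    let explicit = Some(ReadSectorRecordChunksMode::WholeSector);
    let unset: Option<ReadSectorRecordChunksMode> = None;

    // With the internal benchmark removed, an unset value defaults to
    // ConcurrentChunks rather than being measured at startup.
    assert_eq!(
        explicit.unwrap_or(ReadSectorRecordChunksMode::ConcurrentChunks),
        ReadSectorRecordChunksMode::WholeSector
    );
    assert_eq!(
        unset.unwrap_or(ReadSectorRecordChunksMode::ConcurrentChunks),
        ReadSectorRecordChunksMode::ConcurrentChunks
    );
}
```
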
16 changes: 3 additions & 13 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -54,7 +54,6 @@ use subspace_kzg::Kzg;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use tokio::sync::Barrier;
use tracing::{error, info, info_span, warn, Instrument};

/// Get piece retry attempts number.
@@ -198,8 +197,7 @@ pub(crate) struct FarmingArgs {
/// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that
/// farmer will make sure to not exceed (and will pre-allocated all the space on startup to
/// ensure it will not run out of space in runtime). Optionally, `record-chunks-mode` can be
/// set to `ConcurrentChunks` or `WholeSector` in order to avoid internal benchmarking during
/// startup.
/// set to `ConcurrentChunks` (default) or `WholeSector`.
disk_farms: Vec<DiskFarm>,
/// WebSocket RPC URL of the Subspace node to connect to
#[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")]
@@ -571,9 +569,6 @@ where

let (farms, plotting_delay_senders) = {
let info_mutex = &AsyncMutex::new(());
let faster_read_sector_record_chunks_mode_barrier =
Arc::new(Barrier::new(disk_farms.len()));
let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1));
let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len())
.map(|_| oneshot::channel())
.unzip::<_, _, Vec<_>, Vec<_>>();
@@ -591,10 +586,6 @@
let erasure_coding = erasure_coding.clone();
let plotter = Arc::clone(&plotter);
let global_mutex = Arc::clone(&global_mutex);
let faster_read_sector_record_chunks_mode_barrier =
Arc::clone(&faster_read_sector_record_chunks_mode_barrier);
let faster_read_sector_record_chunks_mode_concurrency =
Arc::clone(&faster_read_sector_record_chunks_mode_concurrency);

async move {
let farm_fut = SingleDiskFarm::new::<_, PosTable>(
@@ -615,9 +606,8 @@
max_plotting_sectors_per_farm,
disable_farm_locking,
read_sector_record_chunks_mode: disk_farm
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
.read_sector_record_chunks_mode
.unwrap_or(ReadSectorRecordChunksMode::ConcurrentChunks),
registry: Some(registry),
create,
},
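
The `farm` command's help text above documents the same behaviour, so explicitly requesting `WholeSector` in the farm definition is now the only way to get whole-sector reads. A toy sketch of how such an optional key could map onto the mode, assuming the comma-separated `key=value` spec form implied by the help text (a `path` key alongside the `size` and `record-chunks-mode` keys it names); this is an illustration, not the farmer's actual `DiskFarm` parser:

```rust
// Local stand-in with the same two variants as the farmer's
// ReadSectorRecordChunksMode, so the sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadSectorRecordChunksMode {
    ConcurrentChunks,
    WholeSector,
}

/// Pull an optional `record-chunks-mode` value out of a comma-separated
/// `key=value` farm spec (toy version; other keys are simply ignored).
fn record_chunks_mode(spec: &str) -> Option<ReadSectorRecordChunksMode> {
    spec.split(',')
        .filter_map(|part| part.split_once('='))
        .find(|(key, _)| *key == "record-chunks-mode")
        .and_then(|(_, value)| match value {
            "ConcurrentChunks" => Some(ReadSectorRecordChunksMode::ConcurrentChunks),
            "WholeSector" => Some(ReadSectorRecordChunksMode::WholeSector),
            _ => None,
        })
}

fn main() {
    // Hypothetical farm specs; paths and sizes are made-up values.
    let with_mode = "path=/mnt/farm0,size=2TiB,record-chunks-mode=WholeSector";
    let without_mode = "path=/mnt/farm1,size=2TiB";

    // An explicit setting is honoured; an unset one falls back to
    // ConcurrentChunks, mirroring the farmer's new default.
    assert_eq!(
        record_chunks_mode(with_mode),
        Some(ReadSectorRecordChunksMode::WholeSector)
    );
    assert_eq!(
        record_chunks_mode(without_mode)
            .unwrap_or(ReadSectorRecordChunksMode::ConcurrentChunks),
        ReadSectorRecordChunksMode::ConcurrentChunks
    );
}
```
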
139 changes: 8 additions & 131 deletions crates/subspace-farmer/src/single_disk_farm.rs
@@ -40,7 +40,7 @@ use crate::single_disk_farm::plotting::{
use crate::single_disk_farm::reward_signing::reward_signing;
use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
use crate::{farm, KNOWN_PEERS_CACHE_SIZE};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_trait::async_trait;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::{mpsc, oneshot};
@@ -49,7 +49,6 @@ use futures::{select, FutureExt, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use rand::prelude::*;
use rayon::prelude::*;
use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
use serde::{Deserialize, Serialize};
@@ -64,27 +63,27 @@ use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use std::{fmt, fs, io, mem};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::pieces::Record;
use subspace_core_primitives::sectors::SectorIndex;
use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
use subspace_core_primitives::{PublicKey, ScalarBytes};
use subspace_core_primitives::PublicKey;
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
use subspace_farmer_components::{FarmerProtocolInfo, ReadAtSync};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_kzg::Kzg;
use subspace_networking::KnownPeersManager;
use subspace_proof_of_space::Table;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, Barrier};
use tokio::sync::broadcast;
use tokio::task;
use tracing::{debug, error, info, trace, warn, Instrument, Span};
use tracing::{error, info, trace, warn, Instrument, Span};

// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
// usize depending on chain parameters
@@ -95,10 +94,6 @@ const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Reserve 1M of space for farm info (for potential future expansion)
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_secs(30);
/// Limit for reads in internal benchmark.
///
/// 4 seconds is proving time, hence 3 seconds for reads.
const INTERNAL_BENCHMARK_READ_TIMEOUT: Duration = Duration::from_millis(3500);

/// Exclusive lock for single disk farm info file, ensuring no concurrent edits by cooperating processes is done
#[derive(Debug)]
@@ -317,13 +312,8 @@ where
pub max_plotting_sectors_per_farm: NonZeroUsize,
/// Disable farm locking, for example if file system doesn't support it
pub disable_farm_locking: bool,
/// Explicit mode to use for reading of sector record chunks instead of doing internal
/// benchmarking
pub read_sector_record_chunks_mode: Option<ReadSectorRecordChunksMode>,
/// Barrier before internal benchmarking between different farms
pub faster_read_sector_record_chunks_mode_barrier: Arc<Barrier>,
/// Limit concurrency of internal benchmarking between different farms
pub faster_read_sector_record_chunks_mode_concurrency: Arc<Semaphore>,
/// Mode to use for reading of sector record chunks instead
pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
/// Prometheus registry
pub registry: Option<&'a Mutex<&'a mut Registry>>,
/// Whether to create a farm if it doesn't yet exist
@@ -862,8 +852,6 @@ impl SingleDiskFarm {
max_plotting_sectors_per_farm,
disable_farm_locking,
read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
registry,
create,
} = options;
@@ -983,40 +971,6 @@
let (farming_plot, farming_thread_pool) =
AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

faster_read_sector_record_chunks_mode_barrier.wait().await;

let (read_sector_record_chunks_mode, farming_plot, farming_thread_pool) =
if let Some(mode) = read_sector_record_chunks_mode {
(mode, farming_plot, farming_thread_pool)
} else {
// Error doesn't matter here
let _permit = faster_read_sector_record_chunks_mode_concurrency
.acquire()
.await;
let span = span.clone();
let plot_file = Arc::clone(&plot_file);

let read_sector_record_chunks_mode_fut = task::spawn_blocking(move || {
farming_thread_pool
.install(move || {
let _span_guard = span.enter();

faster_read_sector_record_chunks_mode(
&*plot_file,
&farming_plot,
sector_size,
metadata_header.plotted_sector_count,
)
.map(|mode| (mode, farming_plot))
})
.map(|(mode, farming_plot)| (mode, farming_plot, farming_thread_pool))
});

AsyncJoinOnDrop::new(read_sector_record_chunks_mode_fut, false).await??
};

faster_read_sector_record_chunks_mode_barrier.wait().await;

let plotting_join_handle = task::spawn_blocking({
let sectors_metadata = Arc::clone(&sectors_metadata);
let handlers = Arc::clone(&handlers);
@@ -2423,80 +2377,3 @@ fn write_dummy_sector_metadata(
error,
})
}

fn faster_read_sector_record_chunks_mode<OP, FP>(
original_plot: &OP,
farming_plot: &FP,
sector_size: usize,
mut plotted_sector_count: SectorIndex,
) -> Result<ReadSectorRecordChunksMode, SingleDiskFarmError>
where
OP: FileExt + Sync,
FP: ReadAtSync,
{
info!("Benchmarking faster proving method");

let mut sector_bytes = vec![0u8; sector_size];

if plotted_sector_count == 0 {
thread_rng().fill_bytes(&mut sector_bytes);
original_plot.write_all_at(&sector_bytes, 0)?;

plotted_sector_count = 1;
}

let mut fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks;
let mut fastest_time = Duration::MAX;

for _ in 0..3 {
let sector_offset =
sector_size as u64 * thread_rng().gen_range(0..plotted_sector_count) as u64;
let farming_plot = farming_plot.offset(sector_offset);

// Reading the whole sector at once
{
let start = Instant::now();
farming_plot.read_at(&mut sector_bytes, 0)?;
let elapsed = start.elapsed();

debug!(?elapsed, "Whole sector");

if elapsed >= INTERNAL_BENCHMARK_READ_TIMEOUT {
debug!(
?elapsed,
"Reading whole sector is too slow, using chunks instead"
);

fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks;
break;
}

if fastest_time > elapsed {
fastest_mode = ReadSectorRecordChunksMode::WholeSector;
fastest_time = elapsed;
}
}

// A lot simplified version of concurrent chunks
{
let start = Instant::now();
(0..Record::NUM_CHUNKS).into_par_iter().try_for_each(|_| {
let offset = thread_rng().gen_range(0_usize..sector_size / ScalarBytes::FULL_BYTES)
* ScalarBytes::FULL_BYTES;
farming_plot.read_at(&mut [0; ScalarBytes::FULL_BYTES], offset as u64)
})?;
let elapsed = start.elapsed();

debug!(?elapsed, "Chunks");

if fastest_time > elapsed {
fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks;
fastest_time = elapsed;
}
}
}

info!(?fastest_mode, "Faster proving method found");

Ok(fastest_mode)
}
