diff --git a/Cargo.lock b/Cargo.lock index e401850f5cd..1e835788f78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2766,7 +2766,7 @@ dependencies = [ [[package]] name = "massa_hash" version = "0.1.0" -source = "git+https://github.com/massalabs/massa#426a6ef7119a92249fed5f3b60e60cfa82ca2d08" +source = "git+https://github.com/massalabs/massa#93333bf18e605b7e9f5f9b7c1b39d2813be73f8e" dependencies = [ "blake3", "bs58", @@ -3028,7 +3028,7 @@ dependencies = [ [[package]] name = "massa_serialization" version = "0.1.0" -source = "git+https://github.com/massalabs/massa#426a6ef7119a92249fed5f3b60e60cfa82ca2d08" +source = "git+https://github.com/massalabs/massa#93333bf18e605b7e9f5f9b7c1b39d2813be73f8e" dependencies = [ "displaydoc", "nom", @@ -3056,7 +3056,7 @@ dependencies = [ [[package]] name = "massa_signature" version = "0.1.0" -source = "git+https://github.com/massalabs/massa#426a6ef7119a92249fed5f3b60e60cfa82ca2d08" +source = "git+https://github.com/massalabs/massa#93333bf18e605b7e9f5f9b7c1b39d2813be73f8e" dependencies = [ "bs58", "displaydoc", @@ -3606,7 +3606,7 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "peernet" version = "0.1.0" -source = "git+https://github.com/massalabs/PeerNet?rev=9300cb35b09347f3848c9a232f9aaf3f3f100ece#9300cb35b09347f3848c9a232f9aaf3f3f100ece" +source = "git+https://github.com/massalabs/PeerNet?rev=cf4cb09deda8cdd9b8893cc9595ac3b255aa74c9#cf4cb09deda8cdd9b8893cc9595ac3b255aa74c9" dependencies = [ "crossbeam", "enum_delegate", diff --git a/massa-async-pool/src/changes.rs b/massa-async-pool/src/changes.rs index 9292e1b9648..3051292fe5a 100644 --- a/massa-async-pool/src/changes.rs +++ b/massa-async-pool/src/changes.rs @@ -1,6 +1,6 @@ -///! Copyright (c) 2022 MASSA LABS +//! Copyright (c) 2022 MASSA LABS -///! This file provides structures representing changes to the asynchronous message pool +//! This file provides structures representing changes to the asynchronous message pool use std::ops::Bound::Included; use crate::{ diff --git a/massa-models/src/config/constants.rs b/massa-models/src/config/constants.rs index f780da547d5..f1330ef95fd 100644 --- a/massa-models/src/config/constants.rs +++ b/massa-models/src/config/constants.rs @@ -51,14 +51,14 @@ lazy_static::lazy_static! { ) ) } else { - 1683291600000.into() // Friday, May 5, 2023 01:00:00 PM UTC + 1683498600000.into() // Sunday, May 7, 2023 10:30:00 PM UTC }; /// TESTNET: time when the blockclique is ended. pub static ref END_TIMESTAMP: Option = if cfg!(feature = "sandbox") { None } else { - Some(1685556000000.into()) // Sunday, April 30, 2023 06:00:00 PM UTC + Some(1685556000000.into()) // Sunday, May 30, 2023 06:00:00 PM UTC }; /// `KeyPair` to sign genesis blocks. pub static ref GENESIS_KEY: KeyPair = KeyPair::from_str("S1UxdCJv5ckDK8z87E5Jq5fEfSVLi2cTHgtpfZy7iURs3KpPns8") @@ -68,9 +68,9 @@ lazy_static::lazy_static! { /// node version pub static ref VERSION: Version = { if cfg!(feature = "sandbox") { - "SAND.22.0" + "SAND.22.1" } else { - "TEST.22.0" + "TEST.22.1" } .parse() .unwrap() diff --git a/massa-models/src/version.rs b/massa-models/src/version.rs index 0b0fb06d08e..b033e5fed36 100644 --- a/massa-models/src/version.rs +++ b/massa-models/src/version.rs @@ -64,6 +64,7 @@ impl serde::Serialize for Version { } /// Serializer for `Version` +#[derive(Clone)] pub struct VersionSerializer { u32_serializer: U32VarIntSerializer, } @@ -108,6 +109,7 @@ impl Serializer for VersionSerializer { } /// Serializer for `Version` +#[derive(Clone)] pub struct VersionDeserializer { u32_deserializer: U32VarIntDeserializer, } @@ -189,7 +191,10 @@ impl Deserializer for VersionDeserializer { impl Version { /// true if instance and major are the same pub fn is_compatible(&self, other: &Version) -> bool { - self.instance == other.instance && self.major == other.major + self.instance == other.instance + && self.major == other.major + && self.minor >= 1 + && other.minor >= 1 } } diff --git a/massa-node/Cargo.toml b/massa-node/Cargo.toml index 1029739b6c2..b2b004f0ff9 100644 --- a/massa-node/Cargo.toml +++ b/massa-node/Cargo.toml @@ -18,7 +18,7 @@ tracing = { version = "0.1", features = [ "max_level_debug", "release_max_level_debug", ] } -peernet = { git = "https://github.com/massalabs/PeerNet", rev = "9300cb35b09347f3848c9a232f9aaf3f3f100ece" } +peernet = { git = "https://github.com/massalabs/PeerNet", rev = "cf4cb09deda8cdd9b8893cc9595ac3b255aa74c9" } tracing-subscriber = "0.3" paw = "1.0" structopt = { version = "0.3", features = ["paw"] } diff --git a/massa-node/base_config/config.toml b/massa-node/base_config/config.toml index 3846b8ecef5..8c8991e80cc 100644 --- a/massa-node/base_config/config.toml +++ b/massa-node/base_config/config.toml @@ -155,10 +155,6 @@ keypair_file = "config/node_privkey.key" # path to the initial peers file initial_peers_file = "base_config/initial_peers.json" - # maximum of incoming connections - max_incoming_connections = 15 - # maximum of outgoing connections - max_outgoing_connections = 10 # Limit of read/write number of bytes per second with a peer (Should be a 10 multiple) read_write_limit_bytes_per_second = 2_000_000_000 # timeout after which without answer a hanshake is ended @@ -196,12 +192,23 @@ operation_announcement_interval = 300 # max number of operation per message, same as network param but can be smaller max_operations_per_message = 1024 + # Number of millis seconds between each try out connections + try_connection_timer = 5000 + # Number of millis seconds that create a timeout for out connections + timeout_connection = 1000 # time threshold after which operation are not propagated max_operations_propagation_time = 32000 # time threshold after which endorsement are not propagated max_endorsements_propagation_time = 48000 # number of thread tester thread_tester_count = 5 + # Peer default category limits + default_category_info = { target_out_connections = 10, max_in_connections_per_ip = 2, max_in_connections_pre_handshake = 100, max_in_connections_post_handshake = 15} + # Peer categories limits + [protocol.peers_categories] + Bootstrap = { target_out_connections = 1, max_in_connections_per_ip = 1, max_in_connections_pre_handshake = 8, max_in_connections_post_handshake = 1} + +[network] [bootstrap] # list of bootstrap (ip, node id) diff --git a/massa-node/base_config/initial_peers.json b/massa-node/base_config/initial_peers.json index 3b2bf98e37a..d21da8f8a2e 100644 --- a/massa-node/base_config/initial_peers.json +++ b/massa-node/base_config/initial_peers.json @@ -1,26 +1,51 @@ { "P12UbyLJDS7zimGWf3LTHe8hYY67RdLke1iDRZqJbQQLHQSKPW8j": { - "149.202.86.103:31244": "Tcp" + "listeners": { + "149.202.86.103:31244": "Tcp" + }, + "category": "Bootstrap" + }, "P12vxrYTQzS5TRzxLfFNYxn6PyEsphKWkdqx2mVfEuvJ9sPF43uq": { - "149.202.89.125:31244": "Tcp" + "listeners": { + "149.202.89.125:31244": "Tcp" + }, + "category": "Bootstrap" }, "P12rPDBmpnpnbECeAKDjbmeR19dYjAUwyLzsa8wmYJnkXLCNF28E": { - "158.69.120.215:31244": "Tcp" + "listeners": { + "158.69.120.215:31244": "Tcp" + }, + "category": "Bootstrap" }, "P1XxexKa3XNzvmakNmPawqFrE9Z2NFhfq1AhvV1Qx4zXq5p1Bp9": { - "158.69.23.120:31244": "Tcp" + "listeners": { + "158.69.23.120:31244": "Tcp" + }, + "category": "Bootstrap" }, "P1qxuqNnx9kyAMYxUfsYiv2gQd5viiBX126SzzexEdbbWd2vQKu": { - "198.27.74.5:31244": "Tcp" + "listeners": { + "198.27.74.5:31244": "Tcp" + }, + "category": "Bootstrap" }, "P1hdgsVsd4zkNp8cF1rdqqG6JPRQasAmx12QgJaJHBHFU1fRHEH": { - "198.27.74.52:31244": "Tcp" + "listeners": { + "198.27.74.52:31244": "Tcp" + }, + "category": "Bootstrap" }, "P1gEdBVEbRFbBxBtrjcTDDK9JPbJFDay27uiJRE3vmbFAFDKNh7": { - "54.36.174.177:31244": "Tcp" + "listeners": { + "54.36.174.177:31244": "Tcp" + }, + "category": "Bootstrap" }, "P13Ykon8Zo73PTKMruLViMMtE2rEG646JQ4sCcee2DnopmVM3P5": { - "51.75.60.228:31244": "Tcp" + "listeners": { + "51.75.60.228:31244": "Tcp" + }, + "category": "Bootstrap" } } \ No newline at end of file diff --git a/massa-node/base_config/openrpc.json b/massa-node/base_config/openrpc.json index 751e5316354..ab035d18ab7 100644 --- a/massa-node/base_config/openrpc.json +++ b/massa-node/base_config/openrpc.json @@ -2,7 +2,7 @@ "openrpc": "1.2.4", "info": { "title": "Massa OpenRPC Specification", - "version": "TEST.22.0", + "version": "TEST.22.1", "description": "Massa OpenRPC Specification document. Find more information on https://docs.massa.net/en/latest/technical-doc/api.html", "termsOfService": "https://open-rpc.org", "contact": { diff --git a/massa-node/src/main.rs b/massa-node/src/main.rs index 893451db625..800dfa9ae5b 100644 --- a/massa-node/src/main.rs +++ b/massa-node/src/main.rs @@ -186,9 +186,15 @@ async fn launch( // NOTE: this is temporary, since we cannot currently handle bootstrap from remaining ledger if args.keep_ledger || args.restart_from_snapshot_at_period.is_some() { info!("Loading old ledger for next episode"); - } else if SETTINGS.ledger.disk_ledger_path.exists() { - std::fs::remove_dir_all(SETTINGS.ledger.disk_ledger_path.clone()) - .expect("disk ledger delete failed"); + } else { + if SETTINGS.ledger.disk_ledger_path.exists() { + std::fs::remove_dir_all(SETTINGS.ledger.disk_ledger_path.clone()) + .expect("disk ledger delete failed"); + } + if SETTINGS.execution.hd_cache_path.exists() { + std::fs::remove_dir_all(SETTINGS.execution.hd_cache_path.clone()) + .expect("disk hd cache delete failed"); + } } // Create final ledger @@ -479,8 +485,6 @@ async fn launch( initial_peers: SETTINGS.protocol.initial_peers_file.clone(), listeners, keypair_file: SETTINGS.protocol.keypair_file.clone(), - max_in_connections: SETTINGS.protocol.max_incoming_connections, - max_out_connections: SETTINGS.protocol.max_outgoing_connections, max_known_blocks_saved_size: SETTINGS.protocol.max_known_blocks_size, asked_operations_buffer_capacity: SETTINGS.protocol.max_known_ops_size, thread_tester_count: SETTINGS.protocol.thread_tester_count, @@ -515,8 +519,16 @@ async fn launch( max_size_peers_announcement: MAX_PEERS_IN_ANNOUNCEMENT_LIST, read_write_limit_bytes_per_second: SETTINGS.protocol.read_write_limit_bytes_per_second as u128, - routable_ip: SETTINGS.protocol.routable_ip, + try_connection_timer: SETTINGS.protocol.try_connection_timer, + timeout_connection: SETTINGS.protocol.timeout_connection, + routable_ip: SETTINGS + .protocol + .routable_ip + .or(SETTINGS.network.routable_ip), debug: false, + peers_categories: SETTINGS.protocol.peers_categories.clone(), + default_category_info: SETTINGS.protocol.default_category_info, + version: *VERSION, }; let (protocol_controller, protocol_channels) = diff --git a/massa-node/src/settings.rs b/massa-node/src/settings.rs index 162aa1e26c2..0b3098f42ba 100644 --- a/massa-node/src/settings.rs +++ b/massa-node/src/settings.rs @@ -1,10 +1,11 @@ // Copyright (c) 2022 MASSA LABS //! Build here the default node settings from the configuration file toml -use std::path::PathBuf; +use std::{collections::HashMap, path::PathBuf}; use massa_bootstrap::IpType; use massa_models::{config::build_massa_settings, node::NodeId}; +use massa_protocol_exports::PeerCategoryInfo; use massa_time::MassaTime; use serde::Deserialize; use std::net::{IpAddr, SocketAddr}; @@ -124,6 +125,7 @@ pub struct Settings { pub protocol: ProtocolSettings, pub consensus: ConsensusSettings, pub api: APISettings, + pub network: NetworkSettings, pub bootstrap: BootstrapSettings, pub pool: PoolSettings, pub execution: ExecutionSettings, @@ -163,6 +165,13 @@ pub struct ConsensusSettings { pub broadcast_filled_blocks_channel_capacity: usize, } +// TODO: Remove one date. Kept for retro compatibility. +#[derive(Debug, Deserialize, Clone)] +pub struct NetworkSettings { + /// Ip seen by others. If none the bind ip is used + pub routable_ip: Option, +} + /// Protocol Configuration, read from toml user configuration file #[derive(Debug, Deserialize, Clone)] pub struct ProtocolSettings { @@ -214,14 +223,18 @@ pub struct ProtocolSettings { pub routable_ip: Option, /// Time threshold to have a connection to a node pub connect_timeout: MassaTime, - /// Max number of connection in - pub max_incoming_connections: usize, - /// Max number of connection out - pub max_outgoing_connections: usize, /// Number of tester threads pub thread_tester_count: u8, /// Number of bytes we can read/write by seconds in a connection (must be a 10 multiple) pub read_write_limit_bytes_per_second: u64, + /// try connection timer + pub try_connection_timer: MassaTime, + /// Timeout connection + pub timeout_connection: MassaTime, + /// Peers limits per category + pub peers_categories: HashMap, + /// Limits for default category + pub default_category_info: PeerCategoryInfo, } /// gRPC settings diff --git a/massa-protocol-exports/Cargo.toml b/massa-protocol-exports/Cargo.toml index 3e943ae9e85..a4f4dbbe544 100644 --- a/massa-protocol-exports/Cargo.toml +++ b/massa-protocol-exports/Cargo.toml @@ -10,7 +10,7 @@ thiserror = "1.0" nom = "7.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -peernet = { git = "https://github.com/massalabs/PeerNet", rev = "9300cb35b09347f3848c9a232f9aaf3f3f100ece" } +peernet = { git = "https://github.com/massalabs/PeerNet", rev = "cf4cb09deda8cdd9b8893cc9595ac3b255aa74c9" } tempfile = { version = "3.3", optional = true } # use with testing feature mockall = "0.11.4" diff --git a/massa-protocol-exports/src/bootstrap_peers.rs b/massa-protocol-exports/src/bootstrap_peers.rs index f2b80e80be0..e91410d07bd 100644 --- a/massa-protocol-exports/src/bootstrap_peers.rs +++ b/massa-protocol-exports/src/bootstrap_peers.rs @@ -19,6 +19,13 @@ use peernet::peer_id::PeerId; use peernet::transports::TransportType; use serde::{Deserialize, Serialize}; +/// Peer info provided in bootstrap +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PeerData { + pub listeners: HashMap, + pub category: String, +} + /// Peers that are transmitted during bootstrap #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BootstrapPeers(pub Vec<(PeerId, HashMap)>); diff --git a/massa-protocol-exports/src/lib.rs b/massa-protocol-exports/src/lib.rs index 1ee23259690..961ef5dcea3 100644 --- a/massa-protocol-exports/src/lib.rs +++ b/massa-protocol-exports/src/lib.rs @@ -3,13 +3,15 @@ mod controller_trait; mod error; mod settings; -pub use bootstrap_peers::{BootstrapPeers, BootstrapPeersDeserializer, BootstrapPeersSerializer}; +pub use bootstrap_peers::{ + BootstrapPeers, BootstrapPeersDeserializer, BootstrapPeersSerializer, PeerData, +}; pub use controller_trait::{ProtocolController, ProtocolManager}; pub use error::ProtocolError; pub use peernet::peer::PeerConnectionType; pub use peernet::peer_id::PeerId; pub use peernet::transports::TransportType; -pub use settings::ProtocolConfig; +pub use settings::{PeerCategoryInfo, ProtocolConfig}; #[cfg(feature = "testing")] pub mod test_exports; diff --git a/massa-protocol-exports/src/settings.rs b/massa-protocol-exports/src/settings.rs index db7674bca4b..620966a8d70 100644 --- a/massa-protocol-exports/src/settings.rs +++ b/massa-protocol-exports/src/settings.rs @@ -6,10 +6,19 @@ use std::{ path::PathBuf, }; +use massa_models::version::Version; use massa_time::MassaTime; use peernet::transports::TransportType; use serde::Deserialize; +#[derive(Debug, Deserialize, Clone, Copy)] +pub struct PeerCategoryInfo { + pub target_out_connections: usize, + pub max_in_connections_pre_handshake: usize, + pub max_in_connections_post_handshake: usize, + pub max_in_connections_per_ip: usize, +} + /// Dynamic protocol configuration mix in static settings and constants configurations. #[derive(Debug, Deserialize, Clone)] pub struct ProtocolConfig { @@ -19,10 +28,6 @@ pub struct ProtocolConfig { pub listeners: HashMap, /// initial peers path pub initial_peers: PathBuf, - /// max number of in connections - pub max_in_connections: usize, - /// max number of out connections - pub max_out_connections: usize, /// after `ask_block_timeout` milliseconds we try to ask a block to another node pub ask_block_timeout: MassaTime, /// Max known blocks we keep in block_handler @@ -135,10 +140,20 @@ pub struct ProtocolConfig { pub max_size_listeners_per_peer: u64, /// Last start period pub last_start_period: u64, + /// try connection timer + pub try_connection_timer: MassaTime, + /// Timeout connection + pub timeout_connection: MassaTime, /// Number of bytes per second that can be read/write in a connection (should be a 10 multiplier) pub read_write_limit_bytes_per_second: u128, /// Optional routable ip pub routable_ip: Option, /// debug prints pub debug: bool, + /// Peers categories infos + pub peers_categories: HashMap, + /// Default category infos + pub default_category_info: PeerCategoryInfo, + /// Version + pub version: Version, } diff --git a/massa-protocol-exports/src/test_exports/config.rs b/massa-protocol-exports/src/test_exports/config.rs index 6ebc39a6094..2f8a7ed2835 100644 --- a/massa-protocol-exports/src/test_exports/config.rs +++ b/massa-protocol-exports/src/test_exports/config.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::ProtocolConfig; +use crate::{settings::PeerCategoryInfo, ProtocolConfig}; use massa_models::config::ENDORSEMENT_COUNT; use massa_time::MassaTime; use tempfile::NamedTempFile; @@ -8,8 +8,6 @@ use tempfile::NamedTempFile; impl Default for ProtocolConfig { fn default() -> Self { ProtocolConfig { - max_in_connections: 10, - max_out_connections: 10, keypair_file: NamedTempFile::new() .expect("cannot create temp file") .path() @@ -75,8 +73,18 @@ impl Default for ProtocolConfig { max_size_peers_announcement: 100, last_start_period: 0, read_write_limit_bytes_per_second: 1024 * 1000, + timeout_connection: MassaTime::from_millis(1000), + try_connection_timer: MassaTime::from_millis(5000), routable_ip: None, debug: true, + peers_categories: HashMap::default(), + default_category_info: PeerCategoryInfo { + max_in_connections_pre_handshake: 10, + max_in_connections_post_handshake: 10, + target_out_connections: 10, + max_in_connections_per_ip: 0, + }, + version: "TEST.22.1".parse().unwrap(), } } } diff --git a/massa-protocol-worker/Cargo.toml b/massa-protocol-worker/Cargo.toml index e3d3021c84e..bbd80e0515b 100644 --- a/massa-protocol-worker/Cargo.toml +++ b/massa-protocol-worker/Cargo.toml @@ -12,7 +12,7 @@ crossbeam = "0.8" serde_json = "1.0" nom = "7.1" num_enum = "0.5" -peernet = { git = "https://github.com/massalabs/PeerNet", rev = "9300cb35b09347f3848c9a232f9aaf3f3f100ece" } +peernet = { git = "https://github.com/massalabs/PeerNet", rev = "cf4cb09deda8cdd9b8893cc9595ac3b255aa74c9" } tempfile = { version = "3.3", optional = true } # use with testing feature rayon = "1.7.0" lru = "0.10.0" diff --git a/massa-protocol-worker/src/connectivity.rs b/massa-protocol-worker/src/connectivity.rs index 81a90a30cfe..4f0411e8518 100644 --- a/massa-protocol-worker/src/connectivity.rs +++ b/massa-protocol-worker/src/connectivity.rs @@ -5,21 +5,21 @@ use crossbeam::{ use massa_consensus_exports::ConsensusController; use massa_models::stats::NetworkStats; use massa_pool_exports::PoolController; -use massa_protocol_exports::{BootstrapPeers, ProtocolConfig, ProtocolError}; +use massa_protocol_exports::{PeerCategoryInfo, ProtocolConfig, ProtocolError}; use massa_storage::Storage; use parking_lot::RwLock; -use peernet::{ - peer::PeerConnectionType, - transports::{OutConnectionConfig, TransportType}, -}; +use peernet::{peer::PeerConnectionType, transports::OutConnectionConfig}; use peernet::{peer_id::PeerId, transports::TcpOutConnectionConfig}; -use std::collections::HashMap; use std::net::SocketAddr; +use std::{collections::HashMap, net::IpAddr}; use std::{num::NonZeroUsize, sync::Arc}; use std::{thread::JoinHandle, time::Duration}; use tracing::{info, warn}; -use crate::{handlers::peer_handler::models::SharedPeerDB, worker::ProtocolChannels}; +use crate::{ + handlers::peer_handler::models::{InitialPeers, PeerState, SharedPeerDB}, + worker::ProtocolChannels, +}; use crate::{handlers::peer_handler::PeerManagementHandler, messages::MessagesHandler}; use crate::{ handlers::{ @@ -44,7 +44,6 @@ pub enum ConnectivityCommand { #[allow(clippy::too_many_arguments)] pub(crate) fn start_connectivity_thread( - config: ProtocolConfig, peer_id: PeerId, mut network_controller: Box, consensus_controller: Box, @@ -53,20 +52,15 @@ pub(crate) fn start_connectivity_thread( channel_endorsements: (Sender, Receiver), channel_operations: (Sender, Receiver), channel_peers: (Sender, Receiver), - bootstrap_peers: Option, + initial_peers: InitialPeers, peer_db: SharedPeerDB, storage: Storage, protocol_channels: ProtocolChannels, messages_handler: MessagesHandler, + peer_categories: HashMap, PeerCategoryInfo)>, + default_category: PeerCategoryInfo, + config: ProtocolConfig, ) -> Result<(Sender, JoinHandle<()>), ProtocolError> { - let initial_peers = if let Some(bootstrap_peers) = bootstrap_peers { - bootstrap_peers.0.into_iter().collect() - } else { - serde_json::from_str::>>( - &std::fs::read_to_string(&config.initial_peers)?, - )? - }; - let handle = std::thread::Builder::new() .name("protocol-connectivity".to_string()) .spawn({ @@ -88,18 +82,20 @@ pub(crate) fn start_connectivity_thread( std::thread::sleep(Duration::from_millis(100)); // Create cache outside of the op handler because it could be used by other handlers + let total_in_slots = config.peers_categories.values().map(|v| v.max_in_connections_post_handshake).sum::() + config.default_category_info.max_in_connections_post_handshake; + let total_out_slots = config.peers_categories.values().map(| v| v.target_out_connections).sum::() + config.default_category_info.target_out_connections; let operation_cache = Arc::new(RwLock::new(OperationCache::new( NonZeroUsize::new(config.max_known_ops_size).unwrap(), - NonZeroUsize::new(config.max_in_connections + config.max_out_connections).unwrap(), + NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(), ))); let endorsement_cache = Arc::new(RwLock::new(EndorsementCache::new( NonZeroUsize::new(config.max_known_endorsements_size).unwrap(), - NonZeroUsize::new(config.max_in_connections + config.max_out_connections).unwrap(), + NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(), ))); let block_cache = Arc::new(RwLock::new(BlockCache::new( NonZeroUsize::new(config.max_known_blocks_size).unwrap(), - NonZeroUsize::new(config.max_in_connections + config.max_out_connections).unwrap(), + NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(), ))); // Start handlers @@ -111,6 +107,8 @@ pub(crate) fn start_connectivity_thread( protocol_channels.peer_management_handler, messages_handler, network_controller.get_active_connections(), + peer_categories.iter().map(|(key, value)|(key.clone(), (value.0.clone(), value.1.target_out_connections))).collect(), + config.default_category_info.target_out_connections, &config, ); @@ -192,7 +190,9 @@ pub(crate) fn start_connectivity_thread( banned_peer_count, known_peer_count, }; - let peers: HashMap = network_controller.get_active_connections().get_peers_connected(); + let peers: HashMap = network_controller.get_active_connections().get_peers_connected().into_iter().map(|(peer_id, peer)| { + (peer_id, (peer.0, peer.1)) + }).collect(); responder.send((stats, peers)).unwrap_or_else(|_| warn!("Failed to send stats to responder")); } Err(_) => { @@ -201,46 +201,62 @@ pub(crate) fn start_connectivity_thread( } } } - default(Duration::from_millis(1000)) => { - if config.debug { - println!("nb peers connected: {}", network_controller.get_active_connections().get_peer_ids_connected().len()); - } - // Check if we need to connect to peers - let nb_connection_to_try = { - let nb_connection_to_try = network_controller.get_active_connections().get_max_out_connections().saturating_sub(network_controller.get_active_connections().get_nb_out_connections()); - if nb_connection_to_try == 0 { - continue; - } - nb_connection_to_try - }; - if config.debug { - println!("Trying to connect to {} peers", nb_connection_to_try); - } - // Get the best peers + default(config.try_connection_timer.to_duration()) => { + let peers_connected = network_controller.get_active_connections().get_peers_connected(); { - let peer_db_read = peer_management_handler.peer_db.read(); - let best_peers = peer_db_read.get_best_peers(nb_connection_to_try); - for peer_id in best_peers { - let Some(peer_info) = peer_db_read.peers.get(&peer_id) else { - warn!("Peer {} not found in peer_db", peer_id); - continue; - }; - //TODO: Adapt for multiple listeners - let (addr, _) = peer_info.last_announce.listeners.iter().next().unwrap(); - if peer_info.last_announce.listeners.is_empty() { + let peer_db_read = peer_db.read(); + 'iter_peers: for (_, peer_id) in &peer_db_read.index_by_newest { + if peers_connected.contains_key(peer_id) { continue; } - { - if !addr.ip().to_canonical().is_global() || !network_controller.get_active_connections().check_addr_accepted(addr) { + if let Some(peer_info) = peer_db_read.peers.get(peer_id).and_then(|peer| { + if peer.state == PeerState::Trusted { + Some(peer.clone()) + } else { + None + } + }) { + if peer_info.last_announce.listeners.is_empty() { continue; } + //TODO: Adapt for multiple listeners + let (addr, _) = peer_info.last_announce.listeners.iter().next().unwrap(); + let canonical_ip = addr.ip().to_canonical(); + if !canonical_ip.is_global() { + continue; + } + // Check if the peer is in a category and we didn't reached out target yet + let mut category_found = false; + for (name, (ips, infos)) in &peer_categories { + if ips.contains(&canonical_ip) { + if infos.target_out_connections > peers_connected.iter().filter(|(_, (_, connection_type, category))| { + if connection_type == &PeerConnectionType::OUT && let Some(category) = category { + category == name + } else { + false + } + }).count() { + category_found = true; + break; + } else { + continue 'iter_peers; + } + } + } + + if !category_found && peers_connected.iter().filter(|(_, (_, connection_type, category))| { + connection_type == &PeerConnectionType::OUT && category.is_none() + }).count() >= default_category.target_out_connections { + continue; + } + info!("Trying to connect to addr {} of peer {}", addr, peer_id); + // We only manage TCP for now + if let Err(err) = network_controller.try_connect(*addr, config.timeout_connection.to_duration(), &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100))))) { + warn!("Failed to connect to peer {:?}: {:?}", addr, err); + } + break; } - info!("Trying to connect to addr {} of peer {}", addr, peer_id); - // We only manage TCP for now - if let Err(err) = network_controller.try_connect(*addr, Duration::from_millis(200), &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100))))) { - warn!("Failed to connect to peer {:?}: {:?}", addr, err); - } - }; + } } } } diff --git a/massa-protocol-worker/src/handlers/peer_handler/mod.rs b/massa-protocol-worker/src/handlers/peer_handler/mod.rs index c43def1bafd..44820bb4e09 100644 --- a/massa-protocol-worker/src/handlers/peer_handler/mod.rs +++ b/massa-protocol-worker/src/handlers/peer_handler/mod.rs @@ -1,4 +1,5 @@ use std::cmp::Reverse; +use std::net::IpAddr; use std::{collections::HashMap, net::SocketAddr, thread::JoinHandle, time::Duration}; use crossbeam::channel::tick; @@ -6,8 +7,10 @@ use crossbeam::{ channel::{Receiver, Sender}, select, }; +use massa_models::version::{VersionDeserializer, VersionSerializer}; use massa_protocol_exports::{BootstrapPeers, ProtocolConfig}; use massa_serialization::{DeserializeError, Deserializer, Serializer}; +use peernet::types::PUBLIC_KEY_SIZE_BYTES; use rand::{rngs::StdRng, RngCore, SeedableRng}; use peernet::messages::MessagesSerializer; @@ -69,6 +72,8 @@ impl PeerManagementHandler { (sender_cmd, receiver_cmd): (Sender, Receiver), messages_handler: MessagesHandler, mut active_connections: Box, + target_out_connections: HashMap, usize)>, + default_target_out_connections: usize, config: &ProtocolConfig, ) -> Self { let message_serializer = PeerManagementMessageSerializer::new(); @@ -78,6 +83,8 @@ impl PeerManagementHandler { active_connections.clone(), peer_db.clone(), messages_handler, + target_out_connections, + default_target_out_connections, ); let thread_join = std::thread::Builder::new() @@ -239,6 +246,8 @@ impl PeerManagementHandler { pub struct MassaHandshake { pub announcement_serializer: AnnouncementSerializer, pub announcement_deserializer: AnnouncementDeserializer, + pub version_serializer: VersionSerializer, + pub version_deserializer: VersionDeserializer, pub config: ProtocolConfig, pub peer_db: SharedPeerDB, peer_mngt_msg_serializer: crate::messages::MessagesSerializer, @@ -259,6 +268,8 @@ impl MassaHandshake { max_listeners: config.max_size_listeners_per_peer, }, ), + version_serializer: VersionSerializer::new(), + version_deserializer: VersionDeserializer::new(), config, peer_mngt_msg_serializer: crate::messages::MessagesSerializer::new() .with_peer_management_message_serializer(PeerManagementMessageSerializer::new()), @@ -276,7 +287,14 @@ impl InitConnectionHandler for MassaHandshake { messages_handler: MassaMessagesHandler, ) -> PeerNetResult { let mut bytes = PeerId::from_public_key(keypair.get_public_key()).to_bytes(); - //TODO: Add version in announce + self.version_serializer + .serialize(&self.config.version, &mut bytes) + .map_err(|err| { + PeerNetError::HandshakeError.error( + "Massa Handshake", + Some(format!("Failed to serialize version: {}", err)), + ) + })?; bytes.push(0); let listeners_announcement = Announcement::new(listeners.clone(), self.config.routable_ip, keypair).unwrap(); @@ -304,14 +322,7 @@ impl InitConnectionHandler for MassaHandshake { Some("Failed to deserialize PeerId".to_string()), ) })?)?; - { - let peer_db_read = self.peer_db.read(); - if let Some(info) = peer_db_read.peers.get(&peer_id) { - if info.state == PeerState::Banned { - debug!("Banned peer tried to connect: {:?}", peer_id); - } - } - } + offset += PUBLIC_KEY_SIZE_BYTES; { let peer_db_read = self.peer_db.read(); if let Some(info) = peer_db_read.peers.get(&peer_id) { @@ -332,7 +343,22 @@ impl InitConnectionHandler for MassaHandshake { }); } - offset += 32; + let (received, version) = self + .version_deserializer + .deserialize::(&received[offset..]) + .map_err(|err| { + PeerNetError::HandshakeError.error( + "Massa Handshake", + Some(format!("Failed to deserialize version: {}", err)), + ) + })?; + if !self.config.version.is_compatible(&version) { + return Err(PeerNetError::HandshakeError.error( + "Massa Handshake", + Some(format!("Received version incompatible: {}", version)), + )); + } + offset = 0; let id = received.get(offset).ok_or( PeerNetError::HandshakeError .error("Massa Handshake", Some("Failed to get id".to_string())), @@ -346,7 +372,7 @@ impl InitConnectionHandler for MassaHandshake { .map_err(|err| { PeerNetError::HandshakeError.error( "Massa Handshake", - Some(format!("Failed to serialize announcement: {}", err)), + Some(format!("Failed to deserialize announcement: {}", err)), ) })?; if peer_id @@ -418,17 +444,14 @@ impl InitConnectionHandler for MassaHandshake { // check their signature peer_id.verify_signature(&self_random_hash, &other_signature)?; - Ok((peer_id.clone(), announcement)) + Ok((peer_id.clone(), Some(announcement))) } 1 => { let (received, id) = self .message_handlers .deserialize_id(&received[offset..], &peer_id)?; self.message_handlers.handle(id, received, &peer_id)?; - Err(PeerNetError::HandshakeError.error( - "Massa Handshake", - Some("Handshake failed received a message that our connection has been refused".to_string()), - )) + Ok((peer_id.clone(), None)) } _ => Err(PeerNetError::HandshakeError .error("Massa Handshake", Some("Invalid message id".to_string()))), @@ -438,7 +461,7 @@ impl InitConnectionHandler for MassaHandshake { let mut peer_db_write = self.peer_db.write(); // if handshake failed, we set the peer state to HandshakeFailed match &res { - Ok((peer_id, announcement)) => { + Ok((peer_id, Some(announcement))) => { info!("Peer connected: {:?}", peer_id); //TODO: Hacky organize better when multiple ip/listeners if !announcement.listeners.is_empty() { @@ -461,6 +484,16 @@ impl InitConnectionHandler for MassaHandshake { state: PeerState::Trusted, }); } + Ok((_peer_id, None)) => { + peer_db_write.peers.entry(peer_id).and_modify(|info| { + //TODO: Add the peerdb but for now impossible as we don't have announcement and we need one to place in peerdb + info.state = PeerState::HandshakeFailed; + }); + return Err(PeerNetError::HandshakeError.error( + "Massa Handshake", + Some("Distant peer don't have slot for us.".to_string()), + )); + } Err(_) => { peer_db_write.peers.entry(peer_id).and_modify(|info| { //TODO: Add the peerdb but for now impossible as we don't have announcement and we need one to place in peerdb @@ -493,18 +526,44 @@ impl InitConnectionHandler for MassaHandshake { ) -> PeerNetResult<()> { //TODO: Fix this clone let keypair = keypair.clone(); - let mut endpoint = endpoint.clone(); + let mut endpoint = endpoint.try_clone()?; let db = self.peer_db.clone(); let serializer = self.peer_mngt_msg_serializer.clone(); + let version_serializer = self.version_serializer.clone(); + let version = self.config.version; std::thread::spawn(move || { let peers_to_send = db.read().get_rand_peers_to_send(100); let mut buf = PeerId::from_public_key(keypair.get_public_key()).to_bytes(); + if let Err(err) = version_serializer + .serialize(&version, &mut buf) + .map_err(|err| { + PeerNetError::HandshakeError.error( + "Massa Handshake", + Some(format!( + "Failed serialize version, Err: {:?}", + err.to_string() + )), + ) + }) + { + warn!("{}", err.to_string()); + return; + } buf.push(1); let msg = PeerManagementMessage::ListPeers(peers_to_send).into(); - serializer.serialize_id(&msg, &mut buf).unwrap(); - serializer.serialize(&msg, &mut buf).unwrap(); - endpoint.send(buf.as_slice()).unwrap(); - std::thread::sleep(Duration::from_millis(500)); + if let Err(err) = serializer.serialize_id(&msg, &mut buf) { + warn!("Failed to serialize id message: {}", err); + return; + } + if let Err(err) = serializer.serialize(&msg, &mut buf) { + warn!("Failed to serialize message: {}", err); + return; + } + //TODO: Make it non blockable + if let Err(err) = endpoint.send(buf.as_slice()) { + warn!("Failed to send message: {}", err); + return; + } endpoint.shutdown(); }); Ok(()) diff --git a/massa-protocol-worker/src/handlers/peer_handler/models.rs b/massa-protocol-worker/src/handlers/peer_handler/models.rs index 5db7214b64e..4c26fb0d6b4 100644 --- a/massa-protocol-worker/src/handlers/peer_handler/models.rs +++ b/massa-protocol-worker/src/handlers/peer_handler/models.rs @@ -1,11 +1,12 @@ use crossbeam::channel::Sender; use massa_protocol_exports::{BootstrapPeers, ProtocolError}; +use massa_time::MassaTime; use parking_lot::RwLock; use peernet::{peer_id::PeerId, transports::TransportType}; use rand::seq::SliceRandom; use std::cmp::Reverse; use std::collections::BTreeSet; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use tracing::log::info; @@ -18,8 +19,10 @@ pub type InitialPeers = HashMap>; #[derive(Default)] pub struct PeerDB { pub peers: HashMap, - /// last is the oldest value (only routable peers) + /// peers tested successfully last is the oldest value (only routable peers) //TODO: need to be pruned pub index_by_newest: BTreeSet<(Reverse, PeerId)>, + /// Tested addresses used to avoid testing the same address too often. //TODO: Need to be pruned + pub tested_addresses: HashMap, } pub type SharedPeerDB = Arc>; @@ -73,35 +76,22 @@ impl PeerDB { }; } - /// get best peers for a given number of peers - /// returns a vector of peer ids - pub fn get_best_peers(&self, nb_peers: usize) -> Vec { - self.index_by_newest - .iter() - .filter_map(|(_, peer_id)| { - self.peers.get(peer_id).and_then(|peer| { - if peer.state == PeerState::Trusted { - Some(peer_id.clone()) - } else { - None - } - }) - }) - .take(nb_peers) - .collect() - } - /// Retrieve the peer with the oldest test date. - pub fn get_oldest_peer(&self) -> Option<(PeerId, PeerInfo)> { - self.index_by_newest.last().map(|data| { - let peer_id = data.1.clone(); - let peer_info = self - .peers - .get(&peer_id) - .unwrap_or_else(|| panic!("Peer {:?} not found", peer_id)) - .clone(); - (peer_id, peer_info) - }) + pub fn get_oldest_peer(&self, cooldown: Duration) -> Option { + match self + .tested_addresses + .iter() + .min_by_key(|(_, timestamp)| *(*timestamp)) + { + Some((addr, timestamp)) => { + if timestamp.estimate_instant().ok()?.elapsed() > cooldown { + Some(*addr) + } else { + None + } + } + None => None, + } } /// Select max 100 peers to send to another peer @@ -110,6 +100,7 @@ impl PeerDB { &self, nb_peers: usize, ) -> Vec<(PeerId, HashMap)> { + //TODO: Add ourself let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backward") diff --git a/massa-protocol-worker/src/handlers/peer_handler/tester.rs b/massa-protocol-worker/src/handlers/peer_handler/tester.rs index 340e86c5f94..a74847d614d 100644 --- a/massa-protocol-worker/src/handlers/peer_handler/tester.rs +++ b/massa-protocol-worker/src/handlers/peer_handler/tester.rs @@ -1,14 +1,16 @@ use std::{ collections::HashMap, - net::SocketAddr, + net::{IpAddr, SocketAddr}, thread::JoinHandle, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::Duration, }; use crate::messages::MessagesHandler; use crossbeam::channel::Sender; -use massa_protocol_exports::ProtocolConfig; +use massa_models::version::{Version, VersionDeserializer}; +use massa_protocol_exports::{PeerConnectionType, ProtocolConfig}; use massa_serialization::{DeserializeError, Deserializer}; +use massa_time::MassaTime; use peernet::{ config::PeerNetConfiguration, error::{PeerNetError, PeerNetResult}, @@ -32,7 +34,9 @@ use crate::wrap_network::ActiveConnectionsTrait; #[derive(Clone)] pub struct TesterHandshake { peer_db: SharedPeerDB, + our_version: Version, announcement_deserializer: AnnouncementDeserializer, + version_deserializer: VersionDeserializer, } impl TesterHandshake { @@ -44,6 +48,8 @@ impl TesterHandshake { max_listeners: config.max_size_listeners_per_peer, }, ), + our_version: config.version, + version_deserializer: VersionDeserializer::new(), } } } @@ -71,18 +77,32 @@ impl InitConnectionHandler for TesterHandshake { })?)?; let res = { { - // check if peer is banned else set state to InHandshake + // check if peer is banned let mut peer_db_write = self.peer_db.write(); if let Some(info) = peer_db_write.peers.get_mut(&peer_id) { if info.state == super::PeerState::Banned { return Err(PeerNetError::HandshakeError .error("Tester Handshake", Some(String::from("Peer is banned")))); - } else { - info.state = super::PeerState::InHandshake; } } } - let id = data.get(32).ok_or( + + let (data, version) = self + .version_deserializer + .deserialize::(&data[32..]) + .map_err(|err| { + PeerNetError::HandshakeError.error( + "Tester Handshake", + Some(format!("Failed to deserialize version: {}", err)), + ) + })?; + if !self.our_version.is_compatible(&version) { + return Err(PeerNetError::HandshakeError.error( + "Massa Handshake", + Some(format!("Received version incompatible: {}", version)), + )); + } + let id = data.first().ok_or( PeerNetError::HandshakeError .error("Massa Handshake", Some("Failed to get id".to_string())), )?; @@ -90,7 +110,7 @@ impl InitConnectionHandler for TesterHandshake { 0 => { let (_, announcement) = self .announcement_deserializer - .deserialize::(&data[33..]) + .deserialize::(&data[1..]) .map_err(|err| { PeerNetError::HandshakeError.error( "Tester Handshake", @@ -134,12 +154,12 @@ impl InitConnectionHandler for TesterHandshake { Ok(peer_id.clone()) } 1 => { - let (received, id) = messages_handler.deserialize_id(&data[33..], &peer_id)?; + let (received, id) = messages_handler.deserialize_id(&data[1..], &peer_id)?; messages_handler.handle(id, received, &peer_id)?; Err(PeerNetError::HandshakeError.error( - "Massa Handshake", - Some("Tester Handshake failed received a message that our connection has been refused".to_string()), - )) + "Massa Handshake", + Some("Tester Handshake failed received a message that our connection has been refused".to_string()), + )) //TODO: Add the peerdb but for now impossible as we don't have announcement and we need one to place in peerdb } _ => Err(PeerNetError::HandshakeError @@ -180,6 +200,8 @@ impl Tester { active_connections: Box, peer_db: SharedPeerDB, messages_handler: MessagesHandler, + target_out_connections: HashMap, usize)>, + default_target_out_connections: usize, ) -> ( Sender<(PeerId, HashMap)>, Vec, @@ -197,6 +219,8 @@ impl Tester { config.clone(), test_receiver.clone(), messages_handler.clone(), + target_out_connections.clone(), + default_target_out_connections, )); } @@ -210,6 +234,8 @@ impl Tester { protocol_config: ProtocolConfig, receiver: crossbeam::channel::Receiver<(PeerId, HashMap)>, messages_handler: MessagesHandler, + target_out_connections: HashMap, usize)>, + default_target_out_connections: usize, ) -> Self { tracing::log::debug!("running new tester"); @@ -218,53 +244,104 @@ impl Tester { .spawn(move || { let db = peer_db.clone(); let active_connections = active_connections.clone(); - let mut config = PeerNetConfiguration::default( + let config = PeerNetConfiguration::default( TesterHandshake::new(peer_db, protocol_config.clone()), messages_handler, ); - config.max_out_connections = 1; let mut network_manager = PeerNetManager::new(config); + let protocol_config = protocol_config.clone(); loop { crossbeam::select! { recv(receiver) -> res => { match res { Ok(listener) => { - if let Some(peer_info) = db.read().peers.get(&listener.0) { - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backward") - .as_millis(); - let elapsed_secs = (timestamp.saturating_sub(peer_info.last_announce.timestamp)) / 1000; - if elapsed_secs < 60 { - continue; - } + if listener.1.is_empty() { + continue; } - // receive new listener to test - listener.1.iter().for_each(|(addr, _transport)| { - // Don't launch test if peer is already connected to us as a normal connection. - // Maybe we need to have a way to still update his last announce timestamp because he is a great peer - if active_connections.check_addr_accepted(addr) { - //Don't test our local addresses - for (local_addr, _transport) in protocol_config.listeners.iter() { - if addr == local_addr { - return; + //Test + let peers_connected = active_connections.get_peers_connected(); + let slots_out_connections: HashMap, usize)> = target_out_connections + .iter() + .map(|(key, value)| { + let mut value = value.clone(); + value.1 = value.1.saturating_sub(peers_connected.iter().filter(|(_, (_, ty, category))| { + if ty == &PeerConnectionType::IN { + return false; + } + if let Some(category) = category { + category == key + } else { + false + } + }).count()); + (key.clone(), value) + }) + .collect(); + let slot_default_category = default_target_out_connections.saturating_sub(peers_connected.iter().filter(|(_, (_, ty, category))| { + if ty == &PeerConnectionType::IN { + return false; + } + if category.is_some() { + return false; + } + true + }).count()); + { + let now = MassaTime::now().unwrap(); + let mut db = db.write(); + // receive new listener to test + for (addr, _) in listener.1.iter() { + //Find category of that address + let ip_canonical = addr.ip().to_canonical(); + let cooldown = 'cooldown: { + for category in &slots_out_connections { + if category.1.0.contains(&ip_canonical) { + if category.1.1 == 0 { + break 'cooldown Duration::from_secs(60 * 60 * 2); + } else { + break 'cooldown Duration::from_secs(30); + } + } + } + if slot_default_category == 0 { + Duration::from_secs(60 * 60 * 2) + } else { + Duration::from_secs(30) + } + }; + //TODO: Change it to manage multiple listeners SAFETY: Check above + if let Some(last_tested_time) = db.tested_addresses.get(addr) { + let last_tested_time = last_tested_time.estimate_instant().expect("Time went backward"); + if last_tested_time.elapsed() < cooldown { + continue; } } - //Don't test our proper ip - if let Some(ip) = protocol_config.routable_ip { - if ip.to_canonical() == addr.ip().to_canonical() { - return; + db.tested_addresses.insert(*addr, now); + // TODO: Don't launch test if peer is already connected to us as a normal connection. + // Maybe we need to have a way to still update his last announce timestamp because he is a great peer + if ip_canonical.is_global() && !active_connections.get_peers_connected().iter().any(|(_, (addr, _, _))| addr.ip().to_canonical() == ip_canonical) { + //Don't test our local addresses + for (local_addr, _transport) in protocol_config.listeners.iter() { + if addr == local_addr { + continue; + } + } + //Don't test our proper ip + if let Some(ip) = protocol_config.routable_ip { + if ip.to_canonical() == ip_canonical { + continue; + } } + info!("testing peer {} listener addr: {}", &listener.0, &addr); + let _res = network_manager.try_connect( + *addr, + protocol_config.timeout_connection.to_duration(), + &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(protocol_config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100)))), + ); } - info!("testing peer {} listener addr: {}", &listener.0, &addr); - let _res = network_manager.try_connect( - *addr, - Duration::from_millis(1000), - &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(protocol_config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100)))), - ); - } - }); + }; + } }, Err(_e) => break, } @@ -272,44 +349,38 @@ impl Tester { default(Duration::from_secs(2)) => { // If no message in 2 seconds they will test a peer that hasn't been tested for long time - // we find the last peer that has been tested - let Some((peer_id, peer_info)) = db.read().get_oldest_peer() else { + let Some(listener) = db.read().get_oldest_peer(Duration::from_secs(60 * 60 * 2)) else { continue; }; - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backward") - .as_millis(); - let elapsed_secs = (timestamp.saturating_sub(peer_info.last_announce.timestamp)) / 1000; - if elapsed_secs < 60 { - continue; + { + let mut db = db.write(); + db.tested_addresses.insert(listener, MassaTime::now().unwrap()); } // we try to connect to all peer listener (For now we have only one listener) - peer_info.last_announce.listeners.iter().for_each(|listener| { - if !listener.0.ip().to_canonical().is_global() || !active_connections.check_addr_accepted(listener.0) { - return; - } - //Don't test our local addresses - for (local_addr, _transport) in protocol_config.listeners.iter() { - if listener.0 == local_addr { - return; - } + let ip_canonical = listener.ip().to_canonical(); + if !ip_canonical.is_global() || active_connections.get_peers_connected().iter().any(|(_, (addr, _, _))| addr.ip().to_canonical() == ip_canonical) { + continue; + } + //Don't test our local addresses + for (local_addr, _transport) in protocol_config.listeners.iter() { + if listener == *local_addr { + continue; } - //Don't test our proper ip - if let Some(ip) = protocol_config.routable_ip { - if ip.to_canonical() == listener.0.ip().to_canonical() { - return; - } + } + //Don't test our proper ip + if let Some(ip) = protocol_config.routable_ip { + if ip.to_canonical() == ip_canonical { + continue; } - info!("testing peer {} listener addr: {}", peer_id, &listener.0); - let _res = network_manager.try_connect( - *listener.0, - Duration::from_millis(1000), - &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(protocol_config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100)))), - ); - }); + } + info!("testing listener addr: {}", &listener); + let _res = network_manager.try_connect( + listener, + protocol_config.timeout_connection.to_duration(), + &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(protocol_config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100)))), + ); } } } diff --git a/massa-protocol-worker/src/tests/context.rs b/massa-protocol-worker/src/tests/context.rs index b9b77752529..0f8c14fd15b 100644 --- a/massa-protocol-worker/src/tests/context.rs +++ b/massa-protocol-worker/src/tests/context.rs @@ -1,4 +1,4 @@ -use std::{fs::read_to_string, sync::Arc}; +use std::{collections::HashMap, fs::read_to_string, sync::Arc}; use crate::{ connectivity::start_connectivity_thread, create_protocol_controller, @@ -16,7 +16,7 @@ use massa_pool_exports::{ PoolController, }; use massa_protocol_exports::{ - PeerId, ProtocolConfig, ProtocolController, ProtocolError, ProtocolManager, + PeerCategoryInfo, PeerId, ProtocolConfig, ProtocolController, ProtocolError, ProtocolManager, }; use massa_serialization::U64VarIntDeserializer; use massa_storage::Storage; @@ -84,7 +84,6 @@ pub fn start_protocol_controller_with_mock_network( let network_controller = Box::new(MockNetworkController::new(message_handlers.clone())); let connectivity_thread_handle = start_connectivity_thread( - config, PeerId::from_public_key(keypair.get_public_key()), network_controller.clone(), consensus_controller, @@ -93,11 +92,19 @@ pub fn start_protocol_controller_with_mock_network( (sender_endorsements, receiver_endorsements), (sender_operations, receiver_operations), (sender_peers, receiver_peers), - None, + HashMap::default(), peer_db, storage, channels, message_handlers, + HashMap::default(), + PeerCategoryInfo { + max_in_connections_pre_handshake: 10, + max_in_connections_post_handshake: 10, + target_out_connections: 10, + max_in_connections_per_ip: 10, + }, + config, )?; let manager = ProtocolManagerImpl::new(connectivity_thread_handle); diff --git a/massa-protocol-worker/src/tests/mock_network.rs b/massa-protocol-worker/src/tests/mock_network.rs index fd869a9a9ed..23a88a9f057 100644 --- a/massa-protocol-worker/src/tests/mock_network.rs +++ b/massa-protocol-worker/src/tests/mock_network.rs @@ -39,18 +39,10 @@ impl MockActiveConnections { type SharedMockActiveConnections = Arc>; impl ActiveConnectionsTrait for SharedMockActiveConnections { - fn check_addr_accepted(&self, _addr: &std::net::SocketAddr) -> bool { - true - } - fn clone_box(&self) -> Box { Box::new(self.clone()) } - fn get_max_out_connections(&self) -> usize { - 10 - } - fn get_nb_out_connections(&self) -> usize { //TODO: Place a coherent value 0 @@ -61,7 +53,9 @@ impl ActiveConnectionsTrait for SharedMockActiveConnections { 0 } - fn get_peers_connected(&self) -> HashMap { + fn get_peers_connected( + &self, + ) -> HashMap)> { self.read() .connections .iter() @@ -71,6 +65,7 @@ impl ActiveConnectionsTrait for SharedMockActiveConnections { ( std::net::SocketAddr::from(([127, 0, 0, 1], 0)), PeerConnectionType::OUT, + None, ), ) }) diff --git a/massa-protocol-worker/src/tests/mod.rs b/massa-protocol-worker/src/tests/mod.rs index d62dfe0660a..f8ec6bb903d 100644 --- a/massa-protocol-worker/src/tests/mod.rs +++ b/massa-protocol-worker/src/tests/mod.rs @@ -2,15 +2,12 @@ use std::{collections::HashMap, fs::read_to_string, time::Duration}; use massa_consensus_exports::test_exports::ConsensusControllerImpl; use massa_pool_exports::test_exports::MockPoolController; -use massa_protocol_exports::ProtocolConfig; +use massa_protocol_exports::{PeerCategoryInfo, PeerData, ProtocolConfig}; use massa_storage::Storage; use peernet::{peer_id::PeerId, transports::TransportType, types::KeyPair}; use tempfile::NamedTempFile; -use crate::{ - create_protocol_controller, handlers::peer_handler::models::InitialPeers, - start_protocol_controller, -}; +use crate::{create_protocol_controller, start_protocol_controller}; mod ban_nodes_scenarios; mod block_scenarios; @@ -67,25 +64,55 @@ fn basic() { // Setup initial peers let initial_peers_file = NamedTempFile::new().expect("cannot create temp file"); - let mut initial_peers1: InitialPeers = InitialPeers::default(); + let mut initial_peers1: HashMap = HashMap::new(); let mut peers_1 = HashMap::new(); peers_1.insert("127.0.0.1:8082".parse().unwrap(), TransportType::Tcp); - initial_peers1.insert(PeerId::from_public_key(keypair2.get_public_key()), peers_1); + initial_peers1.insert( + PeerId::from_public_key(keypair2.get_public_key()), + PeerData { + listeners: peers_1, + category: "Bootstrap".to_string(), + }, + ); serde_json::to_writer_pretty(initial_peers_file.as_file(), &initial_peers1) .expect("unable to write ledger file"); let initial_peers_file_2 = NamedTempFile::new().expect("cannot create temp file"); - let mut initial_peers2: InitialPeers = InitialPeers::default(); + let mut initial_peers2: HashMap = HashMap::new(); let mut peers_2 = HashMap::new(); peers_2.insert("127.0.0.1:8081".parse().unwrap(), TransportType::Tcp); - initial_peers2.insert(PeerId::from_public_key(keypair1.get_public_key()), peers_2); + initial_peers2.insert( + PeerId::from_public_key(keypair1.get_public_key()), + PeerData { + listeners: peers_2, + category: "Bootstrap".to_string(), + }, + ); serde_json::to_writer_pretty(initial_peers_file_2.as_file(), &initial_peers2) .expect("unable to write ledger file"); config1.initial_peers = initial_peers_file.path().to_path_buf(); - config1.max_in_connections = 5; - config1.max_out_connections = 1; + let mut categories = HashMap::default(); + categories.insert( + "Bootstrap".to_string(), + PeerCategoryInfo { + max_in_connections_pre_handshake: 1, + max_in_connections_post_handshake: 1, + target_out_connections: 1, + max_in_connections_per_ip: 1, + }, + ); + config1.peers_categories = categories; config2.initial_peers = initial_peers_file_2.path().to_path_buf(); - config2.max_in_connections = 5; - config2.max_out_connections = 0; + let mut categories2 = HashMap::default(); + categories2.insert( + "Bootstrap".to_string(), + PeerCategoryInfo { + max_in_connections_pre_handshake: 5, + max_in_connections_post_handshake: 5, + target_out_connections: 1, + max_in_connections_per_ip: 1, + }, + ); + config2.peers_categories = categories2; config2.debug = false; // Setup the storages @@ -152,7 +179,7 @@ fn stop_with_controller_still_exists() { config2 .listeners .insert("127.0.0.1:8082".parse().unwrap(), TransportType::Tcp); - config2.keypair_file = "./src/tests/test_keypair1.json".to_string().into(); + config2.keypair_file = "./src/tests/test_keypair2.json".to_string().into(); let keypair_bs58_check_encoded = read_to_string(&config2.keypair_file) .map_err(|err| { std::io::Error::new(err.kind(), format!("could not load node key file: {}", err)) @@ -163,25 +190,55 @@ fn stop_with_controller_still_exists() { // Setup initial peers let initial_peers_file = NamedTempFile::new().expect("cannot create temp file"); - let mut initial_peers1: InitialPeers = InitialPeers::default(); + let mut initial_peers1: HashMap = HashMap::new(); let mut peers_1 = HashMap::new(); - peers_1.insert("127.0.0.1:8081".parse().unwrap(), TransportType::Tcp); - initial_peers1.insert(PeerId::from_public_key(keypair2.get_public_key()), peers_1); + peers_1.insert("127.0.0.1:8082".parse().unwrap(), TransportType::Tcp); + initial_peers1.insert( + PeerId::from_public_key(keypair2.get_public_key()), + PeerData { + listeners: peers_1, + category: "Bootstrap".to_string(), + }, + ); serde_json::to_writer_pretty(initial_peers_file.as_file(), &initial_peers1) .expect("unable to write ledger file"); let initial_peers_file_2 = NamedTempFile::new().expect("cannot create temp file"); - let mut initial_peers2: InitialPeers = InitialPeers::default(); + let mut initial_peers2: HashMap = HashMap::new(); let mut peers_2 = HashMap::new(); - peers_2.insert("127.0.0.1:8082".parse().unwrap(), TransportType::Tcp); - initial_peers2.insert(PeerId::from_public_key(keypair1.get_public_key()), peers_2); + peers_2.insert("127.0.0.1:8081".parse().unwrap(), TransportType::Tcp); + initial_peers2.insert( + PeerId::from_public_key(keypair1.get_public_key()), + PeerData { + listeners: peers_2, + category: "Bootstrap".to_string(), + }, + ); serde_json::to_writer_pretty(initial_peers_file_2.as_file(), &initial_peers2) .expect("unable to write ledger file"); config1.initial_peers = initial_peers_file.path().to_path_buf(); - config1.max_in_connections = 5; - config1.max_out_connections = 1; + let mut categories = HashMap::default(); + categories.insert( + "Bootstrap".to_string(), + PeerCategoryInfo { + max_in_connections_post_handshake: 1, + max_in_connections_pre_handshake: 1, + target_out_connections: 1, + max_in_connections_per_ip: 1, + }, + ); + config1.peers_categories = categories; config2.initial_peers = initial_peers_file_2.path().to_path_buf(); - config2.max_in_connections = 5; - config2.max_out_connections = 0; + let mut categories2 = HashMap::default(); + categories2.insert( + "Bootstrap".to_string(), + PeerCategoryInfo { + max_in_connections_post_handshake: 5, + max_in_connections_pre_handshake: 5, + target_out_connections: 1, + max_in_connections_per_ip: 1, + }, + ); + config2.peers_categories = categories2; config2.debug = false; // Setup the storages diff --git a/massa-protocol-worker/src/worker.rs b/massa-protocol-worker/src/worker.rs index ee7a0336c4b..41575f1d1bb 100644 --- a/massa-protocol-worker/src/worker.rs +++ b/massa-protocol-worker/src/worker.rs @@ -3,16 +3,21 @@ use massa_consensus_exports::ConsensusController; use massa_models::node::NodeId; use massa_pool_exports::PoolController; use massa_protocol_exports::{ - BootstrapPeers, PeerId, ProtocolConfig, ProtocolController, ProtocolError, ProtocolManager, + BootstrapPeers, PeerData, PeerId, ProtocolConfig, ProtocolController, ProtocolError, + ProtocolManager, }; use massa_serialization::U64VarIntDeserializer; use massa_signature::KeyPair; use massa_storage::Storage; use parking_lot::RwLock; use peernet::{ - config::PeerNetConfiguration, network_manager::PeerNetManager, types::KeyPair as PeerNetKeyPair, + config::{PeerNetCategoryInfo, PeerNetConfiguration}, + network_manager::PeerNetManager, + types::KeyPair as PeerNetKeyPair, +}; +use std::{ + collections::HashMap, fs::read_to_string, ops::Bound::Included, str::FromStr, sync::Arc, }; -use std::{fs::read_to_string, ops::Bound::Included, str::FromStr, sync::Arc}; use tracing::{debug, log::warn}; use crate::{ @@ -186,18 +191,71 @@ pub fn start_protocol_controller( keypair }; + let initial_peers_infos = serde_json::from_str::>( + &std::fs::read_to_string(&config.initial_peers)?, + )?; + + let initial_peers = if let Some(bootstrap_peers) = bootstrap_peers { + bootstrap_peers.0.into_iter().collect() + } else { + initial_peers_infos + .iter() + .map(|(peer_id, data)| (peer_id.clone(), data.listeners.clone())) + .collect() + }; + let peernet_keypair = PeerNetKeyPair::from_str(&keypair.to_string()).unwrap(); peernet_config.self_keypair = peernet_keypair.clone(); - //TODO: Add the rest of the config - peernet_config.max_in_connections = config.max_in_connections; - peernet_config.max_out_connections = config.max_out_connections; + let peernet_categories = config + .peers_categories + .iter() + .map(|(category_name, infos)| { + ( + category_name.clone(), + ( + initial_peers_infos + .iter() + .filter_map(|info| { + if info.1.category == *category_name { + //TODO: Adapt for multiple listeners + Some( + info.1 + .listeners + .iter() + .next() + .map(|addr| addr.0.ip().to_canonical()) + .unwrap(), + ) + } else { + None + } + }) + .collect(), + PeerNetCategoryInfo { + max_in_connections_post_handshake: infos.max_in_connections_post_handshake, + max_in_connections_pre_handshake: infos.max_in_connections_pre_handshake, + max_in_connections_per_ip: infos.max_in_connections_per_ip, + }, + ), + ) + }) + .collect(); + peernet_config.peers_categories = peernet_categories; + peernet_config.default_category_info = PeerNetCategoryInfo { + max_in_connections_pre_handshake: config + .default_category_info + .max_in_connections_pre_handshake, + max_in_connections_post_handshake: config + .default_category_info + .max_in_connections_post_handshake, + max_in_connections_per_ip: config.default_category_info.max_in_connections_per_ip, + }; let network_controller = Box::new(NetworkControllerImpl::new(PeerNetManager::new( peernet_config, ))); let connectivity_thread_handle = start_connectivity_thread( - config, PeerId::from_public_key(peernet_keypair.get_public_key()), network_controller, consensus_controller, @@ -206,11 +264,43 @@ pub fn start_protocol_controller( (sender_endorsements, receiver_endorsements), (sender_operations, receiver_operations), (sender_peers, receiver_peers), - bootstrap_peers, + initial_peers, peer_db, storage, protocol_channels, message_handlers, + config + .peers_categories + .iter() + .map(|(category_name, infos)| { + ( + category_name.clone(), + ( + initial_peers_infos + .iter() + .filter_map(|info| { + if info.1.category == *category_name { + //TODO: Adapt for multiple listeners + Some( + info.1 + .listeners + .iter() + .next() + .map(|addr| addr.0.ip().to_canonical()) + .unwrap(), + ) + } else { + None + } + }) + .collect(), + *infos, + ), + ) + }) + .collect(), + config.default_category_info, + config, )?; let manager = ProtocolManagerImpl::new(connectivity_thread_handle); diff --git a/massa-protocol-worker/src/wrap_network.rs b/massa-protocol-worker/src/wrap_network.rs index ea091858b89..21f1837a5a1 100644 --- a/massa-protocol-worker/src/wrap_network.rs +++ b/massa-protocol-worker/src/wrap_network.rs @@ -26,9 +26,9 @@ pub trait ActiveConnectionsTrait: Send + Sync { ) -> Result<(), ProtocolError>; fn clone_box(&self) -> Box; fn get_peer_ids_connected(&self) -> HashSet; - fn get_peers_connected(&self) -> HashMap; - fn check_addr_accepted(&self, addr: &SocketAddr) -> bool; - fn get_max_out_connections(&self) -> usize; + fn get_peers_connected( + &self, + ) -> HashMap)>; fn get_nb_out_connections(&self) -> usize; fn get_nb_in_connections(&self) -> usize; fn shutdown_connection(&mut self, peer_id: &PeerId); @@ -68,7 +68,9 @@ impl ActiveConnectionsTrait for SharedActiveConnections { self.read().connections.keys().cloned().collect() } - fn get_peers_connected(&self) -> HashMap { + fn get_peers_connected( + &self, + ) -> HashMap)> { self.read() .connections .iter() @@ -78,20 +80,13 @@ impl ActiveConnectionsTrait for SharedActiveConnections { ( *connection.endpoint.get_target_addr(), connection.connection_type, + connection.category_name.clone(), ), ) }) .collect() } - fn check_addr_accepted(&self, addr: &SocketAddr) -> bool { - self.read().check_addr_accepted(addr) - } - - fn get_max_out_connections(&self) -> usize { - self.read().max_out_connections - } - fn get_nb_out_connections(&self) -> usize { self.read().nb_out_connections }