diff --git a/Cargo.lock b/Cargo.lock index c90cff73674..89d3b70ba8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2661,7 +2661,7 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.44.5" +version = "0.44.6" dependencies = [ "arrayvec", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 9e662abb630..73ae23d70ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.3" } -libp2p-kad = { version = "0.44.5", path = "protocols/kad" } +libp2p-kad = { version = "0.44.6", path = "protocols/kad" } libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } diff --git a/examples/distributed-key-value-store/src/main.rs b/examples/distributed-key-value-store/src/main.rs index ce0998a9ac5..d1eec92203c 100644 --- a/examples/distributed-key-value-store/src/main.rs +++ b/examples/distributed-key-value-store/src/main.rs @@ -23,12 +23,9 @@ use async_std::io; use futures::{prelude::*, select}; use libp2p::core::upgrade::Version; +use libp2p::kad; use libp2p::kad::record::store::MemoryStore; use libp2p::kad::Mode; -use libp2p::kad::{ - record::Key, AddProviderOk, GetProvidersOk, GetRecordOk, Kademlia, KademliaEvent, PeerRecord, - PutRecordOk, QueryResult, Quorum, Record, -}; use libp2p::{ identity, mdns, noise, swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, @@ -54,18 +51,18 @@ async fn main() -> Result<(), Box> { #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "MyBehaviourEvent")] struct MyBehaviour { - kademlia: Kademlia, + kademlia: kad::Behaviour, mdns: mdns::async_io::Behaviour, } #[allow(clippy::large_enum_variant)] enum MyBehaviourEvent { - Kademlia(KademliaEvent), + Kademlia(kad::Event), Mdns(mdns::Event), } - impl From for MyBehaviourEvent { - fn from(event: KademliaEvent) -> Self { + impl From for MyBehaviourEvent { + fn from(event: kad::Event) -> Self { MyBehaviourEvent::Kademlia(event) } } @@ -80,7 +77,7 @@ async fn main() -> Result<(), Box> { let mut swarm = { // Create a Kademlia behaviour. let store = MemoryStore::new(local_peer_id); - let kademlia = Kademlia::new(local_peer_id, store); + let kademlia = kad::Behaviour::new(local_peer_id, store); let mdns = mdns::async_io::Behaviour::new(mdns::Config::default(), local_peer_id)?; let behaviour = MyBehaviour { kademlia, mdns }; SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build() @@ -107,9 +104,9 @@ async fn main() -> Result<(), Box> { swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); } } - SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => { + SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..})) => { match result { - QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key, providers, .. })) => { + kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => { for peer in providers { println!( "Peer {peer:?} provides key {:?}", @@ -117,12 +114,12 @@ async fn main() -> Result<(), Box> { ); } } - QueryResult::GetProviders(Err(err)) => { + kad::QueryResult::GetProviders(Err(err)) => { eprintln!("Failed to get providers: {err:?}"); } - QueryResult::GetRecord(Ok( - GetRecordOk::FoundRecord(PeerRecord { - record: Record { key, value, .. }, + kad::QueryResult::GetRecord(Ok( + kad::GetRecordOk::FoundRecord(kad::PeerRecord { + record: kad::Record { key, value, .. }, .. }) )) => { @@ -132,26 +129,26 @@ async fn main() -> Result<(), Box> { std::str::from_utf8(&value).unwrap(), ); } - QueryResult::GetRecord(Ok(_)) => {} - QueryResult::GetRecord(Err(err)) => { + kad::QueryResult::GetRecord(Ok(_)) => {} + kad::QueryResult::GetRecord(Err(err)) => { eprintln!("Failed to get record: {err:?}"); } - QueryResult::PutRecord(Ok(PutRecordOk { key })) => { + kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => { println!( "Successfully put record {:?}", std::str::from_utf8(key.as_ref()).unwrap() ); } - QueryResult::PutRecord(Err(err)) => { + kad::QueryResult::PutRecord(Err(err)) => { eprintln!("Failed to put record: {err:?}"); } - QueryResult::StartProviding(Ok(AddProviderOk { key })) => { + kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => { println!( "Successfully put provider record {:?}", std::str::from_utf8(key.as_ref()).unwrap() ); } - QueryResult::StartProviding(Err(err)) => { + kad::QueryResult::StartProviding(Err(err)) => { eprintln!("Failed to put provider record: {err:?}"); } _ => {} @@ -163,14 +160,14 @@ async fn main() -> Result<(), Box> { } } -fn handle_input_line(kademlia: &mut Kademlia, line: String) { +fn handle_input_line(kademlia: &mut kad::Behaviour, line: String) { let mut args = line.split(' '); match args.next() { Some("GET") => { let key = { match args.next() { - Some(key) => Key::new(&key), + Some(key) => kad::record::Key::new(&key), None => { eprintln!("Expected key"); return; @@ -182,7 +179,7 @@ fn handle_input_line(kademlia: &mut Kademlia, line: String) { Some("GET_PROVIDERS") => { let key = { match args.next() { - Some(key) => Key::new(&key), + Some(key) => kad::record::Key::new(&key), None => { eprintln!("Expected key"); return; @@ -194,7 +191,7 @@ fn handle_input_line(kademlia: &mut Kademlia, line: String) { Some("PUT") => { let key = { match args.next() { - Some(key) => Key::new(&key), + Some(key) => kad::record::Key::new(&key), None => { eprintln!("Expected key"); return; @@ -210,20 +207,20 @@ fn handle_input_line(kademlia: &mut Kademlia, line: String) { } } }; - let record = Record { + let record = kad::Record { key, value, publisher: None, expires: None, }; kademlia - .put_record(record, Quorum::One) + .put_record(record, kad::Quorum::One) .expect("Failed to store record locally."); } Some("PUT_PROVIDER") => { let key = { match args.next() { - Some(key) => Key::new(&key), + Some(key) => kad::record::Key::new(&key), None => { eprintln!("Expected key"); return; diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index 7ddd0afb0cc..675f69a03d8 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -5,11 +5,7 @@ use futures::prelude::*; use libp2p::{ core::Multiaddr, - identity, - kad::{ - self, record::store::MemoryStore, GetProvidersOk, Kademlia, KademliaEvent, QueryId, - QueryResult, - }, + identity, kad, multiaddr::Protocol, noise, request_response::{self, ProtocolSupport, RequestId, ResponseChannel}, @@ -56,7 +52,7 @@ pub(crate) async fn new( let mut swarm = SwarmBuilder::with_async_std_executor( transport, ComposedBehaviour { - kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), + kademlia: kad::Behaviour::new(peer_id, kad::record::store::MemoryStore::new(peer_id)), request_response: request_response::cbor::Behaviour::new( [( StreamProtocol::new("/file-exchange/1"), @@ -179,8 +175,8 @@ pub(crate) struct EventLoop { command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, pending_dial: HashMap>>>, - pending_start_providing: HashMap>, - pending_get_providers: HashMap>>, + pending_start_providing: HashMap>, + pending_get_providers: HashMap>>, pending_request_file: HashMap, Box>>>, } @@ -221,9 +217,9 @@ impl EventLoop { ) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryProgressed { + kad::Event::OutboundQueryProgressed { id, - result: QueryResult::StartProviding(_), + result: kad::QueryResult::StartProviding(_), .. }, )) => { @@ -234,11 +230,12 @@ impl EventLoop { let _ = sender.send(()); } SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryProgressed { + kad::Event::OutboundQueryProgressed { id, result: - QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { - providers, .. + kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { + providers, + .. })), .. }, @@ -256,11 +253,11 @@ impl EventLoop { } } SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryProgressed { + kad::Event::OutboundQueryProgressed { result: - QueryResult::GetProviders(Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { - .. - })), + kad::QueryResult::GetProviders(Ok( + kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. }, + )), .. }, )) => {} @@ -412,13 +409,13 @@ impl EventLoop { #[behaviour(to_swarm = "ComposedEvent")] struct ComposedBehaviour { request_response: request_response::cbor::Behaviour, - kademlia: Kademlia, + kademlia: kad::Behaviour, } #[derive(Debug)] enum ComposedEvent { RequestResponse(request_response::Event), - Kademlia(KademliaEvent), + Kademlia(kad::Event), } impl From> for ComposedEvent { @@ -427,8 +424,8 @@ impl From> for ComposedEvent } } -impl From for ComposedEvent { - fn from(event: KademliaEvent) -> Self { +impl From for ComposedEvent { + fn from(event: kad::Event) -> Self { ComposedEvent::Kademlia(event) } } diff --git a/examples/ipfs-kad/src/main.rs b/examples/ipfs-kad/src/main.rs index 1d0ed0a3b00..6897cf63d4e 100644 --- a/examples/ipfs-kad/src/main.rs +++ b/examples/ipfs-kad/src/main.rs @@ -21,8 +21,8 @@ #![doc = include_str!("../README.md")] use futures::StreamExt; +use libp2p::kad; use libp2p::kad::record::store::MemoryStore; -use libp2p::kad::{GetClosestPeersError, Kademlia, KademliaConfig, KademliaEvent, QueryResult}; use libp2p::{ development_transport, identity, swarm::{SwarmBuilder, SwarmEvent}, @@ -51,10 +51,10 @@ async fn main() -> Result<(), Box> { // Create a swarm to manage peers and events. let mut swarm = { // Create a Kademlia behaviour. - let mut cfg = KademliaConfig::default(); + let mut cfg = kad::Config::default(); cfg.set_query_timeout(Duration::from_secs(5 * 60)); let store = MemoryStore::new(local_peer_id); - let mut behaviour = Kademlia::with_config(local_peer_id, store, cfg); + let mut behaviour = kad::Behaviour::with_config(local_peer_id, store, cfg); // Add the bootnodes to the local routing table. `libp2p-dns` built // into the `transport` resolves the `dnsaddr` when Kademlia tries @@ -78,8 +78,8 @@ async fn main() -> Result<(), Box> { loop { let event = swarm.select_next_some().await; - if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { - result: QueryResult::GetClosestPeers(result), + if let SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed { + result: kad::QueryResult::GetClosestPeers(result), .. }) = event { @@ -93,7 +93,7 @@ async fn main() -> Result<(), Box> { println!("Query finished with no closest peers.") } } - Err(GetClosestPeersError::Timeout { peers, .. }) => { + Err(kad::GetClosestPeersError::Timeout { peers, .. }) => { if !peers.is_empty() { println!("Query timed out with closest peers: {peers:#?}") } else { diff --git a/misc/metrics/src/kad.rs b/misc/metrics/src/kad.rs index e4170dd9228..bd5a6526737 100644 --- a/misc/metrics/src/kad.rs +++ b/misc/metrics/src/kad.rs @@ -159,10 +159,10 @@ impl Metrics { } } -impl super::Recorder for Metrics { - fn record(&self, event: &libp2p_kad::KademliaEvent) { +impl super::Recorder for Metrics { + fn record(&self, event: &libp2p_kad::Event) { match event { - libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => { + libp2p_kad::Event::OutboundQueryProgressed { result, stats, .. } => { self.query_result_num_requests .get_or_create(&result.into()) .observe(stats.num_requests().into()); @@ -217,7 +217,7 @@ impl super::Recorder for Metrics { _ => {} } } - libp2p_kad::KademliaEvent::RoutingUpdated { + libp2p_kad::Event::RoutingUpdated { is_new_peer, old_peer, bucket_range: (low, _high), @@ -250,7 +250,7 @@ impl super::Recorder for Metrics { } } - libp2p_kad::KademliaEvent::InboundRequest { request } => { + libp2p_kad::Event::InboundRequest { request } => { self.inbound_requests.get_or_create(&request.into()).inc(); } _ => {} diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index fd977537d96..2132dd5d7fb 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -118,8 +118,8 @@ impl Recorder for Metrics { } #[cfg(feature = "kad")] -impl Recorder for Metrics { - fn record(&self, event: &libp2p_kad::KademliaEvent) { +impl Recorder for Metrics { + fn record(&self, event: &libp2p_kad::Event) { self.kad.record(event) } } diff --git a/misc/server/src/behaviour.rs b/misc/server/src/behaviour.rs index b21d58862d7..2f7741b9317 100644 --- a/misc/server/src/behaviour.rs +++ b/misc/server/src/behaviour.rs @@ -1,6 +1,6 @@ use libp2p::autonat; use libp2p::identify; -use libp2p::kad::{record::store::MemoryStore, Kademlia, KademliaConfig}; +use libp2p::kad; use libp2p::ping; use libp2p::relay; use libp2p::swarm::behaviour::toggle::Toggle; @@ -20,7 +20,7 @@ pub(crate) struct Behaviour { relay: relay::Behaviour, ping: ping::Behaviour, identify: identify::Behaviour, - pub(crate) kademlia: Toggle>, + pub(crate) kademlia: Toggle>, autonat: Toggle, } @@ -31,15 +31,15 @@ impl Behaviour { enable_autonat: bool, ) -> Self { let kademlia = if enable_kademlia { - let mut kademlia_config = KademliaConfig::default(); + let mut kademlia_config = kad::Config::default(); // Instantly remove records and provider records. // // TODO: Replace hack with option to disable both. kademlia_config.set_record_ttl(Some(Duration::from_secs(0))); kademlia_config.set_provider_record_ttl(Some(Duration::from_secs(0))); - let mut kademlia = Kademlia::with_config( + let mut kademlia = kad::Behaviour::with_config( pub_key.to_peer_id(), - MemoryStore::new(pub_key.to_peer_id()), + kad::record::store::MemoryStore::new(pub_key.to_peer_id()), kademlia_config, ); let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io").unwrap(); diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index f89ca309a3c..7e90c78371f 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.44.6 - unreleased +- Rename `Kademlia` symbols to follow naming convention. + See [PR 4547]. + +[PR 4547]: https://github.com/libp2p/rust-libp2p/pull/4547 + ## 0.44.5 - Migrate to `quick-protobuf-codec` crate for codec logic. See [PR 4501]. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index b4c3c597da2..5b2a74080b3 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = { workspace = true } description = "Kademlia protocol for libp2p" -version = "0.44.5" +version = "0.44.6" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index f74c39a07c3..262962cbd1f 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -23,10 +23,10 @@ mod test; use crate::addresses::Addresses; -use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; +use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId}; use crate::jobs::*; use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus}; -use crate::protocol::{KadConnectionType, KadPeer, KademliaProtocolConfig}; +use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig}; use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState}; use crate::record_priv::{ self, @@ -59,20 +59,20 @@ use thiserror::Error; pub use crate::query::QueryStats; -/// `Kademlia` is a `NetworkBehaviour` that implements the libp2p +/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p /// Kademlia protocol. -pub struct Kademlia { +pub struct Behaviour { /// The Kademlia routing table. kbuckets: KBucketsTable, Addresses>, /// The k-bucket insertion strategy. - kbucket_inserts: KademliaBucketInserts, + kbucket_inserts: BucketInserts, /// Configuration of the wire protocol. - protocol_config: KademliaProtocolConfig, + protocol_config: ProtocolConfig, /// Configuration of [`RecordStore`] filtering. - record_filtering: KademliaStoreInserts, + record_filtering: StoreInserts, /// The currently active (i.e. in-progress) queries. queries: QueryPool, @@ -100,7 +100,7 @@ pub struct Kademlia { connection_idle_timeout: Duration, /// Queued events to return when the behaviour is being polled. - queued_events: VecDeque>, + queued_events: VecDeque>, listen_addresses: ListenAddresses, @@ -108,8 +108,8 @@ pub struct Kademlia { connections: HashMap, - /// See [`KademliaConfig::caching`]. - caching: KademliaCaching, + /// See [`Config::caching`]. + caching: Caching, local_peer_id: PeerId, @@ -125,7 +125,7 @@ pub struct Kademlia { /// and their addresses into the k-buckets of the Kademlia /// routing table. #[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum KademliaBucketInserts { +pub enum BucketInserts { /// Whenever a connection to a peer is established as a /// result of a dialing attempt and that peer is not yet /// in the routing table, it is inserted as long as there @@ -135,10 +135,10 @@ pub enum KademliaBucketInserts { /// disconnected peer is evicted from the bucket. OnConnected, /// New peers and addresses are only added to the routing table via - /// explicit calls to [`Kademlia::add_address`]. + /// explicit calls to [`Behaviour::add_address`]. /// /// > **Note**: Even though peers can only get into the - /// > routing table as a result of [`Kademlia::add_address`], + /// > routing table as a result of [`Behaviour::add_address`], /// > routing table entries are still updated as peers /// > connect and disconnect (i.e. the order of the entries /// > as well as the network addresses). @@ -153,63 +153,63 @@ pub enum KademliaBucketInserts { /// /// [`Key`]: crate::record_priv::Key #[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum KademliaStoreInserts { +pub enum StoreInserts { /// Whenever a (provider) record is received, /// the record is forwarded immediately to the [`RecordStore`]. Unfiltered, /// Whenever a (provider) record is received, an event is emitted. - /// Provider records generate a [`InboundRequest::AddProvider`] under [`KademliaEvent::InboundRequest`], - /// normal records generate a [`InboundRequest::PutRecord`] under [`KademliaEvent::InboundRequest`]. + /// Provider records generate a [`InboundRequest::AddProvider`] under [`Event::InboundRequest`], + /// normal records generate a [`InboundRequest::PutRecord`] under [`Event::InboundRequest`]. /// /// When deemed valid, a (provider) record needs to be explicitly stored in /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`], /// whichever is applicable. A mutable reference to the [`RecordStore`] can - /// be retrieved via [`Kademlia::store_mut`]. + /// be retrieved via [`Behaviour::store_mut`]. FilterBoth, } /// The configuration for the `Kademlia` behaviour. /// -/// The configuration is consumed by [`Kademlia::new`]. +/// The configuration is consumed by [`Behaviour::new`]. #[derive(Debug, Clone)] -pub struct KademliaConfig { +pub struct Config { kbucket_pending_timeout: Duration, query_config: QueryConfig, - protocol_config: KademliaProtocolConfig, + protocol_config: ProtocolConfig, record_ttl: Option, record_replication_interval: Option, record_publication_interval: Option, - record_filtering: KademliaStoreInserts, + record_filtering: StoreInserts, provider_record_ttl: Option, provider_publication_interval: Option, connection_idle_timeout: Duration, - kbucket_inserts: KademliaBucketInserts, - caching: KademliaCaching, + kbucket_inserts: BucketInserts, + caching: Caching, } -impl Default for KademliaConfig { +impl Default for Config { fn default() -> Self { - KademliaConfig { + Config { kbucket_pending_timeout: Duration::from_secs(60), query_config: QueryConfig::default(), protocol_config: Default::default(), record_ttl: Some(Duration::from_secs(36 * 60 * 60)), record_replication_interval: Some(Duration::from_secs(60 * 60)), record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)), - record_filtering: KademliaStoreInserts::Unfiltered, + record_filtering: StoreInserts::Unfiltered, provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)), provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)), connection_idle_timeout: Duration::from_secs(10), - kbucket_inserts: KademliaBucketInserts::OnConnected, - caching: KademliaCaching::Enabled { max_peers: 1 }, + kbucket_inserts: BucketInserts::OnConnected, + caching: Caching::Enabled { max_peers: 1 }, } } } /// The configuration for Kademlia "write-back" caching after successful -/// lookups via [`Kademlia::get_record`]. +/// lookups via [`Behaviour::get_record`]. #[derive(Debug, Clone)] -pub enum KademliaCaching { +pub enum Caching { /// Caching is disabled and the peers closest to records being looked up /// that do not return a record are not tracked, i.e. /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty. @@ -217,11 +217,11 @@ pub enum KademliaCaching { /// Up to `max_peers` peers not returning a record that are closest to the key /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`]. /// The write-back operation must be performed explicitly, if - /// desired and after choosing a record from the results, via [`Kademlia::put_record_to`]. + /// desired and after choosing a record from the results, via [`Behaviour::put_record_to`]. Enabled { max_peers: u16 }, } -impl KademliaConfig { +impl Config { /// Sets custom protocol names. /// /// Kademlia nodes only communicate with other nodes using the same protocol @@ -266,7 +266,7 @@ impl KademliaConfig { /// This only controls the level of parallelism of an iterative query, not /// the level of parallelism of a query to a fixed set of peers. /// - /// When used with [`KademliaConfig::disjoint_query_paths`] it equals + /// When used with [`Config::disjoint_query_paths`] it equals /// the amount of disjoint paths used. pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self { self.query_config.parallelism = parallelism; @@ -302,9 +302,9 @@ impl KademliaConfig { /// Sets whether or not records should be filtered before being stored. /// - /// See [`KademliaStoreInserts`] for the different values. - /// Defaults to [`KademliaStoreInserts::Unfiltered`]. - pub fn set_record_filtering(&mut self, filtering: KademliaStoreInserts) -> &mut Self { + /// See [`StoreInserts`] for the different values. + /// Defaults to [`StoreInserts::Unfiltered`]. + pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self { self.record_filtering = filtering; self } @@ -387,24 +387,24 @@ impl KademliaConfig { } /// Sets the k-bucket insertion strategy for the Kademlia routing table. - pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self { + pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self { self.kbucket_inserts = inserts; self } - /// Sets the [`KademliaCaching`] strategy to use for successful lookups. + /// Sets the [`Caching`] strategy to use for successful lookups. /// - /// The default is [`KademliaCaching::Enabled`] with a `max_peers` of 1. + /// The default is [`Caching::Enabled`] with a `max_peers` of 1. /// Hence, with default settings and a lookup quorum of 1, a successful lookup /// will result in the record being cached at the closest node to the key that /// did not return the record, i.e. the standard Kademlia behaviour. - pub fn set_caching(&mut self, c: KademliaCaching) -> &mut Self { + pub fn set_caching(&mut self, c: Caching) -> &mut Self { self.caching = c; self } } -impl Kademlia +impl Behaviour where TStore: RecordStore + Send + 'static, { @@ -419,7 +419,7 @@ where } /// Creates a new `Kademlia` network behaviour with the given configuration. - pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self { + pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self { let local_key = kbucket::Key::from(id); let put_record_job = config @@ -438,7 +438,7 @@ where .provider_publication_interval .map(AddProviderJob::new); - Kademlia { + Behaviour { store, caching: config.caching, kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout), @@ -523,14 +523,14 @@ where /// in the DHT. /// /// If the routing table has been updated as a result of this operation, - /// a [`KademliaEvent::RoutingUpdated`] event is emitted. + /// a [`Event::RoutingUpdated`] event is emitted. pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate { let key = kbucket::Key::from(*peer); match self.kbuckets.entry(&key) { kbucket::Entry::Present(mut entry, _) => { if entry.value().insert(address) { - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::RoutingUpdated { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated { peer: *peer, is_new_peer: false, addresses: entry.value().clone(), @@ -540,8 +540,7 @@ where .bucket(&key) .map(|b| b.range()) .expect("Not kbucket::Entry::SelfEntry."), - }, - )) + })) } RoutingUpdate::Success } @@ -559,7 +558,7 @@ where match entry.insert(addresses.clone(), status) { kbucket::InsertResult::Inserted => { self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::RoutingUpdated { + Event::RoutingUpdated { peer: *peer, is_new_peer: true, addresses, @@ -665,7 +664,7 @@ where /// Initiates an iterative query for the closest peers to the given key. /// /// The result of the query is delivered in a - /// [`KademliaEvent::OutboundQueryProgressed{QueryResult::GetClosestPeers}`]. + /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`]. pub fn get_closest_peers(&mut self, key: K) -> QueryId where K: Into> + Into> + Clone, @@ -692,7 +691,7 @@ where /// Performs a lookup for a record in the DHT. /// /// The result of this operation is delivered in a - /// [`KademliaEvent::OutboundQueryProgressed{QueryResult::GetRecord}`]. + /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`]. pub fn get_record(&mut self, key: record_priv::Key) -> QueryId { let record = if let Some(record) = self.store.get(&key) { if record.is_expired(Instant::now()) { @@ -734,14 +733,13 @@ where let stats = QueryStats::empty(); if let Some(record) = record { - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed { id, result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))), step, stats, - }, - )); + })); } id @@ -753,12 +751,12 @@ where /// Returns `Ok` if a record has been stored locally, providing the /// `QueryId` of the initial query that replicates the record in the DHT. /// The result of the query is eventually reported as a - /// [`KademliaEvent::OutboundQueryProgressed{QueryResult::PutRecord}`]. + /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`]. /// /// The record is always stored locally with the given expiration. If the record's /// expiration is `None`, the common case, it does not expire in local storage /// but is still replicated with the configured record TTL. To remove the record - /// locally and stop it from being re-published in the DHT, see [`Kademlia::remove_record`]. + /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`]. /// /// After the initial publication of the record, it is subject to (re-)replication /// and (re-)publication as per the configured intervals. Periodic (re-)publication @@ -869,13 +867,13 @@ where /// /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is - /// reported via [`KademliaEvent::OutboundQueryProgressed{QueryResult::Bootstrap}`] events, + /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events, /// with one such event per bootstrapping query. /// /// Returns `Err` if bootstrapping is impossible due an empty routing table. /// /// > **Note**: Bootstrapping requires at least one node of the DHT to be known. - /// > See [`Kademlia::add_address`]. + /// > See [`Behaviour::add_address`]. pub fn bootstrap(&mut self) -> Result { let local_key = self.kbuckets.local_key().clone(); let info = QueryInfo::Bootstrap { @@ -904,16 +902,16 @@ where /// The publication of the provider records is periodically repeated as per the /// configured interval, to renew the expiry and account for changes to the DHT /// topology. A provider record may be removed from local storage and - /// thus no longer re-published by calling [`Kademlia::stop_providing`]. + /// thus no longer re-published by calling [`Behaviour::stop_providing`]. /// /// In contrast to the standard Kademlia push-based model for content distribution - /// implemented by [`Kademlia::put_record`], the provider API implements a + /// implemented by [`Behaviour::put_record`], the provider API implements a /// pull-based model that may be used in addition or as an alternative. /// The means by which the actual value is obtained from a provider is out of scope /// of the libp2p Kademlia provider API. /// /// The results of the (repeated) provider announcements sent by this node are - /// reported via [`KademliaEvent::OutboundQueryProgressed{QueryResult::StartProviding}`]. + /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`]. pub fn start_providing(&mut self, key: record_priv::Key) -> Result { // Note: We store our own provider records locally without local addresses // to avoid redundant storage and outdated addresses. Instead these are @@ -950,7 +948,7 @@ where /// Performs a lookup for providers of a value to the given key. /// /// The result of this operation is delivered in a - /// reported via [`KademliaEvent::OutboundQueryProgressed{QueryResult::GetProviders}`]. + /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`]. pub fn get_providers(&mut self, key: record_priv::Key) -> QueryId { let providers: HashSet<_> = self .store @@ -981,8 +979,8 @@ where let stats = QueryStats::empty(); if !providers.is_empty() { - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed { id, result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key, @@ -990,8 +988,7 @@ where })), step, stats, - }, - )); + })); } id } @@ -1040,7 +1037,7 @@ where .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::One(*conn_id), - event: KademliaHandlerIn::ReconfigureMode { + event: HandlerIn::ReconfigureMode { new_mode: self.mode, }, }), @@ -1142,9 +1139,9 @@ where let node_id = p.provider; let multiaddrs = p.addresses; let connection_ty = if connected.contains(&node_id) { - KadConnectionType::Connected + ConnectionType::Connected } else { - KadConnectionType::NotConnected + ConnectionType::NotConnected }; if multiaddrs.is_empty() { // The provider is either the local node and we fill in @@ -1228,7 +1225,7 @@ where if let Some(address) = address { if entry.value().insert(address) { self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::RoutingUpdated { + Event::RoutingUpdated { peer, is_new_peer: false, addresses: entry.value().clone(), @@ -1260,20 +1257,21 @@ where } match (address, self.kbucket_inserts) { (None, _) => { - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::UnroutablePeer { peer }, - )); + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer })); } - (Some(a), KademliaBucketInserts::Manual) => { - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::RoutablePeer { peer, address: a }, - )); + (Some(a), BucketInserts::Manual) => { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer { + peer, + address: a, + })); } - (Some(a), KademliaBucketInserts::OnConnected) => { + (Some(a), BucketInserts::OnConnected) => { let addresses = Addresses::new(a); match entry.insert(addresses.clone(), new_status) { kbucket::InsertResult::Inserted => { - let event = KademliaEvent::RoutingUpdated { + let event = Event::RoutingUpdated { peer, is_new_peer: true, addresses, @@ -1290,19 +1288,19 @@ where debug!("Bucket full. Peer not added to routing table: {}", peer); let address = addresses.first().clone(); self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::RoutablePeer { peer, address }, + Event::RoutablePeer { peer, address }, )); } kbucket::InsertResult::Pending { disconnected } => { let address = addresses.first().clone(); self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::PendingRoutablePeer { peer, address }, + Event::PendingRoutablePeer { peer, address }, )); // `disconnected` might already be in the process of re-connecting. // In other words `disconnected` might have already re-connected but // is not yet confirmed to support the Kademlia protocol via - // [`KademliaHandlerEvent::ProtocolConfirmed`]. + // [`HandlerEvent::ProtocolConfirmed`]. // // Only try dialing peer if not currently connected. if !self.connected_peers.contains(disconnected.preimage()) { @@ -1322,7 +1320,7 @@ where } /// Handles a finished (i.e. successful) query. - fn query_finished(&mut self, q: Query) -> Option { + fn query_finished(&mut self, q: Query) -> Option { let query_id = q.id(); log::trace!("Query {:?} finished.", query_id); let result = q.into_result(); @@ -1387,7 +1385,7 @@ where step.last = true; }; - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::Bootstrap(Ok(BootstrapOk { @@ -1401,7 +1399,7 @@ where QueryInfo::GetClosestPeers { key, mut step } => { step.last = true; - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { @@ -1415,7 +1413,7 @@ where QueryInfo::GetProviders { mut step, .. } => { step.last = true; - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetProviders(Ok( @@ -1456,13 +1454,13 @@ where .. }, } => match context { - AddProviderContext::Publish => Some(KademliaEvent::OutboundQueryProgressed { + AddProviderContext::Publish => Some(Event::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::StartProviding(Ok(AddProviderOk { key })), step: ProgressStep::first_and_last(), }), - AddProviderContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { + AddProviderContext::Republish => Some(Event::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })), @@ -1486,7 +1484,7 @@ where closest_peers: result.peers.collect(), }) }; - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetRecord(results), @@ -1537,14 +1535,14 @@ where }; match context { PutRecordContext::Publish | PutRecordContext::Custom => { - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::PutRecord(mk_result(record.key)), step: ProgressStep::first_and_last(), }) } - PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { + PutRecordContext::Republish => Some(Event::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::RepublishRecord(mk_result(record.key)), @@ -1560,7 +1558,7 @@ where } /// Handles a query that timed out. - fn query_timeout(&mut self, query: Query) -> Option { + fn query_timeout(&mut self, query: Query) -> Option { let query_id = query.id(); log::trace!("Query {:?} timed out.", query_id); let result = query.into_result(); @@ -1589,7 +1587,7 @@ where step.last = true; } - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::Bootstrap(Err(BootstrapError::Timeout { @@ -1601,13 +1599,13 @@ where } QueryInfo::AddProvider { context, key, .. } => Some(match context { - AddProviderContext::Publish => KademliaEvent::OutboundQueryProgressed { + AddProviderContext::Publish => Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })), step: ProgressStep::first_and_last(), }, - AddProviderContext::Republish => KademliaEvent::OutboundQueryProgressed { + AddProviderContext::Republish => Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })), @@ -1618,7 +1616,7 @@ where QueryInfo::GetClosestPeers { key, mut step } => { step.last = true; - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout { @@ -1645,14 +1643,14 @@ where }); match context { PutRecordContext::Publish | PutRecordContext::Custom => { - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::PutRecord(err), step: ProgressStep::first_and_last(), }) } - PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { + PutRecordContext::Republish => Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::RepublishRecord(err), @@ -1674,7 +1672,7 @@ where QueryInfo::GetRecord { key, mut step, .. } => { step.last = true; - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })), @@ -1685,7 +1683,7 @@ where QueryInfo::GetProviders { key, mut step, .. } => { step.last = true; - Some(KademliaEvent::OutboundQueryProgressed { + Some(Event::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { @@ -1703,7 +1701,7 @@ where &mut self, source: PeerId, connection: ConnectionId, - request_id: KademliaRequestId, + request_id: RequestId, mut record: Record, ) { if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) { @@ -1713,7 +1711,7 @@ where self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::PutRecordRes { + event: HandlerIn::PutRecordRes { key: record.key, value: record.value, request_id, @@ -1762,7 +1760,7 @@ where // requirement to send back the value in the response, although this // is a waste of resources. match self.record_filtering { - KademliaStoreInserts::Unfiltered => match self.store.put(record.clone()) { + StoreInserts::Unfiltered => match self.store.put(record.clone()) { Ok(()) => { debug!( "Record stored: {:?}; {} bytes", @@ -1770,7 +1768,7 @@ where record.value.len() ); self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::InboundRequest { + Event::InboundRequest { request: InboundRequest::PutRecord { source, connection, @@ -1784,37 +1782,36 @@ where self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::Reset(request_id), + event: HandlerIn::Reset(request_id), }); return; } }, - KademliaStoreInserts::FilterBoth => { - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::InboundRequest { + StoreInserts::FilterBoth => { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { request: InboundRequest::PutRecord { source, connection, record: Some(record.clone()), }, - }, - )); + })); } } } - // The remote receives a [`KademliaHandlerIn::PutRecordRes`] even in the + // The remote receives a [`HandlerIn::PutRecordRes`] even in the // case where the record is discarded due to being expired. Given that - // the remote sent the local node a [`KademliaHandlerEvent::PutRecord`] + // the remote sent the local node a [`HandlerEvent::PutRecord`] // request, the remote perceives the local node as one node among the k // closest nodes to the target. In addition returning - // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal + // [`HandlerIn::PutRecordRes`] does not reveal any internal // information to a possibly malicious remote node. self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::PutRecordRes { + event: HandlerIn::PutRecordRes { key: record.key, value: record.value, request_id, @@ -1832,26 +1829,24 @@ where addresses: provider.multiaddrs, }; match self.record_filtering { - KademliaStoreInserts::Unfiltered => { + StoreInserts::Unfiltered => { if let Err(e) = self.store.add_provider(record) { info!("Provider record not stored: {:?}", e); return; } - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::InboundRequest { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { request: InboundRequest::AddProvider { record: None }, - }, - )); + })); } - KademliaStoreInserts::FilterBoth => { - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::InboundRequest { + StoreInserts::FilterBoth => { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { request: InboundRequest::AddProvider { record: Some(record), }, - }, - )); + })); } } } @@ -1880,7 +1875,7 @@ where // and is unreachable in the context of another peer pending insertion // into the same bucket. This is handled transparently by the // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending` - // within `Kademlia::poll`. + // within `Behaviour::poll`. debug!( "Last remaining address '{}' of peer '{}' is unreachable.", address, peer_id, @@ -1910,7 +1905,7 @@ where // When a connection is established, we don't know yet whether the // remote supports the configured protocol name. Only once a connection - // handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we + // handler reports [`HandlerEvent::ProtocolConfirmed`] do we // update the local routing table. // Peer's first connection. @@ -2055,12 +2050,12 @@ fn exp_decrease(ttl: Duration, exp: u32) -> Duration { Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0)) } -impl NetworkBehaviour for Kademlia +impl NetworkBehaviour for Behaviour where TStore: RecordStore + Send + 'static, { - type ConnectionHandler = KademliaHandler; - type ToSwarm = KademliaEvent; + type ConnectionHandler = Handler; + type ToSwarm = Event; fn handle_established_inbound_connection( &mut self, @@ -2075,7 +2070,7 @@ where }; self.connections.insert(connection_id, peer); - Ok(KademliaHandler::new( + Ok(Handler::new( self.protocol_config.clone(), self.connection_idle_timeout, connected_point, @@ -2098,7 +2093,7 @@ where }; self.connections.insert(connection_id, peer); - Ok(KademliaHandler::new( + Ok(Handler::new( self.protocol_config.clone(), self.connection_idle_timeout, connected_point, @@ -2149,7 +2144,7 @@ where event: THandlerOutEvent, ) { match event { - KademliaHandlerEvent::ProtocolConfirmed { endpoint } => { + HandlerEvent::ProtocolConfirmed { endpoint } => { debug_assert!(self.connected_peers.contains(&source)); // The remote's address can only be put into the routing table, // and thus shared with other nodes, if the local node is the dialer, @@ -2163,7 +2158,7 @@ where self.connection_updated(source, address, NodeStatus::Connected); } - KademliaHandlerEvent::ProtocolNotSupported { endpoint } => { + HandlerEvent::ProtocolNotSupported { endpoint } => { let address = match endpoint { ConnectedPoint::Dialer { address, .. } => Some(address), ConnectedPoint::Listener { .. } => None, @@ -2171,51 +2166,49 @@ where self.connection_updated(source, address, NodeStatus::Disconnected); } - KademliaHandlerEvent::FindNodeReq { key, request_id } => { + HandlerEvent::FindNodeReq { key, request_id } => { let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::InboundRequest { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { request: InboundRequest::FindNode { num_closer_peers: closer_peers.len(), }, - }, - )); + })); self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::FindNodeRes { + event: HandlerIn::FindNodeRes { closer_peers, request_id, }, }); } - KademliaHandlerEvent::FindNodeRes { + HandlerEvent::FindNodeRes { closer_peers, query_id, } => { self.discovered(&query_id, &source, closer_peers.iter()); } - KademliaHandlerEvent::GetProvidersReq { key, request_id } => { + HandlerEvent::GetProvidersReq { key, request_id } => { let provider_peers = self.provider_peers(&key, &source); let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::InboundRequest { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { request: InboundRequest::GetProvider { num_closer_peers: closer_peers.len(), num_provider_peers: provider_peers.len(), }, - }, - )); + })); self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::GetProvidersRes { + event: HandlerIn::GetProvidersRes { closer_peers, provider_peers, request_id, @@ -2223,7 +2216,7 @@ where }); } - KademliaHandlerEvent::GetProvidersRes { + HandlerEvent::GetProvidersRes { closer_peers, provider_peers, query_id, @@ -2243,7 +2236,7 @@ where let providers = provider_peers.iter().map(|p| p.node_id).collect(); self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id: query_id, result: QueryResult::GetProviders(Ok( GetProvidersOk::FoundProviders { @@ -2260,7 +2253,7 @@ where } } - KademliaHandlerEvent::QueryError { query_id, error } => { + HandlerEvent::QueryError { query_id, error } => { log::debug!( "Request to {:?} in query {:?} failed with {:?}", source, @@ -2274,7 +2267,7 @@ where } } - KademliaHandlerEvent::AddProvider { key, provider } => { + HandlerEvent::AddProvider { key, provider } => { // Only accept a provider record from a legitimate peer. if provider.node_id != source { return; @@ -2283,7 +2276,7 @@ where self.provider_received(key, provider); } - KademliaHandlerEvent::GetRecord { key, request_id } => { + HandlerEvent::GetRecord { key, request_id } => { // Lookup the record locally. let record = match self.store.get(&key) { Some(record) => { @@ -2299,19 +2292,18 @@ where let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::InboundRequest { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { request: InboundRequest::GetRecord { num_closer_peers: closer_peers.len(), present_locally: record.is_some(), }, - }, - )); + })); self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::GetRecordRes { + event: HandlerIn::GetRecordRes { record, closer_peers, request_id, @@ -2319,7 +2311,7 @@ where }); } - KademliaHandlerEvent::GetRecordRes { + HandlerEvent::GetRecordRes { record, closer_peers, query_id, @@ -2341,7 +2333,7 @@ where }; self.queued_events.push_back(ToSwarm::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id: query_id, result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord( record, @@ -2354,7 +2346,7 @@ where *step = step.next(); } else { log::trace!("Record with key {:?} not found at {}", key, source); - if let KademliaCaching::Enabled { max_peers } = self.caching { + if let Caching::Enabled { max_peers } = self.caching { let source_key = kbucket::Key::from(source); let target_key = kbucket::Key::from(key.clone()); let distance = source_key.distance(&target_key); @@ -2374,11 +2366,11 @@ where self.discovered(&query_id, &source, closer_peers.iter()); } - KademliaHandlerEvent::PutRecord { record, request_id } => { + HandlerEvent::PutRecord { record, request_id } => { self.record_received(source, connection, request_id, record); } - KademliaHandlerEvent::PutRecordRes { query_id, .. } => { + HandlerEvent::PutRecordRes { query_id, .. } => { if let Some(query) = self.queries.get_mut(&query_id) { query.on_success(&source, vec![]); if let QueryInfo::PutRecord { @@ -2462,7 +2454,7 @@ where // Drain applied pending entries from the routing table. if let Some(entry) = self.kbuckets.take_applied_pending() { let kbucket::Node { key, value } = entry.inserted; - let event = KademliaEvent::RoutingUpdated { + let event = Event::RoutingUpdated { bucket_range: self .kbuckets .bucket(&key) @@ -2605,7 +2597,7 @@ pub struct PeerRecord { /// See [`NetworkBehaviour::poll`]. #[derive(Debug, Clone)] #[allow(clippy::large_enum_variant)] -pub enum KademliaEvent { +pub enum Event { /// An inbound request has been received and handled. // // Note on the difference between 'request' and 'query': A request is a @@ -2646,19 +2638,19 @@ pub enum KademliaEvent { /// A peer has connected for whom no listen address is known. /// /// If the peer is to be added to the routing table, a known - /// listen address for the peer must be provided via [`Kademlia::add_address`]. + /// listen address for the peer must be provided via [`Behaviour::add_address`]. UnroutablePeer { peer: PeerId }, /// A connection to a peer has been established for whom a listen address /// is known but the peer has not been added to the routing table either - /// because [`KademliaBucketInserts::Manual`] is configured or because + /// because [`BucketInserts::Manual`] is configured or because /// the corresponding bucket is full. /// /// If the peer is to be included in the routing table, it must - /// must be explicitly added via [`Kademlia::add_address`], possibly after + /// must be explicitly added via [`Behaviour::add_address`], possibly after /// removing another peer. /// - /// See [`Kademlia::kbucket`] for insight into the contents of + /// See [`Behaviour::kbucket`] for insight into the contents of /// the k-bucket of `peer`. RoutablePeer { peer: PeerId, address: Multiaddr }, @@ -2668,10 +2660,10 @@ pub enum KademliaEvent { /// may not make it into the routing table. /// /// If the peer is to be unconditionally included in the routing table, - /// it should be explicitly added via [`Kademlia::add_address`] after + /// it should be explicitly added via [`Behaviour::add_address`] after /// removing another peer. /// - /// See [`Kademlia::kbucket`] for insight into the contents of + /// See [`Behaviour::kbucket`] for insight into the contents of /// the k-bucket of `peer`. PendingRoutablePeer { peer: PeerId, address: Multiaddr }, } @@ -2719,10 +2711,10 @@ pub enum InboundRequest { num_provider_peers: usize, }, /// A peer sent an add provider request. - /// If filtering [`KademliaStoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is + /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is /// included. /// - /// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`] for details.. + /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.. AddProvider { record: Option }, /// Request to retrieve a record. GetRecord { @@ -2730,9 +2722,9 @@ pub enum InboundRequest { present_locally: bool, }, /// A peer sent a put record request. - /// If filtering [`KademliaStoreInserts::FilterBoth`] is enabled, the [`Record`] is included. + /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included. /// - /// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`]. + /// See [`StoreInserts`] and [`Config::set_record_filtering`]. PutRecord { source: PeerId, connection: ConnectionId, @@ -2743,35 +2735,35 @@ pub enum InboundRequest { /// The results of Kademlia queries. #[derive(Debug, Clone)] pub enum QueryResult { - /// The result of [`Kademlia::bootstrap`]. + /// The result of [`Behaviour::bootstrap`]. Bootstrap(BootstrapResult), - /// The result of [`Kademlia::get_closest_peers`]. + /// The result of [`Behaviour::get_closest_peers`]. GetClosestPeers(GetClosestPeersResult), - /// The result of [`Kademlia::get_providers`]. + /// The result of [`Behaviour::get_providers`]. GetProviders(GetProvidersResult), - /// The result of [`Kademlia::start_providing`]. + /// The result of [`Behaviour::start_providing`]. StartProviding(AddProviderResult), /// The result of a (automatic) republishing of a provider record. RepublishProvider(AddProviderResult), - /// The result of [`Kademlia::get_record`]. + /// The result of [`Behaviour::get_record`]. GetRecord(GetRecordResult), - /// The result of [`Kademlia::put_record`]. + /// The result of [`Behaviour::put_record`]. PutRecord(PutRecordResult), /// The result of a (automatic) republishing of a (value-)record. RepublishRecord(PutRecordResult), } -/// The result of [`Kademlia::get_record`]. +/// The result of [`Behaviour::get_record`]. pub type GetRecordResult = Result; -/// The successful result of [`Kademlia::get_record`]. +/// The successful result of [`Behaviour::get_record`]. #[derive(Debug, Clone)] pub enum GetRecordOk { FoundRecord(PeerRecord), @@ -2780,16 +2772,16 @@ pub enum GetRecordOk { /// _to the record key_ (not the local node) that were queried but /// did not return the record, sorted by distance to the record key /// from closest to farthest. How many of these are tracked is configured - /// by [`KademliaConfig::set_caching`]. + /// by [`Config::set_caching`]. /// /// Writing back the cache at these peers is a manual operation. - /// ie. you may wish to use these candidates with [`Kademlia::put_record_to`] + /// ie. you may wish to use these candidates with [`Behaviour::put_record_to`] /// after selecting one of the returned records. cache_candidates: BTreeMap, }, } -/// The error result of [`Kademlia::get_record`]. +/// The error result of [`Behaviour::get_record`]. #[derive(Debug, Clone, Error)] pub enum GetRecordError { #[error("the record was not found")] @@ -2828,16 +2820,16 @@ impl GetRecordError { } } -/// The result of [`Kademlia::put_record`]. +/// The result of [`Behaviour::put_record`]. pub type PutRecordResult = Result; -/// The successful result of [`Kademlia::put_record`]. +/// The successful result of [`Behaviour::put_record`]. #[derive(Debug, Clone)] pub struct PutRecordOk { pub key: record_priv::Key, } -/// The error result of [`Kademlia::put_record`]. +/// The error result of [`Behaviour::put_record`]. #[derive(Debug, Clone, Error)] pub enum PutRecordError { #[error("the quorum failed; needed {quorum} peers")] @@ -2875,17 +2867,17 @@ impl PutRecordError { } } -/// The result of [`Kademlia::bootstrap`]. +/// The result of [`Behaviour::bootstrap`]. pub type BootstrapResult = Result; -/// The successful result of [`Kademlia::bootstrap`]. +/// The successful result of [`Behaviour::bootstrap`]. #[derive(Debug, Clone)] pub struct BootstrapOk { pub peer: PeerId, pub num_remaining: u32, } -/// The error result of [`Kademlia::bootstrap`]. +/// The error result of [`Behaviour::bootstrap`]. #[derive(Debug, Clone, Error)] pub enum BootstrapError { #[error("the request timed out")] @@ -2895,17 +2887,17 @@ pub enum BootstrapError { }, } -/// The result of [`Kademlia::get_closest_peers`]. +/// The result of [`Behaviour::get_closest_peers`]. pub type GetClosestPeersResult = Result; -/// The successful result of [`Kademlia::get_closest_peers`]. +/// The successful result of [`Behaviour::get_closest_peers`]. #[derive(Debug, Clone)] pub struct GetClosestPeersOk { pub key: Vec, pub peers: Vec, } -/// The error result of [`Kademlia::get_closest_peers`]. +/// The error result of [`Behaviour::get_closest_peers`]. #[derive(Debug, Clone, Error)] pub enum GetClosestPeersError { #[error("the request timed out")] @@ -2929,10 +2921,10 @@ impl GetClosestPeersError { } } -/// The result of [`Kademlia::get_providers`]. +/// The result of [`Behaviour::get_providers`]. pub type GetProvidersResult = Result; -/// The successful result of [`Kademlia::get_providers`]. +/// The successful result of [`Behaviour::get_providers`]. #[derive(Debug, Clone)] pub enum GetProvidersOk { FoundProviders { @@ -2945,7 +2937,7 @@ pub enum GetProvidersOk { }, } -/// The error result of [`Kademlia::get_providers`]. +/// The error result of [`Behaviour::get_providers`]. #[derive(Debug, Clone, Error)] pub enum GetProvidersError { #[error("the request timed out")] @@ -3010,8 +3002,8 @@ impl From, Addresses>> for KadPeer { node_id: e.node.key.into_preimage(), multiaddrs: e.node.value.into_vec(), connection_ty: match e.status { - NodeStatus::Connected => KadConnectionType::Connected, - NodeStatus::Disconnected => KadConnectionType::NotConnected, + NodeStatus::Connected => ConnectionType::Connected, + NodeStatus::Disconnected => ConnectionType::NotConnected, }, } } @@ -3029,7 +3021,7 @@ struct QueryInner { /// /// A request is pending if the targeted peer is not currently connected /// and these requests are sent as soon as a connection to the peer is established. - pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn); K_VALUE.get()]>, + pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>, } impl QueryInner { @@ -3045,33 +3037,33 @@ impl QueryInner { /// The context of a [`QueryInfo::AddProvider`] query. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum AddProviderContext { - /// The context is a [`Kademlia::start_providing`] operation. + /// The context is a [`Behaviour::start_providing`] operation. Publish, /// The context is periodic republishing of provider announcements - /// initiated earlier via [`Kademlia::start_providing`]. + /// initiated earlier via [`Behaviour::start_providing`]. Republish, } /// The context of a [`QueryInfo::PutRecord`] query. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum PutRecordContext { - /// The context is a [`Kademlia::put_record`] operation. + /// The context is a [`Behaviour::put_record`] operation. Publish, /// The context is periodic republishing of records stored - /// earlier via [`Kademlia::put_record`]. + /// earlier via [`Behaviour::put_record`]. Republish, /// The context is periodic replication (i.e. without extending /// the record TTL) of stored records received earlier from another peer. Replicate, /// The context is a custom store operation targeting specific - /// peers initiated by [`Kademlia::put_record_to`]. + /// peers initiated by [`Behaviour::put_record_to`]. Custom, } /// Information about a running query. #[derive(Debug, Clone)] pub enum QueryInfo { - /// A query initiated by [`Kademlia::bootstrap`]. + /// A query initiated by [`Behaviour::bootstrap`]. Bootstrap { /// The targeted peer ID. peer: PeerId, @@ -3085,7 +3077,7 @@ pub enum QueryInfo { step: ProgressStep, }, - /// A (repeated) query initiated by [`Kademlia::get_closest_peers`]. + /// A (repeated) query initiated by [`Behaviour::get_closest_peers`]. GetClosestPeers { /// The key being queried (the preimage). key: Vec, @@ -3093,7 +3085,7 @@ pub enum QueryInfo { step: ProgressStep, }, - /// A (repeated) query initiated by [`Kademlia::get_providers`]. + /// A (repeated) query initiated by [`Behaviour::get_providers`]. GetProviders { /// The key for which to search for providers. key: record_priv::Key, @@ -3103,7 +3095,7 @@ pub enum QueryInfo { step: ProgressStep, }, - /// A (repeated) query initiated by [`Kademlia::start_providing`]. + /// A (repeated) query initiated by [`Behaviour::start_providing`]. AddProvider { /// The record key. key: record_priv::Key, @@ -3113,7 +3105,7 @@ pub enum QueryInfo { context: AddProviderContext, }, - /// A (repeated) query initiated by [`Kademlia::put_record`]. + /// A (repeated) query initiated by [`Behaviour::put_record`]. PutRecord { record: Record, /// The expected quorum of responses w.r.t. the replication factor. @@ -3124,7 +3116,7 @@ pub enum QueryInfo { context: PutRecordContext, }, - /// A (repeated) query initiated by [`Kademlia::get_record`]. + /// A (repeated) query initiated by [`Behaviour::get_record`]. GetRecord { /// The key to look for. key: record_priv::Key, @@ -3141,22 +3133,22 @@ pub enum QueryInfo { impl QueryInfo { /// Creates an event for a handler to issue an outgoing request in the /// context of a query. - fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn { + fn to_request(&self, query_id: QueryId) -> HandlerIn { match &self { - QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq { + QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq { key: peer.to_bytes(), query_id, }, - QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq { + QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq { key: key.clone(), query_id, }, - QueryInfo::GetProviders { key, .. } => KademliaHandlerIn::GetProvidersReq { + QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq { key: key.clone(), query_id, }, QueryInfo::AddProvider { key, phase, .. } => match phase { - AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq { + AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq { key: key.to_vec(), query_id, }, @@ -3164,25 +3156,25 @@ impl QueryInfo { provider_id, external_addresses, .. - } => KademliaHandlerIn::AddProvider { + } => HandlerIn::AddProvider { key: key.clone(), provider: crate::protocol::KadPeer { node_id: *provider_id, multiaddrs: external_addresses.clone(), - connection_ty: crate::protocol::KadConnectionType::Connected, + connection_ty: crate::protocol::ConnectionType::Connected, }, }, }, - QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord { + QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord { key: key.clone(), query_id, }, QueryInfo::PutRecord { record, phase, .. } => match phase { - PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq { + PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq { key: record.key.to_vec(), query_id, }, - PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord { + PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord { record: record.clone(), query_id, }, @@ -3290,7 +3282,7 @@ impl fmt::Display for NoKnownPeers { impl std::error::Error for NoKnownPeers {} -/// The possible outcomes of [`Kademlia::add_address`]. +/// The possible outcomes of [`Behaviour::add_address`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RoutingUpdate { /// The given peer and address has been added to the routing @@ -3299,7 +3291,7 @@ pub enum RoutingUpdate { /// The peer and address is pending insertion into /// the routing table, if a disconnected peer fails /// to respond. If the given peer and address ends up - /// in the routing table, [`KademliaEvent::RoutingUpdated`] + /// in the routing table, [`Event::RoutingUpdated`] /// is eventually emitted. Pending, /// The routing table update failed, either because the diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index cd4337e9094..f85208ee817 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -48,13 +48,13 @@ use std::{ u64, }; -type TestSwarm = Swarm>; +type TestSwarm = Swarm>; fn build_node() -> (Multiaddr, TestSwarm) { build_node_with_config(Default::default()) } -fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { +fn build_node_with_config(cfg: Config) -> (Multiaddr, TestSwarm) { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = MemoryTransport::default() @@ -65,7 +65,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let local_id = local_public_key.to_peer_id(); let store = MemoryStore::new(local_id); - let behaviour = Kademlia::with_config(local_id, store, cfg); + let behaviour = Behaviour::with_config(local_id, store, cfg); let mut swarm = SwarmBuilder::without_executor(transport, behaviour, local_id).build(); @@ -82,7 +82,7 @@ fn build_nodes(num: usize) -> Vec<(Multiaddr, TestSwarm)> { } /// Builds swarms, each listening on a port. Does *not* connect the nodes together. -fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Multiaddr, TestSwarm)> { +fn build_nodes_with_config(num: usize, cfg: Config) -> Vec<(Multiaddr, TestSwarm)> { (0..num) .map(|_| build_node_with_config(cfg.clone())) .collect() @@ -95,7 +95,7 @@ fn build_connected_nodes(total: usize, step: usize) -> Vec<(Multiaddr, TestSwarm fn build_connected_nodes_with_config( total: usize, step: usize, - cfg: KademliaConfig, + cfg: Config, ) -> Vec<(Multiaddr, TestSwarm)> { let mut swarms = build_nodes_with_config(total, cfg); let swarm_ids: Vec<_> = swarms @@ -121,7 +121,7 @@ fn build_connected_nodes_with_config( fn build_fully_connected_nodes_with_config( total: usize, - cfg: KademliaConfig, + cfg: Config, ) -> Vec<(Multiaddr, TestSwarm)> { let mut swarms = build_nodes_with_config(total, cfg); let swarm_addr_and_peer_id: Vec<_> = swarms @@ -166,7 +166,7 @@ fn bootstrap() { // or smaller than K_VALUE. let num_group = rng.gen_range(1..(num_total % K_VALUE.get()) + 2); - let mut cfg = KademliaConfig::default(); + let mut cfg = Config::default(); if rng.gen() { cfg.disjoint_query_paths(true); } @@ -190,7 +190,7 @@ fn bootstrap() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id, result: QueryResult::Bootstrap(Ok(ok)), .. @@ -280,7 +280,7 @@ fn query_iter() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id, result: QueryResult::GetClosestPeers(Ok(ok)), .. @@ -338,12 +338,10 @@ fn unresponsive_not_returned_direct() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { - result: QueryResult::GetClosestPeers(Ok(ok)), - .. - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed { + result: QueryResult::GetClosestPeers(Ok(ok)), + .. + }))) => { assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(ok.peers.len(), 0); return Poll::Ready(()); @@ -398,12 +396,10 @@ fn unresponsive_not_returned_indirect() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { - result: QueryResult::GetClosestPeers(Ok(ok)), - .. - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed { + result: QueryResult::GetClosestPeers(Ok(ok)), + .. + }))) => { assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(ok.peers.len(), 1); assert_eq!(ok.peers[0], first_peer_id); @@ -453,13 +449,11 @@ fn get_record_not_found() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::GetRecord(Err(e)), - .. - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Err(e)), + .. + }))) => { assert_eq!(id, qid); if let GetRecordError::NotFound { key, closest_peers } = e { assert_eq!(key, target_key); @@ -495,14 +489,14 @@ fn put_record() { // At least 4 nodes, 1 under test + 3 bootnodes. let num_total = usize::max(4, replication_factor.get() * 2); - let mut config = KademliaConfig::default(); + let mut config = Config::default(); config.set_replication_factor(replication_factor); if rng.gen() { config.disjoint_query_paths(true); } if filter_records { - config.set_record_filtering(KademliaStoreInserts::FilterBoth); + config.set_record_filtering(StoreInserts::FilterBoth); } let mut swarms = { @@ -574,7 +568,7 @@ fn put_record() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id, result: QueryResult::PutRecord(res), stats, @@ -582,7 +576,7 @@ fn put_record() { }, ))) | Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id, result: QueryResult::RepublishRecord(res), stats, @@ -605,16 +599,14 @@ fn put_record() { } } } - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::InboundRequest { - request: InboundRequest::PutRecord { record, .. }, - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::InboundRequest { + request: InboundRequest::PutRecord { record, .. }, + }))) => { if !drop_records { if let Some(record) = record { assert_eq!( swarm.behaviour().record_filtering, - KademliaStoreInserts::FilterBoth + StoreInserts::FilterBoth ); // Accept the record swarm @@ -625,7 +617,7 @@ fn put_record() { } else { assert_eq!( swarm.behaviour().record_filtering, - KademliaStoreInserts::Unfiltered + StoreInserts::Unfiltered ); } } @@ -684,7 +676,7 @@ fn put_record() { }) .collect::>(); - if swarms[0].behaviour().record_filtering != KademliaStoreInserts::Unfiltered + if swarms[0].behaviour().record_filtering != StoreInserts::Unfiltered && drop_records { assert_eq!(actual.len(), 0); @@ -765,14 +757,12 @@ fn get_record() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::GetRecord(Ok(r)), - step: ProgressStep { count, last }, - .. - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Ok(r)), + step: ProgressStep { count, last }, + .. + }))) => { assert_eq!(id, qid); if usize::from(count) == 1 { assert!(!last); @@ -829,14 +819,12 @@ fn get_record_many() { swarm.behaviour_mut().query_mut(&qid).unwrap().finish(); } match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::GetRecord(Ok(r)), - step: ProgressStep { count: _, last }, - .. - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Ok(r)), + step: ProgressStep { count: _, last }, + .. + }))) => { assert_eq!(id, qid); if let GetRecordOk::FoundRecord(r) = r { assert_eq!(r.record, record); @@ -870,7 +858,7 @@ fn add_provider() { // At least 4 nodes, 1 under test + 3 bootnodes. let num_total = usize::max(4, replication_factor.get() * 2); - let mut config = KademliaConfig::default(); + let mut config = Config::default(); config.set_replication_factor(replication_factor); if rng.gen() { config.disjoint_query_paths(true); @@ -924,14 +912,14 @@ fn add_provider() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id, result: QueryResult::StartProviding(res), .. }, ))) | Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id, result: QueryResult::RepublishProvider(res), .. @@ -1062,7 +1050,7 @@ fn exceed_jobs_max_queries() { loop { if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { match e { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { + SwarmEvent::Behaviour(Event::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(Ok(r)), .. }) => break assert!(r.peers.is_empty()), @@ -1085,14 +1073,14 @@ fn exp_decr_expiration_overflow() { } // Right shifting a u64 by >63 results in a panic. - prop_no_panic(KademliaConfig::default().record_ttl.unwrap(), 64); + prop_no_panic(Config::default().record_ttl.unwrap(), 64); quickcheck(prop_no_panic as fn(_, _)) } #[test] fn disjoint_query_does_not_finish_before_all_paths_did() { - let mut config = KademliaConfig::default(); + let mut config = Config::default(); config.disjoint_query_paths(true); // I.e. setting the amount disjoint paths to be explored to 2. config.set_parallelism(NonZeroUsize::new(2).unwrap()); @@ -1140,13 +1128,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { - result: QueryResult::GetRecord(result), - step, - .. - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed { + result: QueryResult::GetRecord(result), + step, + .. + }))) => { if i != 0 { panic!("Expected `QueryResult` from Alice.") } @@ -1197,13 +1183,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { - result: QueryResult::GetRecord(result), - step, - .. - }, - ))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::OutboundQueryProgressed { + result: QueryResult::GetRecord(result), + step, + .. + }))) => { if i != 0 { panic!("Expected `QueryResult` from Alice.") } @@ -1241,11 +1225,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { } /// Tests that peers are not automatically inserted into -/// the routing table with `KademliaBucketInserts::Manual`. +/// the routing table with `BucketInserts::Manual`. #[test] fn manual_bucket_inserts() { - let mut cfg = KademliaConfig::default(); - cfg.set_kbucket_inserts(KademliaBucketInserts::Manual); + let mut cfg = Config::default(); + cfg.set_kbucket_inserts(BucketInserts::Manual); // 1 -> 2 -> [3 -> ...] let mut swarms = build_connected_nodes_with_config(3, 1, cfg); // The peers and their addresses for which we expect `RoutablePeer` events. @@ -1271,7 +1255,7 @@ fn manual_bucket_inserts() { for (_, swarm) in swarms.iter_mut() { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::RoutablePeer { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::RoutablePeer { peer, address, }))) => { @@ -1303,7 +1287,7 @@ fn network_behaviour_on_address_change() { let old_address: Multiaddr = Protocol::Memory(1).into(); let new_address: Multiaddr = Protocol::Memory(2).into(); - let mut kademlia = Kademlia::new(local_peer_id, MemoryStore::new(local_peer_id)); + let mut kademlia = Behaviour::new(local_peer_id, MemoryStore::new(local_peer_id)); let endpoint = ConnectedPoint::Dialer { address: old_address.clone(), @@ -1337,7 +1321,7 @@ fn network_behaviour_on_address_change() { kademlia.on_connection_handler_event( remote_peer_id, connection_id, - KademliaHandlerEvent::ProtocolConfirmed { endpoint }, + HandlerEvent::ProtocolConfirmed { endpoint }, ); assert_eq!( @@ -1389,7 +1373,7 @@ fn get_providers_single() { block_on(async { match single_swarm.next().await.unwrap() { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { + SwarmEvent::Behaviour(Event::OutboundQueryProgressed { result: QueryResult::StartProviding(Ok(_)), .. }) => {} @@ -1403,7 +1387,7 @@ fn get_providers_single() { block_on(async { loop { match single_swarm.next().await.unwrap() { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { + SwarmEvent::Behaviour(Event::OutboundQueryProgressed { id, result: QueryResult::GetProviders(Ok(ok)), step: index, @@ -1469,7 +1453,7 @@ fn get_providers_limit() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryProgressed { + Event::OutboundQueryProgressed { id, result: QueryResult::GetProviders(Ok(ok)), step: index, diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index d695420ec2b..0df4da6bdc7 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -20,8 +20,7 @@ use crate::behaviour::Mode; use crate::protocol::{ - KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, - KademliaProtocolConfig, + KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, ProtocolConfig, }; use crate::record_priv::{self, Record}; use crate::QueryId; @@ -54,9 +53,9 @@ const MAX_NUM_SUBSTREAMS: usize = 32; /// make. /// /// It also handles requests made by the remote. -pub struct KademliaHandler { +pub struct Handler { /// Configuration of the wire protocol. - protocol_config: KademliaProtocolConfig, + protocol_config: ProtocolConfig, /// In client mode, we don't accept inbound substreams. mode: Mode, @@ -126,7 +125,7 @@ enum OutboundSubstreamState { // TODO: add timeout WaitingAnswer(KadOutStreamSink, QueryId), /// An error happened on the substream and we should report the error to the user. - ReportError(KademliaHandlerQueryErr, QueryId), + ReportError(HandlerQueryErr, QueryId), /// The substream is being closed. Closing(KadOutStreamSink), /// The substream is complete and will not perform any more work. @@ -143,7 +142,7 @@ enum InboundSubstreamState { connection_id: UniqueConnecId, substream: KadInStreamSink, }, - /// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response. + /// Waiting for the behaviour to send a [`HandlerIn`] event containing the response. WaitingBehaviour(UniqueConnecId, KadInStreamSink, Option), /// Waiting to send an answer back to the remote. PendingSend(UniqueConnecId, KadInStreamSink, KadResponseMsg), @@ -162,7 +161,7 @@ enum InboundSubstreamState { impl InboundSubstreamState { fn try_answer_with( &mut self, - id: KademliaRequestId, + id: RequestId, msg: KadResponseMsg, ) -> Result<(), KadResponseMsg> { match std::mem::replace( @@ -214,7 +213,7 @@ impl InboundSubstreamState { /// Event produced by the Kademlia handler. #[derive(Debug)] -pub enum KademliaHandlerEvent { +pub enum HandlerEvent { /// The configured protocol name has been confirmed by the peer through /// a successfully negotiated substream or by learning the supported protocols of the remote. ProtocolConfirmed { endpoint: ConnectedPoint }, @@ -228,10 +227,10 @@ pub enum KademliaHandlerEvent { /// The key for which to locate the closest nodes. key: Vec, /// Identifier of the request. Needs to be passed back when answering. - request_id: KademliaRequestId, + request_id: RequestId, }, - /// Response to an `KademliaHandlerIn::FindNodeReq`. + /// Response to an `HandlerIn::FindNodeReq`. FindNodeRes { /// Results of the request. closer_peers: Vec, @@ -245,10 +244,10 @@ pub enum KademliaHandlerEvent { /// The key for which providers are requested. key: record_priv::Key, /// Identifier of the request. Needs to be passed back when answering. - request_id: KademliaRequestId, + request_id: RequestId, }, - /// Response to an `KademliaHandlerIn::GetProvidersReq`. + /// Response to an `HandlerIn::GetProvidersReq`. GetProvidersRes { /// Nodes closest to the key. closer_peers: Vec, @@ -261,7 +260,7 @@ pub enum KademliaHandlerEvent { /// An error happened when performing a query. QueryError { /// The error that happened. - error: KademliaHandlerQueryErr, + error: HandlerQueryErr, /// The user data passed to the query. query_id: QueryId, }, @@ -279,10 +278,10 @@ pub enum KademliaHandlerEvent { /// Key for which we should look in the dht key: record_priv::Key, /// Identifier of the request. Needs to be passed back when answering. - request_id: KademliaRequestId, + request_id: RequestId, }, - /// Response to a `KademliaHandlerIn::GetRecord`. + /// Response to a `HandlerIn::GetRecord`. GetRecordRes { /// The result is present if the key has been found record: Option, @@ -296,7 +295,7 @@ pub enum KademliaHandlerEvent { PutRecord { record: Record, /// Identifier of the request. Needs to be passed back when answering. - request_id: KademliaRequestId, + request_id: RequestId, }, /// Response to a request to store a record. @@ -312,7 +311,7 @@ pub enum KademliaHandlerEvent { /// Error that can happen when requesting an RPC query. #[derive(Debug)] -pub enum KademliaHandlerQueryErr { +pub enum HandlerQueryErr { /// Error while trying to perform the query. Upgrade(StreamUpgradeError), /// Received an answer that doesn't correspond to the request. @@ -321,44 +320,44 @@ pub enum KademliaHandlerQueryErr { Io(io::Error), } -impl fmt::Display for KademliaHandlerQueryErr { +impl fmt::Display for HandlerQueryErr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - KademliaHandlerQueryErr::Upgrade(err) => { + HandlerQueryErr::Upgrade(err) => { write!(f, "Error while performing Kademlia query: {err}") } - KademliaHandlerQueryErr::UnexpectedMessage => { + HandlerQueryErr::UnexpectedMessage => { write!( f, "Remote answered our Kademlia RPC query with the wrong message type" ) } - KademliaHandlerQueryErr::Io(err) => { + HandlerQueryErr::Io(err) => { write!(f, "I/O error during a Kademlia RPC query: {err}") } } } } -impl error::Error for KademliaHandlerQueryErr { +impl error::Error for HandlerQueryErr { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { - KademliaHandlerQueryErr::Upgrade(err) => Some(err), - KademliaHandlerQueryErr::UnexpectedMessage => None, - KademliaHandlerQueryErr::Io(err) => Some(err), + HandlerQueryErr::Upgrade(err) => Some(err), + HandlerQueryErr::UnexpectedMessage => None, + HandlerQueryErr::Io(err) => Some(err), } } } -impl From> for KademliaHandlerQueryErr { +impl From> for HandlerQueryErr { fn from(err: StreamUpgradeError) -> Self { - KademliaHandlerQueryErr::Upgrade(err) + HandlerQueryErr::Upgrade(err) } } /// Event to send to the handler. #[derive(Debug)] -pub enum KademliaHandlerIn { +pub enum HandlerIn { /// Resets the (sub)stream associated with the given request ID, /// thus signaling an error to the remote. /// @@ -366,7 +365,7 @@ pub enum KademliaHandlerIn { /// can be used as an alternative to letting requests simply time /// out on the remote peer, thus potentially avoiding some delay /// for the query on the remote. - Reset(KademliaRequestId), + Reset(RequestId), /// Change the connection to the specified mode. ReconfigureMode { new_mode: Mode }, @@ -387,7 +386,7 @@ pub enum KademliaHandlerIn { /// Identifier of the request that was made by the remote. /// /// It is a logic error to use an id of the handler of a different node. - request_id: KademliaRequestId, + request_id: RequestId, }, /// Same as `FindNodeReq`, but should also return the entries of the local providers list for @@ -408,7 +407,7 @@ pub enum KademliaHandlerIn { /// Identifier of the request that was made by the remote. /// /// It is a logic error to use an id of the handler of a different node. - request_id: KademliaRequestId, + request_id: RequestId, }, /// Indicates that this provider is known for this key. @@ -437,7 +436,7 @@ pub enum KademliaHandlerIn { /// Nodes that are closer to the key we were searching for. closer_peers: Vec, /// Identifier of the request that was made by the remote. - request_id: KademliaRequestId, + request_id: RequestId, }, /// Put a value into the dht records. @@ -454,14 +453,14 @@ pub enum KademliaHandlerIn { /// Value that was put. value: Vec, /// Identifier of the request that was made by the remote. - request_id: KademliaRequestId, + request_id: RequestId, }, } /// Unique identifier for a request. Must be passed back in order to answer a request from /// the remote. #[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub struct KademliaRequestId { +pub struct RequestId { /// Unique identifier for an incoming connection. connec_unique_id: UniqueConnecId, } @@ -470,9 +469,9 @@ pub struct KademliaRequestId { #[derive(Debug, Copy, Clone, PartialEq, Eq)] struct UniqueConnecId(u64); -impl KademliaHandler { +impl Handler { pub fn new( - protocol_config: KademliaProtocolConfig, + protocol_config: ProtocolConfig, idle_timeout: Duration, endpoint: ConnectedPoint, remote_peer_id: PeerId, @@ -494,7 +493,7 @@ impl KademliaHandler { let keep_alive = KeepAlive::Until(Instant::now() + idle_timeout); - KademliaHandler { + Handler { protocol_config, mode, idle_timeout, @@ -612,12 +611,12 @@ impl KademliaHandler { } } -impl ConnectionHandler for KademliaHandler { - type FromBehaviour = KademliaHandlerIn; - type ToBehaviour = KademliaHandlerEvent; +impl ConnectionHandler for Handler { + type FromBehaviour = HandlerIn; + type ToBehaviour = HandlerEvent; type Error = io::Error; // TODO: better error type? - type InboundProtocol = Either; - type OutboundProtocol = KademliaProtocolConfig; + type InboundProtocol = Either; + type OutboundProtocol = ProtocolConfig; type OutboundOpenInfo = (); type InboundOpenInfo = (); @@ -628,9 +627,9 @@ impl ConnectionHandler for KademliaHandler { } } - fn on_behaviour_event(&mut self, message: KademliaHandlerIn) { + fn on_behaviour_event(&mut self, message: HandlerIn) { match message { - KademliaHandlerIn::Reset(request_id) => { + HandlerIn::Reset(request_id) => { if let Some(state) = self .inbound_substreams .iter_mut() @@ -644,19 +643,19 @@ impl ConnectionHandler for KademliaHandler { state.close(); } } - KademliaHandlerIn::FindNodeReq { key, query_id } => { + HandlerIn::FindNodeReq { key, query_id } => { let msg = KadRequestMsg::FindNode { key }; self.pending_messages.push_back((msg, Some(query_id))); } - KademliaHandlerIn::FindNodeRes { + HandlerIn::FindNodeRes { closer_peers, request_id, } => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }), - KademliaHandlerIn::GetProvidersReq { key, query_id } => { + HandlerIn::GetProvidersReq { key, query_id } => { let msg = KadRequestMsg::GetProviders { key }; self.pending_messages.push_back((msg, Some(query_id))); } - KademliaHandlerIn::GetProvidersRes { + HandlerIn::GetProvidersRes { closer_peers, provider_peers, request_id, @@ -667,19 +666,19 @@ impl ConnectionHandler for KademliaHandler { provider_peers, }, ), - KademliaHandlerIn::AddProvider { key, provider } => { + HandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; self.pending_messages.push_back((msg, None)); } - KademliaHandlerIn::GetRecord { key, query_id } => { + HandlerIn::GetRecord { key, query_id } => { let msg = KadRequestMsg::GetValue { key }; self.pending_messages.push_back((msg, Some(query_id))); } - KademliaHandlerIn::PutRecord { record, query_id } => { + HandlerIn::PutRecord { record, query_id } => { let msg = KadRequestMsg::PutValue { record }; self.pending_messages.push_back((msg, Some(query_id))); } - KademliaHandlerIn::GetRecordRes { + HandlerIn::GetRecordRes { record, closer_peers, request_id, @@ -692,14 +691,14 @@ impl ConnectionHandler for KademliaHandler { }, ); } - KademliaHandlerIn::PutRecordRes { + HandlerIn::PutRecordRes { key, request_id, value, } => { self.answer_pending_request(request_id, KadResponseMsg::PutValue { key, value }); } - KademliaHandlerIn::ReconfigureMode { new_mode } => { + HandlerIn::ReconfigureMode { new_mode } => { let peer = self.remote_peer_id; match &self.endpoint { @@ -736,7 +735,7 @@ impl ConnectionHandler for KademliaHandler { if let ProtocolStatus::Confirmed = self.protocol_status { self.protocol_status = ProtocolStatus::Reported; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::ProtocolConfirmed { + HandlerEvent::ProtocolConfirmed { endpoint: self.endpoint.clone(), }, )); @@ -833,8 +832,8 @@ impl ConnectionHandler for KademliaHandler { } } -impl KademliaHandler { - fn answer_pending_request(&mut self, request_id: KademliaRequestId, mut msg: KadResponseMsg) { +impl Handler { + fn answer_pending_request(&mut self, request_id: RequestId, mut msg: KadResponseMsg) { for state in self.inbound_substreams.iter_mut() { match state.try_answer_with(request_id, msg) { Ok(()) => return, @@ -849,7 +848,7 @@ impl KademliaHandler { } impl futures::Stream for OutboundSubstreamState { - type Item = ConnectionHandlerEvent; + type Item = ConnectionHandlerEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -866,8 +865,8 @@ impl futures::Stream for OutboundSubstreamState { *this = OutboundSubstreamState::Done; let event = query_id.map(|query_id| { ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::Io(error), + HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), query_id, }, ) @@ -883,12 +882,10 @@ impl futures::Stream for OutboundSubstreamState { Poll::Ready(Err(error)) => { *this = OutboundSubstreamState::Done; let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::Io(error), - query_id, - }, - ) + ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), + query_id, + }) }); return Poll::Ready(event); @@ -911,12 +908,10 @@ impl futures::Stream for OutboundSubstreamState { Poll::Ready(Err(error)) => { *this = OutboundSubstreamState::Done; let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::Io(error), - query_id, - }, - ) + ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), + query_id, + }) }); return Poll::Ready(event); @@ -939,8 +934,8 @@ impl futures::Stream for OutboundSubstreamState { } Poll::Ready(Some(Err(error))) => { *this = OutboundSubstreamState::Done; - let event = KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::Io(error), + let event = HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), query_id, }; @@ -950,10 +945,8 @@ impl futures::Stream for OutboundSubstreamState { } Poll::Ready(None) => { *this = OutboundSubstreamState::Done; - let event = KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::Io( - io::ErrorKind::UnexpectedEof.into(), - ), + let event = HandlerEvent::QueryError { + error: HandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()), query_id, }; @@ -965,7 +958,7 @@ impl futures::Stream for OutboundSubstreamState { } OutboundSubstreamState::ReportError(error, query_id) => { *this = OutboundSubstreamState::Done; - let event = KademliaHandlerEvent::QueryError { error, query_id }; + let event = HandlerEvent::QueryError { error, query_id }; return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); } @@ -987,7 +980,7 @@ impl futures::Stream for OutboundSubstreamState { } impl futures::Stream for InboundSubstreamState { - type Item = ConnectionHandlerEvent; + type Item = ConnectionHandlerEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -1013,9 +1006,9 @@ impl futures::Stream for InboundSubstreamState { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::FindNodeReq { + HandlerEvent::FindNodeReq { key, - request_id: KademliaRequestId { + request_id: RequestId { connec_unique_id: connection_id, }, }, @@ -1025,9 +1018,9 @@ impl futures::Stream for InboundSubstreamState { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::GetProvidersReq { + HandlerEvent::GetProvidersReq { key, - request_id: KademliaRequestId { + request_id: RequestId { connec_unique_id: connection_id, }, }, @@ -1040,16 +1033,16 @@ impl futures::Stream for InboundSubstreamState { substream, }; return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::AddProvider { key, provider }, + HandlerEvent::AddProvider { key, provider }, ))); } Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::GetRecord { + HandlerEvent::GetRecord { key, - request_id: KademliaRequestId { + request_id: RequestId { connec_unique_id: connection_id, }, }, @@ -1059,9 +1052,9 @@ impl futures::Stream for InboundSubstreamState { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - KademliaHandlerEvent::PutRecord { + HandlerEvent::PutRecord { record, - request_id: KademliaRequestId { + request_id: RequestId { connec_unique_id: connection_id, }, }, @@ -1138,24 +1131,24 @@ impl futures::Stream for InboundSubstreamState { } /// Process a Kademlia message that's supposed to be a response to one of our requests. -fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> KademliaHandlerEvent { +fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> HandlerEvent { // TODO: must check that the response corresponds to the request match event { KadResponseMsg::Pong => { // We never send out pings. - KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::UnexpectedMessage, + HandlerEvent::QueryError { + error: HandlerQueryErr::UnexpectedMessage, query_id, } } - KadResponseMsg::FindNode { closer_peers } => KademliaHandlerEvent::FindNodeRes { + KadResponseMsg::FindNode { closer_peers } => HandlerEvent::FindNodeRes { closer_peers, query_id, }, KadResponseMsg::GetProviders { closer_peers, provider_peers, - } => KademliaHandlerEvent::GetProvidersRes { + } => HandlerEvent::GetProvidersRes { closer_peers, provider_peers, query_id, @@ -1163,12 +1156,12 @@ fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> KademliaHan KadResponseMsg::GetValue { record, closer_peers, - } => KademliaHandlerEvent::GetRecordRes { + } => HandlerEvent::GetRecordRes { record, closer_peers, query_id, }, - KadResponseMsg::PutValue { key, value, .. } => KademliaHandlerEvent::PutRecordRes { + KadResponseMsg::PutValue { key, value, .. } => HandlerEvent::PutRecordRes { key, value, query_id, diff --git a/protocols/kad/src/jobs.rs b/protocols/kad/src/jobs.rs index cfc4f92941b..af07076018e 100644 --- a/protocols/kad/src/jobs.rs +++ b/protocols/kad/src/jobs.rs @@ -74,10 +74,10 @@ use std::vec; /// The maximum number of queries towards which background jobs /// are allowed to start new queries on an invocation of -/// `Kademlia::poll`. +/// `Behaviour::poll`. pub(crate) const JOBS_MAX_QUERIES: usize = 100; /// The maximum number of new queries started by a background job -/// per invocation of `Kademlia::poll`. +/// per invocation of `Behaviour::poll`. pub(crate) const JOBS_MAX_NEW_QUERIES: usize = 10; /// A background job run periodically. #[derive(Debug)] diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index ccdb06b885d..dd9f7f56f30 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -26,7 +26,7 @@ //! [Identify](https://github.com/libp2p/specs/tree/master/identify) protocol might be seen as a core protocol. Rust-libp2p //! tries to stay as generic as possible, and does not make this assumption. //! This means that the Identify protocol must be manually hooked up to Kademlia through calls -//! to [`Kademlia::add_address`]. +//! to [`Behaviour::add_address`]. //! If you choose not to use the Identify protocol, and do not provide an alternative peer //! discovery mechanism, a Kademlia node will not discover nodes beyond the network's //! [boot nodes](https://docs.libp2p.io/concepts/glossary/#boot-node). Without the Identify protocol, @@ -73,11 +73,10 @@ pub use behaviour::{ QueryResult, QueryStats, RoutingUpdate, }; pub use behaviour::{ - Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent, - KademliaStoreInserts, ProgressStep, Quorum, + Behaviour, BucketInserts, Caching, Config, Event, ProgressStep, Quorum, StoreInserts, }; pub use kbucket::{Distance as KBucketDistance, EntryView, KBucketRef, Key as KBucketKey}; -pub use protocol::KadConnectionType; +pub use protocol::ConnectionType; pub use query::QueryId; pub use record_priv::{store, Key as RecordKey, ProviderRecord, Record}; @@ -115,3 +114,30 @@ pub const PROTOCOL_NAME: StreamProtocol = protocol::DEFAULT_PROTO_NAME; /// Constant shared across tests for the [`Multihash`](libp2p_core::multihash::Multihash) type. #[cfg(test)] const SHA_256_MH: u64 = 0x12; + +#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Behaviour`.")] +pub type Kademlia = Behaviour; + +#[deprecated( + note = "Import the `kad` module instead and refer to this type as `kad::BucketInserts`." +)] +pub type KademliaBucketInserts = BucketInserts; + +#[deprecated( + note = "Import the `kad` module instead and refer to this type as `kad::StoreInserts`." +)] +pub type KademliaStoreInserts = StoreInserts; + +#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Config`.")] +pub type KademliaConfig = Config; + +#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Caching`.")] +pub type KademliaCaching = Caching; + +#[deprecated(note = "Import the `kad` module instead and refer to this type as `kad::Event`.")] +pub type KademliaEvent = Event; + +#[deprecated( + note = "Import the `kad` module instead and refer to this type as `kad::ConnectionType`." +)] +pub type KadConnectionType = ConnectionType; diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 24b24789091..e6341ee4f21 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -20,7 +20,7 @@ //! The Kademlia connection protocol upgrade and associated message types. //! -//! The connection protocol upgrade is provided by [`KademliaProtocolConfig`], with the +//! The connection protocol upgrade is provided by [`ProtocolConfig`], with the //! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively. //! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used //! to poll the underlying transport for incoming messages, and the `Sink` component @@ -46,7 +46,7 @@ pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs pub(crate) const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024; /// Status of our connection to a node reported by the Kademlia protocol. #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] -pub enum KadConnectionType { +pub enum ConnectionType { /// Sender hasn't tried to connect to peer. NotConnected = 0, /// Sender is currently connected to peer. @@ -57,26 +57,26 @@ pub enum KadConnectionType { CannotConnect = 3, } -impl From for KadConnectionType { - fn from(raw: proto::ConnectionType) -> KadConnectionType { +impl From for ConnectionType { + fn from(raw: proto::ConnectionType) -> ConnectionType { use proto::ConnectionType::*; match raw { - NOT_CONNECTED => KadConnectionType::NotConnected, - CONNECTED => KadConnectionType::Connected, - CAN_CONNECT => KadConnectionType::CanConnect, - CANNOT_CONNECT => KadConnectionType::CannotConnect, + NOT_CONNECTED => ConnectionType::NotConnected, + CONNECTED => ConnectionType::Connected, + CAN_CONNECT => ConnectionType::CanConnect, + CANNOT_CONNECT => ConnectionType::CannotConnect, } } } -impl From for proto::ConnectionType { - fn from(val: KadConnectionType) -> Self { +impl From for proto::ConnectionType { + fn from(val: ConnectionType) -> Self { use proto::ConnectionType::*; match val { - KadConnectionType::NotConnected => NOT_CONNECTED, - KadConnectionType::Connected => CONNECTED, - KadConnectionType::CanConnect => CAN_CONNECT, - KadConnectionType::CannotConnect => CANNOT_CONNECT, + ConnectionType::NotConnected => NOT_CONNECTED, + ConnectionType::Connected => CONNECTED, + ConnectionType::CanConnect => CAN_CONNECT, + ConnectionType::CannotConnect => CANNOT_CONNECT, } } } @@ -89,7 +89,7 @@ pub struct KadPeer { /// The multiaddresses that the sender think can be used in order to reach the peer. pub multiaddrs: Vec, /// How the sender is connected to that remote. - pub connection_ty: KadConnectionType, + pub connection_ty: ConnectionType, } // Builds a `KadPeer` from a corresponding protobuf message. @@ -135,13 +135,13 @@ impl From for proto::Peer { // only one request, then we can change the output of the `InboundUpgrade` and // `OutboundUpgrade` to be just a single message #[derive(Debug, Clone)] -pub struct KademliaProtocolConfig { +pub struct ProtocolConfig { protocol_names: Vec, /// Maximum allowed size of a packet. max_packet_size: usize, } -impl KademliaProtocolConfig { +impl ProtocolConfig { /// Returns the configured protocol name. pub fn protocol_names(&self) -> &[StreamProtocol] { &self.protocol_names @@ -159,16 +159,16 @@ impl KademliaProtocolConfig { } } -impl Default for KademliaProtocolConfig { +impl Default for ProtocolConfig { fn default() -> Self { - KademliaProtocolConfig { + ProtocolConfig { protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(), max_packet_size: DEFAULT_MAX_PACKET_SIZE, } } } -impl UpgradeInfo for KademliaProtocolConfig { +impl UpgradeInfo for ProtocolConfig { type Info = StreamProtocol; type InfoIter = std::vec::IntoIter; @@ -213,7 +213,7 @@ pub(crate) type KadInStreamSink = Framed = Framed>; -impl InboundUpgrade for KademliaProtocolConfig +impl InboundUpgrade for ProtocolConfig where C: AsyncRead + AsyncWrite + Unpin, { @@ -228,7 +228,7 @@ where } } -impl OutboundUpgrade for KademliaProtocolConfig +impl OutboundUpgrade for ProtocolConfig where C: AsyncRead + AsyncWrite + Unpin, { @@ -624,7 +624,7 @@ mod tests { use futures::{Future, Sink, Stream}; use libp2p_core::{PeerId, PublicKey, Transport}; use multihash::{encode, Hash}; - use protocol::{KadConnectionType, KadPeer, KademliaProtocolConfig}; + use protocol::{ConnectionType, KadPeer, ProtocolConfig}; use std::sync::mpsc; use std::thread; @@ -641,7 +641,7 @@ mod tests { closer_peers: vec![KadPeer { node_id: PeerId::random(), multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()], - connection_ty: KadConnectionType::Connected, + connection_ty: ConnectionType::Connected, }], }); test_one(KadMsg::GetProvidersReq { @@ -651,12 +651,12 @@ mod tests { closer_peers: vec![KadPeer { node_id: PeerId::random(), multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()], - connection_ty: KadConnectionType::Connected, + connection_ty: ConnectionType::Connected, }], provider_peers: vec![KadPeer { node_id: PeerId::random(), multiaddrs: vec!["/ip4/200.201.202.203/tcp/1999".parse().unwrap()], - connection_ty: KadConnectionType::NotConnected, + connection_ty: ConnectionType::NotConnected, }], }); test_one(KadMsg::AddProvider { @@ -664,7 +664,7 @@ mod tests { provider_peer: KadPeer { node_id: PeerId::random(), multiaddrs: vec!["/ip4/9.1.2.3/udp/23".parse().unwrap()], - connection_ty: KadConnectionType::Connected, + connection_ty: ConnectionType::Connected, }, }); // TODO: all messages @@ -674,7 +674,7 @@ mod tests { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let transport = TcpTransport::default().with_upgrade(KademliaProtocolConfig); + let transport = TcpTransport::default().with_upgrade(ProtocolConfig); let (listener, addr) = transport .listen_on( "/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -694,7 +694,7 @@ mod tests { let _ = rt.block_on(future).unwrap(); }); - let transport = TcpTransport::default().with_upgrade(KademliaProtocolConfig); + let transport = TcpTransport::default().with_upgrade(ProtocolConfig); let future = transport .dial(rx.recv().unwrap()) diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 6b0a42a0b74..6cc158619f5 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -230,19 +230,19 @@ pub struct QueryId(usize); pub(crate) struct QueryConfig { /// Timeout of a single query. /// - /// See [`crate::behaviour::KademliaConfig::set_query_timeout`] for details. + /// See [`crate::behaviour::Config::set_query_timeout`] for details. pub(crate) timeout: Duration, /// The replication factor to use. /// - /// See [`crate::behaviour::KademliaConfig::set_replication_factor`] for details. + /// See [`crate::behaviour::Config::set_replication_factor`] for details. pub(crate) replication_factor: NonZeroUsize, /// Allowed level of parallelism for iterative queries. /// - /// See [`crate::behaviour::KademliaConfig::set_parallelism`] for details. + /// See [`crate::behaviour::Config::set_parallelism`] for details. pub(crate) parallelism: NonZeroUsize, /// Whether to use disjoint paths on iterative lookups. /// - /// See [`crate::behaviour::KademliaConfig::disjoint_query_paths`] for details. + /// See [`crate::behaviour::Config::disjoint_query_paths`] for details. pub(crate) disjoint_query_paths: bool, } diff --git a/protocols/kad/tests/client_mode.rs b/protocols/kad/tests/client_mode.rs index 30fd4d972a8..bc162ff6a01 100644 --- a/protocols/kad/tests/client_mode.rs +++ b/protocols/kad/tests/client_mode.rs @@ -1,7 +1,7 @@ use libp2p_identify as identify; use libp2p_identity as identity; use libp2p_kad::store::MemoryStore; -use libp2p_kad::{Kademlia, KademliaConfig, KademliaEvent, Mode}; +use libp2p_kad::{Behaviour, Config, Event, Mode}; use libp2p_swarm::Swarm; use libp2p_swarm_test::SwarmExt; @@ -19,7 +19,7 @@ async fn server_gets_added_to_routing_table_by_client() { match libp2p_swarm_test::drive(&mut client, &mut server).await { ( - [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(KademliaEvent::RoutingUpdated { peer, .. })], + [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(Event::RoutingUpdated { peer, .. })], [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_)], ) => { assert_eq!(peer, server_peer_id) @@ -41,7 +41,7 @@ async fn two_servers_add_each_other_to_routing_table() { let server1_peer_id = *server1.local_peer_id(); let server2_peer_id = *server2.local_peer_id(); - use KademliaEvent::*; + use Event::*; use MyBehaviourEvent::*; match libp2p_swarm_test::drive(&mut server1, &mut server2).await { @@ -94,7 +94,7 @@ async fn adding_an_external_addresses_activates_server_mode_on_existing_connecti other => panic!("Unexpected events: {other:?}"), } - use KademliaEvent::*; + use Event::*; // Server learns its external address (this could be through AutoNAT or some other mechanism). server.add_external_address(memory_addr); @@ -127,7 +127,7 @@ async fn set_client_to_server_mode() { match libp2p_swarm_test::drive(&mut client, &mut server).await { ( - [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(KademliaEvent::RoutingUpdated { peer, .. })], + [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(Event::RoutingUpdated { peer, .. })], [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(identify::Event::Received { info, .. })], ) => { assert_eq!(peer, server_peer_id); @@ -159,7 +159,7 @@ async fn set_client_to_server_mode() { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct MyBehaviour { identify: identify::Behaviour, - kad: Kademlia, + kad: Behaviour, } impl MyBehaviour { @@ -171,10 +171,10 @@ impl MyBehaviour { "/test/1.0.0".to_owned(), k.public(), )), - kad: Kademlia::with_config( + kad: Behaviour::with_config( local_peer_id, MemoryStore::new(local_peer_id), - KademliaConfig::default(), + Config::default(), ), } } diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index fa3f6c69dd0..d0680591621 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -98,7 +98,7 @@ fn three_fields() { struct Foo { ping: ping::Behaviour, identify: identify::Behaviour, - kad: libp2p_kad::Kademlia, + kad: libp2p_kad::Behaviour, } #[allow( @@ -115,7 +115,7 @@ fn three_fields() { let _: identify::Event = event; } FooEvent::Kad(event) => { - let _: libp2p_kad::KademliaEvent = event; + let _: libp2p_kad::Event = event; } } } @@ -327,7 +327,7 @@ fn with_either() { #[derive(NetworkBehaviour)] #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Foo { - kad: libp2p_kad::Kademlia, + kad: libp2p_kad::Behaviour, ping_or_identify: Either, } @@ -351,7 +351,7 @@ fn with_generics() { fn foo() { require_net_behaviour::< Foo< - libp2p_kad::Kademlia, + libp2p_kad::Behaviour, libp2p_ping::Behaviour, >, >(); @@ -370,7 +370,7 @@ fn with_generics_mixed() { #[allow(dead_code)] fn foo() { - require_net_behaviour::>>( + require_net_behaviour::>>( ); } } @@ -381,12 +381,12 @@ fn custom_event_with_either() { #[allow(clippy::large_enum_variant)] enum BehaviourOutEvent { - Kad(libp2p_kad::KademliaEvent), + Kad(libp2p_kad::Event), PingOrIdentify(Either), } - impl From for BehaviourOutEvent { - fn from(event: libp2p_kad::KademliaEvent) -> Self { + impl From for BehaviourOutEvent { + fn from(event: libp2p_kad::Event) -> Self { BehaviourOutEvent::Kad(event) } } @@ -404,7 +404,7 @@ fn custom_event_with_either() { prelude = "libp2p_swarm::derive_prelude" )] struct Foo { - kad: libp2p_kad::Kademlia, + kad: libp2p_kad::Behaviour, ping_or_identify: Either, }