From fc6efaf56bec7e7ee97ba9c53da6b5856aa44391 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 26 Oct 2023 22:40:05 +1100 Subject: [PATCH] fix(dcutr): exchange address candidates In a serious of refactorings, we seem to have introduced a bug where we where exchanged the _external_ addresses of our node as part of `libp2p-dcutr`. This is ironically quite pointless. If we have external addresses, then there is no need for hole-punching (i.e. DCUtR). Instead of gathering external addresses, we use an LRU cache to store our observed addresses. Repeatedly observed addresses will be tried first which should increase the success rate of a hole-punch. Resolves: #4496. Pull-Request: #4624. --- Cargo.lock | 12 +++- examples/dcutr/src/main.rs | 3 +- hole-punching-tests/src/main.rs | 1 - protocols/dcutr/CHANGELOG.md | 4 ++ protocols/dcutr/Cargo.toml | 1 + .../src/{behaviour_impl.rs => behaviour.rs} | 66 ++++++++++++----- protocols/dcutr/src/handler/relayed.rs | 2 +- protocols/dcutr/src/lib.rs | 6 +- protocols/dcutr/tests/lib.rs | 72 +++++++++++++------ 9 files changed, 119 insertions(+), 48 deletions(-) rename protocols/dcutr/src/{behaviour_impl.rs => behaviour.rs} (88%) diff --git a/Cargo.lock b/Cargo.lock index 61095bb524e..72b8ffed191 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2520,6 +2520,7 @@ dependencies = [ "libp2p-tcp", "libp2p-yamux", "log", + "lru 0.11.1", "quick-protobuf", "quick-protobuf-codec", "rand 0.8.5", @@ -2632,7 +2633,7 @@ dependencies = [ "libp2p-swarm", "libp2p-swarm-test", "log", - "lru", + "lru 0.12.0", "quick-protobuf", "quick-protobuf-codec", "smallvec", @@ -3412,6 +3413,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lru" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "lru" version = "0.12.0" diff --git a/examples/dcutr/src/main.rs b/examples/dcutr/src/main.rs index f9ddb1e2ef1..6a87e351e02 100644 --- a/examples/dcutr/src/main.rs +++ b/examples/dcutr/src/main.rs @@ -156,8 +156,7 @@ async fn main() -> Result<(), Box> { info: identify::Info { observed_addr, .. }, .. })) => { - info!("Relay told us our public address: {:?}", observed_addr); - swarm.add_external_address(observed_addr); + info!("Relay told us our observed address: {observed_addr}"); learned_observed_addr = true; } event => panic!("{event:?}"), diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs index 4bd7312df4b..5e86ecf24b8 100644 --- a/hole-punching-tests/src/main.rs +++ b/hole-punching-tests/src/main.rs @@ -211,7 +211,6 @@ async fn client_connect_to_relay( .. })) => { log::info!("Relay told us our public address: {observed_addr}"); - swarm.add_external_address(observed_addr); break; } SwarmEvent::ConnectionEstablished { connection_id, .. } diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index fc9fddb9fe0..179db86dff2 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -5,6 +5,10 @@ [PR 4558]: https://github.com/libp2p/rust-libp2p/pull/4558 +- Exchange address _candidates_ instead of external addresses in `CONNECT`. + If hole-punching wasn't working properly for you until now, this might be the reason why. + See [PR 4624](https://github.com/libp2p/rust-libp2p/pull/4624). + ## 0.10.0 - Raise MSRV to 1.65. diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 737d8aede70..ce038b4b5b7 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -24,6 +24,7 @@ quick-protobuf = "0.8" quick-protobuf-codec = { workspace = true } thiserror = "1.0" void = "1" +lru = "0.11.1" [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour.rs similarity index 88% rename from protocols/dcutr/src/behaviour_impl.rs rename to protocols/dcutr/src/behaviour.rs index e7ecdd3c6ad..72b30421346 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -29,13 +29,13 @@ use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm}; use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{ - dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent, -}; -use libp2p_swarm::{ - ExternalAddresses, NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, - ToSwarm, + dummy, ConnectionDenied, ConnectionHandler, ConnectionId, NewExternalAddrCandidate, THandler, + THandlerOutEvent, }; +use libp2p_swarm::{NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm}; +use lru::LruCache; use std::collections::{HashMap, HashSet, VecDeque}; +use std::num::NonZeroUsize; use std::task::{Context, Poll}; use thiserror::Error; use void::Void; @@ -79,9 +79,7 @@ pub struct Behaviour { /// All direct (non-relayed) connections. direct_connections: HashMap>, - external_addresses: ExternalAddresses, - - local_peer_id: PeerId, + address_candidates: Candidates, direct_to_relayed_connections: HashMap, @@ -95,20 +93,14 @@ impl Behaviour { Behaviour { queued_events: Default::default(), direct_connections: Default::default(), - external_addresses: Default::default(), - local_peer_id, + address_candidates: Candidates::new(local_peer_id), direct_to_relayed_connections: Default::default(), outgoing_direct_connection_attempts: Default::default(), } } fn observed_addresses(&self) -> Vec { - self.external_addresses - .iter() - .filter(|a| !a.iter().any(|p| p == Protocol::P2pCircuit)) - .cloned() - .map(|a| a.with(Protocol::P2p(self.local_peer_id))) - .collect() + self.address_candidates.iter().cloned().collect() } fn on_dial_failure( @@ -359,13 +351,14 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { - self.external_addresses.on_swarm_event(&event); - match event { FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) } FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure), + FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => { + self.address_candidates.add(addr.clone()); + } FromSwarm::AddressChange(_) | FromSwarm::ConnectionEstablished(_) | FromSwarm::ListenFailure(_) @@ -374,13 +367,48 @@ impl NetworkBehaviour for Behaviour { | FromSwarm::ExpiredListenAddr(_) | FromSwarm::ListenerError(_) | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) | FromSwarm::ExternalAddrExpired(_) | FromSwarm::ExternalAddrConfirmed(_) => {} } } } +/// Stores our address candidates. +/// +/// We use an [`LruCache`] to favor addresses that are reported more often. +/// When attempting a hole-punch, we will try more frequent addresses first. +/// Most of these addresses will come from observations by other nodes (via e.g. the identify protocol). +/// More common observations mean a more likely stable port-mapping and thus a higher chance of a successful hole-punch. +struct Candidates { + inner: LruCache, + me: PeerId, +} + +impl Candidates { + fn new(me: PeerId) -> Self { + Self { + inner: LruCache::new(NonZeroUsize::new(20).expect("20 > 0")), + me, + } + } + + fn add(&mut self, mut address: Multiaddr) { + if is_relayed(&address) { + return; + } + + if address.iter().last() != Some(Protocol::P2p(self.me)) { + address.push(Protocol::P2p(self.me)); + } + + self.inner.push(address, ()); + } + + fn iter(&self) -> impl Iterator { + self.inner.iter().map(|(a, _)| a) + } +} + fn is_relayed(addr: &Multiaddr) -> bool { addr.iter().any(|p| p == Protocol::P2pCircuit) } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 3b23a1a2a6b..23ab9f4ae5a 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -20,7 +20,7 @@ //! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection. -use crate::behaviour_impl::MAX_NUMBER_OF_UPGRADE_ATTEMPTS; +use crate::behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS; use crate::protocol; use either::Either; use futures::future; diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 6001c9144e7..389365f94c5 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -23,7 +23,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -mod behaviour_impl; // TODO: Rename back `behaviour` once deprecation symbols are removed. +mod behaviour; mod handler; mod protocol; @@ -33,9 +33,7 @@ mod proto { pub(crate) use self::holepunch::pb::{mod_HolePunch::*, HolePunch}; } -pub use behaviour_impl::Behaviour; -pub use behaviour_impl::Error; -pub use behaviour_impl::Event; +pub use behaviour::{Behaviour, Error, Event}; pub use protocol::PROTOCOL_NAME; pub mod inbound { pub use crate::protocol::inbound::UpgradeError; diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 7c732f90173..6078b101fa2 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -22,6 +22,7 @@ use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::transport::upgrade::Version; use libp2p_core::transport::{MemoryTransport, Transport}; use libp2p_dcutr as dcutr; +use libp2p_identify as identify; use libp2p_identity as identity; use libp2p_identity::PeerId; use libp2p_plaintext as plaintext; @@ -38,10 +39,19 @@ async fn connect() { let mut dst = build_client(); let mut src = build_client(); - // Have all swarms listen on a local memory address. - let (relay_addr, _) = relay.listen().await; - let (dst_addr, _) = dst.listen().await; - src.listen().await; + // Have all swarms listen on a local TCP address. + let (memory_addr, relay_addr) = relay.listen().await; + relay.remove_external_address(&memory_addr); + relay.add_external_address(relay_addr.clone()); + + let (dst_mem_addr, dst_tcp_addr) = dst.listen().await; + let (src_mem_addr, _) = src.listen().await; + + dst.remove_external_address(&dst_mem_addr); + src.remove_external_address(&src_mem_addr); + + assert!(src.external_addresses().next().is_none()); + assert!(dst.external_addresses().next().is_none()); let relay_peer_id = *relay.local_peer_id(); let dst_peer_id = *dst.local_peer_id(); @@ -80,11 +90,12 @@ async fn connect() { break; } } + ClientEvent::Identify(_) => {} other => panic!("Unexpected event: {other:?}."), } } - let dst_addr = dst_addr.with(Protocol::P2p(dst_peer_id)); + let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id)); let established_conn_id = src .wait(move |e| match e { @@ -109,20 +120,33 @@ async fn connect() { assert_eq!(established_conn_id, reported_conn_id); } -fn build_relay() -> Swarm { +fn build_relay() -> Swarm { Swarm::new_ephemeral(|identity| { let local_peer_id = identity.public().to_peer_id(); - relay::Behaviour::new( - local_peer_id, - relay::Config { - reservation_duration: Duration::from_secs(2), - ..Default::default() - }, - ) + Relay { + relay: relay::Behaviour::new( + local_peer_id, + relay::Config { + reservation_duration: Duration::from_secs(2), + ..Default::default() + }, + ), + identify: identify::Behaviour::new(identify::Config::new( + "/relay".to_owned(), + identity.public(), + )), + } }) } +#[derive(NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct Relay { + relay: relay::Behaviour, + identify: identify::Behaviour, +} + fn build_client() -> Swarm { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = local_key.public().to_peer_id(); @@ -142,6 +166,10 @@ fn build_client() -> Swarm { Client { relay: behaviour, dcutr: dcutr::Behaviour::new(local_peer_id), + identify: identify::Behaviour::new(identify::Config::new( + "/client".to_owned(), + local_key.public(), + )), }, local_peer_id, Config::with_async_std_executor(), @@ -153,6 +181,7 @@ fn build_client() -> Swarm { struct Client { relay: relay::client::Behaviour, dcutr: dcutr::Behaviour, + identify: identify::Behaviour, } async fn wait_for_reservation( @@ -163,14 +192,16 @@ async fn wait_for_reservation( ) { let mut new_listen_addr_for_relayed_addr = false; let mut reservation_req_accepted = false; + let mut addr_observed = false; + loop { + if new_listen_addr_for_relayed_addr && reservation_req_accepted && addr_observed { + break; + } + match client.next_swarm_event().await { - SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {} SwarmEvent::NewListenAddr { address, .. } if address == client_addr => { new_listen_addr_for_relayed_addr = true; - if reservation_req_accepted { - break; - } } SwarmEvent::Behaviour(ClientEvent::Relay( relay::client::Event::ReservationReqAccepted { @@ -180,15 +211,16 @@ async fn wait_for_reservation( }, )) if relay_peer_id == peer_id && renewal == is_renewal => { reservation_req_accepted = true; - if new_listen_addr_for_relayed_addr { - break; - } } SwarmEvent::Dialing { peer_id: Some(peer_id), .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} + SwarmEvent::Behaviour(ClientEvent::Identify(identify::Event::Received { .. })) => { + addr_observed = true; + } + SwarmEvent::Behaviour(ClientEvent::Identify(_)) => {} e => panic!("{e:?}"), } }