Skip to content

Commit

Permalink
fix(swarm): keep connections alive while active streams exist
Browse files Browse the repository at this point in the history
Resolves: #4520.
Related: #4306.

Pull-Request: #4595.
  • Loading branch information
leonzchang authored Oct 25, 2023
1 parent 4378722 commit fcd410a
Show file tree
Hide file tree
Showing 23 changed files with 152 additions and 581 deletions.
16 changes: 4 additions & 12 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol,
};
use std::collections::VecDeque;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -249,20 +249,12 @@ impl ConnectionHandler for Handler {
}
}

fn connection_keep_alive(&self) -> KeepAlive {
if !self.queued_events.is_empty() {
return KeepAlive::Yes;
}

if self.inbound_connect.is_some() {
return KeepAlive::Yes;
}

fn connection_keep_alive(&self) -> bool {
if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
return KeepAlive::Yes;
return true;
}

KeepAlive::No
false
}

fn poll(
Expand Down
2 changes: 0 additions & 2 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3265,7 +3265,6 @@ where
type ConnectionHandler = Handler;
type ToSwarm = Event;

#[allow(deprecated)]
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
Expand All @@ -3276,7 +3275,6 @@ where
Ok(Handler::new(self.config.protocol_config()))
}

#[allow(deprecated)]
fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
Expand Down
25 changes: 3 additions & 22 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use instant::Instant;
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p_swarm::Stream;
use smallvec::SmallVec;
Expand Down Expand Up @@ -424,26 +423,8 @@ impl ConnectionHandler for Handler {
}
}

fn connection_keep_alive(&self) -> KeepAlive {
match self {
Handler::Enabled(handler) => {
if handler.in_mesh {
return KeepAlive::Yes;
}

if let Some(
OutboundSubstreamState::PendingSend(_, _)
| OutboundSubstreamState::PendingFlush(_),
) = handler.outbound_substream
{
return KeepAlive::Yes;
}

#[allow(deprecated)]
KeepAlive::No
}
Handler::Disabled(_) => KeepAlive::No,
}
fn connection_keep_alive(&self) -> bool {
matches!(self, Handler::Enabled(h) if h.in_mesh)
}

fn poll(
Expand Down
10 changes: 1 addition & 9 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
ProtocolSupport,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
SubstreamProtocol, SupportedProtocols,
};
use log::{warn, Level};
Expand Down Expand Up @@ -314,14 +314,6 @@ impl ConnectionHandler for Handler {
}
}

fn connection_keep_alive(&self) -> KeepAlive {
if !self.active_streams.is_empty() {
return KeepAlive::Yes;
}

KeepAlive::No
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
10 changes: 1 addition & 9 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamUpgradeError,
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamUpgradeError,
SubstreamProtocol, SupportedProtocols,
};
use log::trace;
Expand Down Expand Up @@ -702,14 +702,6 @@ impl ConnectionHandler for Handler {
}
}

fn connection_keep_alive(&self) -> KeepAlive {
if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
return KeepAlive::No;
};

KeepAlive::Yes
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
14 changes: 6 additions & 8 deletions protocols/ping/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol,
StreamUpgradeError, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
SubstreamProtocol,
};
use std::collections::VecDeque;
use std::{
Expand Down Expand Up @@ -225,10 +225,6 @@ impl ConnectionHandler for Handler {

fn on_behaviour_event(&mut self, _: Void) {}

fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::No
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -349,15 +345,17 @@ impl ConnectionHandler for Handler {
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol: stream,
protocol: mut stream,
..
}) => {
stream.ignore_for_keep_alive();
self.inbound = Some(protocol::recv_ping(stream).boxed());
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: stream,
protocol: mut stream,
..
}) => {
stream.ignore_for_keep_alive();
self.outbound = Some(OutboundState::Ping(
send_ping(stream, self.config.timeout).boxed(),
));
Expand Down
27 changes: 8 additions & 19 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
StreamUpgradeError, SubstreamProtocol,
};
use std::collections::VecDeque;
Expand Down Expand Up @@ -376,10 +376,6 @@ pub struct Handler {
///
/// Contains a [`futures::future::Future`] for each lend out substream that
/// resolves once the substream is dropped.
///
/// Once all substreams are dropped and this handler has no other work,
/// [`KeepAlive::Until`] can be set, allowing the connection to be closed
/// eventually.
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<()>>,
/// Futures relaying data for circuit between two peers.
circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
Expand Down Expand Up @@ -615,13 +611,12 @@ impl ConnectionHandler for Handler {
}
}

fn connection_keep_alive(&self) -> KeepAlive {
match self.idle_at {
Some(idle_at) if Instant::now().duration_since(idle_at) > Duration::from_secs(10) => {
KeepAlive::No
}
_ => KeepAlive::Yes,
}
fn connection_keep_alive(&self) -> bool {
let Some(idle_at) = self.idle_at else {
return true;
};

Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
}

fn poll(
Expand Down Expand Up @@ -881,13 +876,7 @@ impl ConnectionHandler for Handler {
{}

// Check keep alive status.
if self.reservation_request_future.is_none()
&& self.circuit_accept_futures.is_empty()
&& self.circuit_deny_futures.is_empty()
&& self.alive_lend_out_substreams.is_empty()
&& self.circuits.is_empty()
&& self.active_reservation.is_none()
{
if self.active_reservation.is_none() {
if self.idle_at.is_none() {
self.idle_at = Some(Instant::now());
}
Expand Down
26 changes: 3 additions & 23 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
SubstreamProtocol,
};
use log::debug;
Expand Down Expand Up @@ -319,28 +319,8 @@ impl ConnectionHandler for Handler {
}
}

fn connection_keep_alive(&self) -> KeepAlive {
if self.reservation.is_some() {
return KeepAlive::Yes;
}

if !self.alive_lend_out_substreams.is_empty() {
return KeepAlive::Yes;
}

if !self.circuit_deny_futs.is_empty() {
return KeepAlive::Yes;
}

if !self.open_circuit_futs.is_empty() {
return KeepAlive::Yes;
}

if !self.outbound_circuits.is_empty() {
return KeepAlive::Yes;
}

KeepAlive::No
fn connection_keep_alive(&self) -> bool {
self.reservation.is_some()
}

fn poll(
Expand Down
18 changes: 1 addition & 17 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use libp2p_swarm::handler::{
ListenUpgradeError,
};
use libp2p_swarm::{
handler::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError},
handler::{ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError},
SubstreamProtocol,
};
use smallvec::SmallVec;
Expand All @@ -59,8 +59,6 @@ where
/// The timeout for inbound and outbound substreams (i.e. request
/// and response processing).
substream_timeout: Duration,
/// The current connection keep-alive.
keep_alive: KeepAlive,
/// Queue of events to emit in `poll()`.
pending_events: VecDeque<Event<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
Expand Down Expand Up @@ -94,7 +92,6 @@ where
Self {
inbound_protocols,
codec,
keep_alive: KeepAlive::Yes,
substream_timeout,
outbound: VecDeque::new(),
inbound: FuturesUnordered::new(),
Expand Down Expand Up @@ -274,14 +271,9 @@ where
}

fn on_behaviour_event(&mut self, request: Self::FromBehaviour) {
self.keep_alive = KeepAlive::Yes;
self.outbound.push_back(request);
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand All @@ -300,7 +292,6 @@ where
match result {
Ok(((id, rq), rs_sender)) => {
// We received an inbound request.
self.keep_alive = KeepAlive::Yes;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request {
request_id: id,
request: rq,
Expand Down Expand Up @@ -330,13 +321,6 @@ where
self.outbound.shrink_to_fit();
}

if self.inbound.is_empty() && self.keep_alive.is_yes() {
// No new inbound or outbound requests. We already check
// there is no active streams exist in swarm connection,
// so we can set keep-alive to no directly.
self.keep_alive = KeepAlive::No;
}

Poll::Pending
}

Expand Down
7 changes: 3 additions & 4 deletions swarm/src/behaviour/toggle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use crate::behaviour::FromSwarm;
use crate::connection::ConnectionId;
use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol,
};
use crate::upgrade::SendWrapper;
use crate::{
Expand Down Expand Up @@ -291,11 +290,11 @@ where
.on_behaviour_event(event)
}

fn connection_keep_alive(&self) -> KeepAlive {
fn connection_keep_alive(&self) -> bool {
self.inner
.as_ref()
.map(|h| h.connection_keep_alive())
.unwrap_or(KeepAlive::No)
.unwrap_or(false)
}

fn poll(
Expand Down
Loading

0 comments on commit fcd410a

Please sign in to comment.