diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index c7e031b1cfd..6bc86454f2d 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -27,6 +27,19 @@ tracing = { workspace = true } socket2 = "0.5.7" ring = { workspace = true } +sha2 = "0.10" +hex = "0.4" +hex-literal = "0.4" + +# WebTransport +http = "1.0.0" +h3 = { git = "https://github.com/hyperium/h3" } +h3-quinn = { git = "https://github.com/hyperium/h3" } +h3-webtransport = { git = "https://github.com/hyperium/h3" } +libp2p-noise = { workspace = true } +rcgen = "0.11.3" +time = "0.3" + [features] tokio = ["dep:tokio", "if-watch/tokio", "quinn/runtime-tokio"] async-std = ["dep:async-std", "if-watch/smol", "quinn/runtime-async-std"] diff --git a/transports/quic/src/certificate_manager.rs b/transports/quic/src/certificate_manager.rs new file mode 100644 index 00000000000..13f29c9d196 --- /dev/null +++ b/transports/quic/src/certificate_manager.rs @@ -0,0 +1,156 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::sync::Arc; + +use rustls; +use sha2::Digest; +use time::{Duration, OffsetDateTime}; + +use libp2p_core::multihash::Multihash; +use libp2p_tls::{ + certificate, P2P_ALPN, verifier, +}; + +const MULTIHASH_SHA256_CODE: u64 = 0x12; +const CERT_VALID_PERIOD: Duration = Duration::days(14); + +#[derive(Clone, Debug)] +pub(crate) struct ServerCertManager { + keypair: libp2p_identity::Keypair, + + items: Vec, +} + +#[derive(Clone, Debug)] +struct CertItem { + server_tls_config: Arc, + start: OffsetDateTime, + end: OffsetDateTime, + cert_hash: Multihash<64>, +} + +impl ServerCertManager { + pub(crate) fn new(keypair: libp2p_identity::Keypair) -> Self { + Self { + keypair, + items: Vec::with_capacity(3), + } + } + + /// Gets TLS server config and certificate hashes. + pub(crate) fn get_config(&mut self) -> Result<(Arc, Vec>), certificate::GenError> { + self.check_and_roll_items()?; + + let cur_item = self.items.first() + .expect("The first element exists"); + let cert_hashes = self.items.iter() + .map(|item| item.cert_hash.clone()) + .collect(); + + Ok((cur_item.server_tls_config.clone(), cert_hashes)) + } + + fn create_cert_item(&self, start: OffsetDateTime) -> Result { + let not_after = start.clone() + .checked_add(CERT_VALID_PERIOD) + .expect("Addition does not overflow"); + + let (cert, private_key) = + certificate::generate_webtransport_certificate(&self.keypair, start, not_after)?; + + let cert_hash = Multihash::wrap( + MULTIHASH_SHA256_CODE, sha2::Sha256::digest(cert.as_ref().as_ref()).as_ref(), + ).expect("fingerprint's len to be 32 bytes"); + + let mut tls_config = rustls::ServerConfig::builder() + .with_cipher_suites(verifier::CIPHERSUITES) + .with_safe_default_kx_groups() + .with_protocol_versions(verifier::PROTOCOL_VERSIONS) + .expect("Cipher suites and kx groups are configured; qed") + .with_client_cert_verifier(Arc::new(verifier::Libp2pCertificateVerifier::new())) + .with_single_cert(vec![cert], private_key) + .expect("Server cert key DER is valid; qed"); + + tls_config.alpn_protocols = alpn_protocols(); + + Ok(CertItem { server_tls_config: Arc::new(tls_config), start, end: not_after, cert_hash }) + } + + /// https://github.com/libp2p/specs/tree/master/webtransport#certificates + /// Servers need to take care of regularly renewing their certificates.At first boot of the node, + /// it creates one self-signed certificate with a validity of 14 days, starting immediately, + /// and another certificate with the 14 day validity period starting on the expiry date of the first certificate. + /// The node advertises a multiaddress containing the certificate hashes of these two certificates. + /// Once the first certificate has expired, the node starts using the already generated next certificate. + /// At the same time, it again generates a new certificate for the following period and updates the multiaddress it advertises. + fn check_and_roll_items(&mut self) -> Result<(), certificate::GenError> { + if self.items.len() == 0 { + let current = self.create_cert_item(OffsetDateTime::now_utc())?; + let next_start = current.end.clone(); + self.items.push(current); + self.items.push(self.create_cert_item(next_start)?); + } else { + let next = self.items.get(1) + .expect("Element with index 1 exists"); + + if OffsetDateTime::now_utc() >= next.start { + let next_start = next.end.clone(); + self.items.push(self.create_cert_item(next_start)?); + if self.items.len() == 3 { + self.items.remove(0); + } + } + }; + + Ok(()) + } +} + +fn alpn_protocols() -> Vec> { + vec![P2P_ALPN.to_vec(), + b"h3".to_vec(), + b"h3-32".to_vec(), + b"h3-31".to_vec(), + b"h3-30".to_vec(), + b"h3-29".to_vec(), ] +} + +/* +#[cfg(test)] +mod tests { + use std::fmt::{Debug, Formatter}; + use rcgen::SerialNumber; + use ring::rand::{SecureRandom, SystemRandom}; + use ring::{hkdf, signature}; + use ring::error::Unspecified; + use ring::signature::EcdsaKeyPair; + use time::macros::datetime; + + #[test] + fn key_pair_generate() { + let alg = &signature::ECDSA_P256_SHA256_ASN1_SIGNING; + let rnd = SystemRandom::new(); + + let doc = EcdsaKeyPair::generate_pkcs8(alg, &rnd); + + assert!(doc.is_ok()) + } +}*/ \ No newline at end of file diff --git a/transports/quic/src/connection/connecting.rs b/transports/quic/src/connection/connecting.rs index f6e397b4d1e..cc806b55e34 100644 --- a/transports/quic/src/connection/connecting.rs +++ b/transports/quic/src/connection/connecting.rs @@ -34,6 +34,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; +use libp2p_core::muxing::StreamMuxerBox; /// A QUIC connection currently being negotiated. #[derive(Debug)] @@ -68,7 +69,7 @@ impl Connecting { } impl Future for Connecting { - type Output = Result<(PeerId, Connection), Error>; + type Output = Result<(PeerId, StreamMuxerBox), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let connection = match futures::ready!(self.connecting.poll_unpin(cx)) { @@ -78,6 +79,6 @@ impl Future for Connecting { let peer_id = Self::remote_peer_id(&connection); let muxer = Connection::new(connection); - Poll::Ready(Ok((peer_id, muxer))) + Poll::Ready(Ok((peer_id, StreamMuxerBox::new(muxer)))) } } diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 7ae649b6914..59b37c3acad 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -57,16 +57,19 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +mod certificate_manager; mod config; mod connection; mod hole_punching; mod provider; mod transport; +mod webtransport; use std::net::SocketAddr; pub use config::Config; pub use connection::{Connecting, Connection, Stream}; +use libp2p_tls::certificate; #[cfg(feature = "async-std")] pub use provider::async_std; @@ -101,6 +104,12 @@ pub enum Error { /// Error when holepunching for a remote is already in progress #[error("Already punching hole for {0}).")] HolePunchInProgress(SocketAddr), + + #[error(transparent)] + CertificateGenerationError(#[from] certificate::GenError), + + #[error("WebTransport error.")] + WebTransportError, } /// Dialing a remote peer failed. diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 9bd4c035cec..cf24c3c6f34 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -18,10 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::config::{Config, QuinnConfig}; +use crate::config::Config; use crate::hole_punching::hole_puncher; use crate::provider::Provider; -use crate::{ConnectError, Connecting, Connection, Error}; +use crate::{ConnectError, Connecting, Error, webtransport}; use futures::channel::oneshot; use futures::future::{BoxFuture, Either}; @@ -31,11 +31,7 @@ use futures::{prelude::*, stream::SelectAll}; use if_watch::IfEvent; -use libp2p_core::{ - multiaddr::{Multiaddr, Protocol}, - transport::{ListenerId, TransportError, TransportEvent}, - Transport, -}; +use libp2p_core::{multiaddr::{Multiaddr, Protocol}, muxing::StreamMuxerBox, transport::{ListenerId, TransportError, TransportEvent}, Transport}; use libp2p_identity::PeerId; use socket2::{Domain, Socket, Type}; use std::collections::hash_map::{DefaultHasher, Entry}; @@ -49,6 +45,7 @@ use std::{ pin::Pin, task::{Context, Poll, Waker}, }; +use libp2p_core::multihash::Multihash; /// Implementation of the [`Transport`] trait for QUIC. /// @@ -63,12 +60,8 @@ use std::{ /// See . #[derive(Debug)] pub struct GenTransport { - /// Config for the inner [`quinn`] structs. - quinn_config: QuinnConfig, - /// Timeout for the [`Connecting`] future. - handshake_timeout: Duration, - /// Whether draft-29 is supported for dialing and listening. - support_draft_29: bool, + /// Config for the transport. + config: Config, /// Streams of active [`Listener`]s. listeners: SelectAll>, /// Dialer for each socket family if no matching listener exists. @@ -76,22 +69,22 @@ pub struct GenTransport { /// Waker to poll the transport again when a new dialer or listener is added. waker: Option, /// Holepunching attempts - hole_punch_attempts: HashMap>, + hole_punch_attempts: HashMap< + SocketAddr, + oneshot::Sender< + BoxFuture<'static, Result< as Transport>::Output, as Transport>::Error>> + > + >, } impl GenTransport

{ /// Create a new [`GenTransport`] with the given [`Config`]. pub fn new(config: Config) -> Self { - let handshake_timeout = config.handshake_timeout; - let support_draft_29 = config.support_draft_29; - let quinn_config = config.into(); Self { + config: config.clone(), listeners: SelectAll::new(), - quinn_config, - handshake_timeout, dialer: HashMap::new(), waker: None, - support_draft_29, hole_punch_attempts: Default::default(), } } @@ -134,16 +127,30 @@ impl GenTransport

{ addr: Multiaddr, check_unspecified_addr: bool, ) -> Result< - (SocketAddr, ProtocolVersion, Option), + (SocketAddr, ProtocolVersion, Option, bool), TransportError<::Error>, > { - let (socket_addr, version, peer_id) = multiaddr_to_socketaddr(&addr, self.support_draft_29) + let (socket_addr, version, peer_id, is_webtransport_addr) = multiaddr_to_socketaddr(&addr, self.config.support_draft_29) .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified()) { return Err(TransportError::MultiaddrNotSupported(addr)); } - Ok((socket_addr, version, peer_id)) + Ok((socket_addr, version, peer_id, is_webtransport_addr)) + } + + /// Gets an endpoint with the same `socket_addr`. + /// To check that an endpoint uses the same cert we can compare cert hashes. + fn get_existed_endpoint(&mut self, socket_addr: &SocketAddr, cert_hashes: &Vec>) -> Option { + if let Some(listener) = self.listeners + .iter_mut() + .find(|l| { + !l.is_closed && &l.socket_addr() == socket_addr && l.same_cert_hashes(cert_hashes) + }) { + return Some(listener.endpoint.clone()) + } + + None } /// Pick any listener to use for dialing. @@ -198,9 +205,9 @@ impl GenTransport

{ } impl Transport for GenTransport

{ - type Output = (PeerId, Connection); + type Output = (PeerId, StreamMuxerBox); type Error = Error; - type ListenerUpgrade = Connecting; + type ListenerUpgrade = BoxFuture<'static, Result>; type Dial = BoxFuture<'static, Result>; fn listen_on( @@ -208,19 +215,32 @@ impl Transport for GenTransport

{ listener_id: ListenerId, addr: Multiaddr, ) -> Result<(), TransportError> { - let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, false)?; - let endpoint_config = self.quinn_config.endpoint_config.clone(); - let server_config = self.quinn_config.server_config.clone(); - let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; + let (socket_addr, version, _peer_id, is_webtransport_addr) = self.remote_multiaddr_to_socketaddr(addr, false)?; + let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; let socket_c = socket.try_clone().map_err(Self::Error::from)?; - let endpoint = Self::new_endpoint(endpoint_config, Some(server_config), socket)?; + + let (server_config, noise, cert_hashes) = self.config.server_quinn_config() + .map_err(|e| Error::CertificateGenerationError(e))?; + + let endpoint = match self.get_existed_endpoint(&socket_addr, &cert_hashes) { + Some(res) => res, + None => Self::new_endpoint(self.config.endpoint_config(), Some(server_config), socket)?, + }; + + let webtransport = WebTransport { + in_use: is_webtransport_addr, + cert_hashes, + noise, + }; + let listener = Listener::new( listener_id, socket_c, endpoint, - self.handshake_timeout, + self.config.handshake_timeout.clone(), version, + webtransport )?; self.listeners.push(listener); @@ -248,8 +268,8 @@ impl Transport for GenTransport

{ } fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option { - if !is_quic_addr(listen, self.support_draft_29) - || !is_quic_addr(observed, self.support_draft_29) + if !is_quic_addr(listen, self.config.support_draft_29) + || !is_quic_addr(observed, self.config.support_draft_29) { return None; } @@ -257,7 +277,12 @@ impl Transport for GenTransport

{ } fn dial(&mut self, addr: Multiaddr) -> Result> { - let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, true)?; + let (socket_addr, version, _peer_id, is_webtransport) = self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; + + if is_webtransport { + // WebTransport implementation doesn't support dialling. + return Err(TransportError::MultiaddrNotSupported(addr)); + } let endpoint = match self.eligible_listener(&socket_addr) { None => { @@ -275,8 +300,7 @@ impl Transport for GenTransport

{ }; let socket = UdpSocket::bind(listen_socket_addr).map_err(Self::Error::from)?; - let endpoint_config = self.quinn_config.endpoint_config.clone(); - let endpoint = Self::new_endpoint(endpoint_config, None, socket)?; + let endpoint = Self::new_endpoint(self.config.endpoint_config(), None, socket)?; vacant.insert(endpoint.clone()); endpoint @@ -286,11 +310,12 @@ impl Transport for GenTransport

{ } Some(listener) => listener.endpoint.clone(), }; - let handshake_timeout = self.handshake_timeout; - let mut client_config = self.quinn_config.client_config.clone(); + let handshake_timeout = self.config.handshake_timeout; + let mut client_config = self.config.client_quinn_config(); if version == ProtocolVersion::Draft29 { client_config.version(0xff00_001d); } + Ok(Box::pin(async move { // This `"l"` seems necessary because an empty string is an invalid domain // name. While we don't use domain names, the underlying rustls library @@ -306,7 +331,7 @@ impl Transport for GenTransport

{ &mut self, addr: Multiaddr, ) -> Result> { - let (socket_addr, _version, peer_id) = + let (socket_addr, _version, peer_id, _is_webtransport) = self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?; @@ -320,7 +345,7 @@ impl Transport for GenTransport

{ tracing::debug!("Preparing for hole-punch from {addr}"); - let hole_puncher = hole_puncher::

(socket, socket_addr, self.handshake_timeout); + let hole_puncher = hole_puncher::

(socket, socket_addr, self.config.handshake_timeout); let (sender, receiver) = oneshot::channel(); @@ -375,7 +400,7 @@ impl Transport for GenTransport

{ send_back_addr, } => { let socket_addr = - multiaddr_to_socketaddr(&send_back_addr, self.support_draft_29) + multiaddr_to_socketaddr(&send_back_addr, self.config.support_draft_29) .unwrap() .0; @@ -424,6 +449,8 @@ struct Listener { /// An underlying copy of the socket to be able to hole punch with socket: UdpSocket, + webtransport: WebTransport, + /// A future to poll new incoming connections. accept: BoxFuture<'static, Option>, /// Timeout for connection establishment on inbound connections. @@ -453,6 +480,7 @@ impl Listener

{ endpoint: quinn::Endpoint, handshake_timeout: Duration, version: ProtocolVersion, + webtransport: WebTransport, ) -> Result { let if_watcher; let pending_event; @@ -464,7 +492,9 @@ impl Listener

{ } else { if_watcher = None; listening_addresses.insert(local_addr.ip()); - let ma = socketaddr_to_multiaddr(&local_addr, version); + let mut ma = socketaddr_to_multiaddr(&local_addr, version); + ma = webtransport.update_multiaddr(ma); + pending_event = Some(TransportEvent::NewAddress { listener_id, listen_addr: ma, @@ -486,6 +516,7 @@ impl Listener

{ pending_event, close_listener_waker: None, listening_addresses, + webtransport, }) } @@ -519,6 +550,19 @@ impl Listener

{ .expect("Cannot fail because the socket is bound") } + fn same_cert_hashes(&self, cert_hashes: &Vec>) -> bool { + if self.webtransport.cert_hashes.len() != cert_hashes.len() { + return false + } + let mut set: HashSet> = self.webtransport.cert_hashes + .clone().into_iter().collect(); + for hash in cert_hashes { + set.remove(hash); + } + + set.is_empty() + } + /// Poll for a next If Event. fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { let endpoint_addr = self.socket_addr(); @@ -528,9 +572,11 @@ impl Listener

{ loop { match ready!(P::poll_if_event(if_watcher, cx)) { Ok(IfEvent::Up(inet)) => { - if let Some(listen_addr) = + if let Some(mut listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { + listen_addr = self.webtransport.update_multiaddr(listen_addr); + tracing::debug!( address=%listen_addr, "New listen address" @@ -543,9 +589,10 @@ impl Listener

{ } } Ok(IfEvent::Down(inet)) => { - if let Some(listen_addr) = + if let Some(mut listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { + listen_addr = self.webtransport.update_multiaddr(listen_addr); tracing::debug!( address=%listen_addr, "Expired listen address" @@ -597,12 +644,39 @@ impl Stream for Listener

{ } }; + + /*let mut local_addr = socketaddr_to_multiaddr( + &self.socket_addr(), + self.version, + ); + local_addr = self.webtransport.update_multiaddr(local_addr);*/ + let local_addr = socketaddr_to_multiaddr(&self.socket_addr(), self.version); let remote_addr = connecting.remote_address(); - let send_back_addr = socketaddr_to_multiaddr(&remote_addr, self.version); + let send_back_addr = socketaddr_to_multiaddr( + &remote_addr, + self.version, + ); + + let timeout = self.handshake_timeout.clone(); + let fut = if self.webtransport.in_use { + let noise = self.webtransport.noise.clone(); + + async move { + webtransport::Connecting::new( + noise, + connecting, + timeout, + ).await + }.boxed() + } else { + async move { + Connecting::new(connecting, timeout).await + }.boxed() + }; let event = TransportEvent::Incoming { - upgrade: Connecting::new(connecting, self.handshake_timeout), + upgrade: fut, local_addr, send_back_addr, listener_id: self.listener_id, @@ -690,11 +764,13 @@ fn ip_to_listenaddr( fn multiaddr_to_socketaddr( addr: &Multiaddr, support_draft_29: bool, -) -> Option<(SocketAddr, ProtocolVersion, Option)> { +) -> Option<(SocketAddr, ProtocolVersion, Option, bool)> { let mut iter = addr.iter(); let proto1 = iter.next()?; let proto2 = iter.next()?; let proto3 = iter.next()?; + let proto4 = iter.next(); + let proto5 = iter.next(); let mut peer_id = None; for proto in iter { @@ -711,12 +787,22 @@ fn multiaddr_to_socketaddr( _ => return None, }; + let is_webtransport = match proto4 { + Some(Protocol::WebTransport) => { + if let Some(Protocol::Certhash(_)) = proto5 { + panic!("Cannot listen on a specific certhash for WebTransport addr {addr}"); + } + true + } + _ => false, + }; + match (proto1, proto2) { (Protocol::Ip4(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) } (Protocol::Ip6(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) } _ => None, } @@ -750,7 +836,10 @@ fn is_quic_addr(addr: &Multiaddr, support_draft_29: bool) -> bool { } /// Turns an IP address and port into the corresponding QUIC multiaddr. -fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) -> Multiaddr { +fn socketaddr_to_multiaddr( + socket_addr: &SocketAddr, + version: ProtocolVersion, +) -> Multiaddr { let quic_proto = match version { ProtocolVersion::V1 => Protocol::QuicV1, ProtocolVersion::Draft29 => Protocol::Quic, @@ -761,6 +850,27 @@ fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) - .with(quic_proto) } +struct WebTransport { + in_use: bool, + + cert_hashes: Vec>, + noise: libp2p_noise::Config, +} + +impl WebTransport { + pub(crate) fn update_multiaddr(&self, addr: Multiaddr) -> Multiaddr { + if self.in_use { + let mut vec = self.cert_hashes.clone(); + + return addr.with(Protocol::WebTransport) + .with(Protocol::Certhash(vec.pop().expect("Gets the last element"))) + .with(Protocol::Certhash(vec.pop().expect("Gets the last element"))); + } + + addr + } +} + #[cfg(test)] #[cfg(any(feature = "async-std", feature = "tokio"))] mod tests { diff --git a/transports/quic/src/webtransport.rs b/transports/quic/src/webtransport.rs new file mode 100644 index 00000000000..630d44c0b15 --- /dev/null +++ b/transports/quic/src/webtransport.rs @@ -0,0 +1,6 @@ +mod connection; +mod connecting; +mod stream; + +pub(crate) use connection::Connection; +pub(crate) use connecting::Connecting; \ No newline at end of file diff --git a/transports/quic/src/webtransport/connecting.rs b/transports/quic/src/webtransport/connecting.rs new file mode 100644 index 00000000000..3a912b9c932 --- /dev/null +++ b/transports/quic/src/webtransport/connecting.rs @@ -0,0 +1,173 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Future that drives a QUIC connection until is has performed its TLS handshake. +//! And then it drives a http3 connection based on the QUIC connection. +//! And then it drives a WebTransport session based on the http3 connection. + +use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{ + future::{select, Select}, + prelude::*, +}; +use futures::future::{BoxFuture, Either}; +use futures_timer::Delay; +use h3::ext::Protocol; +use h3_webtransport::server::WebTransportSession; +use http::{Method, Request}; + +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_identity::PeerId; + +use crate::{Error, webtransport}; +use crate::webtransport::connecting::WebTransportError::{Http3Error, NoiseError}; + +const WEBTRANSPORT_PATH: &str = "/.well-known/libp2p-webtransport"; +const NOISE_QUERY: &str = "type=noise"; + +// #[derive(Debug)] +pub(crate) struct Connecting { + connecting: Select< + BoxFuture<'static, Result<(PeerId, StreamMuxerBox), WebTransportError>>, + Delay + >, +} + +impl Connecting { + pub(crate) fn new( + noise: libp2p_noise::Config, + connecting: quinn::Connecting, timeout: Duration) -> Self { + Connecting { + connecting: select(web_transport_connection(connecting, noise).boxed(), Delay::new(timeout)) + } + } +} + +impl Future for Connecting { + type Output = Result<(PeerId, StreamMuxerBox), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match futures::ready!(self.connecting.poll_unpin(cx)) { + Either::Right(_) => return Poll::Ready(Err(Error::HandshakeTimedOut)), + Either::Left((Ok((peer_id, muxer)), _)) => { + Ok((peer_id, muxer)) + } + Either::Left((Err(_e), _)) => return Poll::Ready(Err(Error::WebTransportError)), + }; + + Poll::Ready(res) + } +} + +fn remote_peer_id(connection: &quinn::Connection) -> PeerId { + let identity = connection + .peer_identity() + .expect("connection got identity because it passed TLS handshake; qed"); + let certificates: Box> = + identity.downcast().expect("we rely on rustls feature; qed"); + let end_entity = certificates + .first() + .expect("there should be exactly one certificate; qed"); + let p2p_cert = libp2p_tls::certificate::parse(end_entity) + .expect("the certificate was validated during TLS handshake; qed"); + p2p_cert.peer_id() +} + +async fn web_transport_connection(connecting: quinn::Connecting, noise: libp2p_noise::Config) + -> Result<(PeerId, StreamMuxerBox), WebTransportError> { + let quic_conn = connecting.await?; + + // info!("new http3 established"); + + let peer_id = remote_peer_id(&quic_conn); + + let mut h3_conn = h3::server::builder() + .enable_webtransport(true) + .enable_connect(true) + .enable_datagram(true) + .max_webtransport_sessions(1) + .send_grease(true) + .build(h3_quinn::Connection::new(quic_conn)) + .await + .unwrap(); + + match h3_conn.accept().await? { + Some((request, stream)) => { + let ext = request.extensions(); + let proto = ext.get::(); + if Some(&Protocol::WEB_TRANSPORT) == proto { + if check_request(&request) { + let session = WebTransportSession::accept(request, stream, h3_conn) + .await?; + let connection = webtransport::Connection::new(session, noise); + + Ok((peer_id, StreamMuxerBox::new(connection))) + } else { + Err(WebTransportError::BadRequest(request.method().clone())) + } + } else { + Err(WebTransportError::UnexpectedProtocol(proto.cloned())) + } + } + None => { + // indicating no more streams to be received + Err(WebTransportError::ClosedConnection) + } + } +} + +fn check_request(req: &Request<()>) -> bool { + req.method() == &Method::CONNECT && + req.uri().path() == WEBTRANSPORT_PATH && + req.uri().query() == Some(NOISE_QUERY) +} + +// #[derive(Debug)] +pub(crate) enum WebTransportError { + UnexpectedProtocol(Option), + BadRequest(Method), + ConnectionError(quinn::ConnectionError), + ClosedConnection, + Http3Error(h3::Error), + NoiseError(libp2p_noise::Error), +} + +impl From for WebTransportError { + fn from(e: libp2p_noise::Error) -> Self { + NoiseError(e) + } +} + +impl From for WebTransportError { + fn from(e: h3::Error) -> Self { + Http3Error(e) + } +} + +impl From for WebTransportError { + fn from(e: quinn::ConnectionError) -> Self { + WebTransportError::ConnectionError(e) + } +} \ No newline at end of file diff --git a/transports/quic/src/webtransport/connection.rs b/transports/quic/src/webtransport/connection.rs new file mode 100644 index 00000000000..dd66caae83b --- /dev/null +++ b/transports/quic/src/webtransport/connection.rs @@ -0,0 +1,137 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use h3_quinn::Connection as Http3Connection; +use h3_webtransport::server::WebTransportSession; +use bytes::Bytes; +use crate::Error; +use futures::{future::BoxFuture, FutureExt}; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use std::sync::Arc; +use h3::quic::BidiStream; +use libp2p_core::upgrade::InboundConnectionUpgrade; +use libp2p_noise::Output; +use crate::webtransport::stream::Stream; + +/// State for a single opened WebTransport session. +pub(crate) struct Connection { + /// Underlying connection. + session: Arc>, + //Noise config to auth incoming connections. + noise: libp2p_noise::Config, + /// Future for accepting a new incoming bidirectional stream. + incoming: Option< + BoxFuture<'static, Output>, + >, + /// Future to wait for the connection to be closed. + closing: Option>, +} + +impl Connection { + /// Build a [`Connection`] from raw components. + /// + /// This function assumes that the [`quinn::Connection`] is completely fresh and none of + /// its methods has ever been called. Failure to comply might lead to logic errors and panics. + pub(crate) fn new( + session: WebTransportSession, + noise: libp2p_noise::Config, + ) -> Self { + Self { + session: Arc::new(session), + noise, + incoming: None, + closing: None, + } + } +} + +impl StreamMuxer for Connection { + type Substream = Output; + type Error = Error; + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + let t_session = Arc::clone(&this.session); + let t_noise = this.noise.clone(); + let incoming = this.incoming.get_or_insert_with(|| { + async move { + let res = t_session.accept_bi().await.unwrap(); + match res { + Some(h3_webtransport::server::AcceptedBi::BidiStream(_, stream)) => { + let (send, recv) = stream.split(); + let stream = Stream::new(send, recv); + + // todo should we apply `handshake_timeout` here? + let (_peer_id, out) = t_noise.upgrade_inbound(stream, "").await.unwrap(); + + out + } + _ => unreachable!("fix me!") + } + }.boxed() + }); + + let res = futures::ready!(incoming.poll_unpin(cx)); + this.incoming.take(); + Poll::Ready(Ok(res)) + } + + fn poll_outbound( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + panic!("WebTransport implementation doesn't support outbound streams.") + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + // TODO: If connection migration is enabled (currently disabled) address + // change on the connection needs to be handled. + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + /*let this = self.get_mut(); + + let closing = this.closing.get_or_insert_with(|| { + this.session. + // this.connection.close(From::from(0u32), &[]); + // let connection = this.connection.clone(); + async move { connection.closed().await }.boxed() + }); + + match futures::ready!(closing.poll_unpin(cx)) { + // Expected error given that `connection.close` was called above. + quinn::ConnectionError::LocallyClosed => {} + error => return Poll::Ready(Err(Error::Connection(ConnectionError(error)))), + };*/ + + Poll::Ready(Ok(())) + } +} \ No newline at end of file diff --git a/transports/quic/src/webtransport/stream.rs b/transports/quic/src/webtransport/stream.rs new file mode 100644 index 00000000000..e9bd6e93aa3 --- /dev/null +++ b/transports/quic/src/webtransport/stream.rs @@ -0,0 +1,89 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + io::{self}, + pin::Pin, + task::{Context, Poll}, +}; +use bytes::Bytes; +use futures::{AsyncRead, AsyncWrite}; + +/// A single stream on a connection +pub(crate) struct Stream { + /// A send part of the stream + send: h3_webtransport::stream::SendStream, Bytes>, + /// A receive part of the stream + recv: h3_webtransport::stream::RecvStream, + /// Whether the stream is closed or not + close_result: Option>, +} + +impl Stream { + pub(super) fn new( + send: h3_webtransport::stream::SendStream, Bytes>, + recv: h3_webtransport::stream::RecvStream + ) -> Self { + Self { + send, + recv, + close_result: None, + } + } +} + +impl AsyncRead for Stream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + if let Some(close_result) = self.close_result { + if close_result.is_err() { + return Poll::Ready(Ok(0)); + } + } + Pin::new(&mut self.recv).poll_read(cx, buf) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.send).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.send).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(close_result) = self.close_result { + // For some reason poll_close needs to be 'fuse'able + return Poll::Ready(close_result.map_err(Into::into)); + } + let close_result = futures::ready!(Pin::new(&mut self.send).poll_close(cx)); + self.close_result = Some(close_result.as_ref().map_err(|e| e.kind()).copied()); + Poll::Ready(close_result) + } +} \ No newline at end of file diff --git a/transports/tls/Cargo.toml b/transports/tls/Cargo.toml index a4817f20336..60c2888434c 100644 --- a/transports/tls/Cargo.toml +++ b/transports/tls/Cargo.toml @@ -19,6 +19,7 @@ thiserror = "1.0.61" webpki = { version = "0.101.4", package = "rustls-webpki", features = ["std"] } x509-parser = "0.16.0" yasna = "0.5.2" +time = "0.3" # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] diff --git a/transports/tls/src/certificate.rs b/transports/tls/src/certificate.rs index bbd353e32bd..7699ddb2823 100644 --- a/transports/tls/src/certificate.rs +++ b/transports/tls/src/certificate.rs @@ -24,7 +24,10 @@ use libp2p_identity as identity; use libp2p_identity::PeerId; -use x509_parser::{prelude::*, signature_algorithm::SignatureAlgorithm}; +use x509_parser::{ + der_parser, prelude::{oid_registry, X509Certificate, FromDer}, + signature_algorithm::SignatureAlgorithm, +}; /// The libp2p Public Key Extension is a X.509 extension /// with the Object Identifier 1.3.6.1.4.1.53594.1.1, @@ -80,6 +83,36 @@ pub fn generate( Ok((rustls_certificate, rustls_key)) } +/// Generates a self-signed TLS certificate with passed `not_before` and `not_after` dates +/// that includes a libp2p-specific certificate extension containing +/// the public key of the given keypair. +pub fn generate_webtransport_certificate( + identity_keypair: &identity::Keypair, + not_before: time::OffsetDateTime, + not_after: time::OffsetDateTime, +) -> Result<(rustls::Certificate, rustls::PrivateKey), GenError> { + let certificate_keypair = rcgen::KeyPair::generate(P2P_SIGNATURE_ALGORITHM)?; + let rustls_key = rustls::PrivateKey(certificate_keypair.serialize_der()); + + let certificate = { + let mut params = rcgen::CertificateParams::new(vec![]); + params.distinguished_name = rcgen::DistinguishedName::new(); + params.custom_extensions.push(make_libp2p_extension( + identity_keypair, + &certificate_keypair, + )?); + params.alg = P2P_SIGNATURE_ALGORITHM; + params.key_pair = Some(certificate_keypair); + params.not_before = not_before.into(); + params.not_after = not_after.into(); + rcgen::Certificate::from_params(params)? + }; + + let rustls_certificate = rustls::Certificate(certificate.serialize_der()?); + + Ok((rustls_certificate, rustls_key)) +} + /// Attempts to parse the provided bytes as a [`P2pCertificate`]. /// /// For this to succeed, the certificate must contain the specified extension and the signature must diff --git a/transports/tls/src/lib.rs b/transports/tls/src/lib.rs index 741a4b48077..e0b7360faf5 100644 --- a/transports/tls/src/lib.rs +++ b/transports/tls/src/lib.rs @@ -27,7 +27,7 @@ pub mod certificate; mod upgrade; -mod verifier; +pub mod verifier; use libp2p_identity::Keypair; use libp2p_identity::PeerId; @@ -37,7 +37,7 @@ pub use futures_rustls::TlsStream; pub use upgrade::Config; pub use upgrade::UpgradeError; -const P2P_ALPN: [u8; 6] = *b"libp2p"; +pub const P2P_ALPN: [u8; 6] = *b"libp2p"; /// Create a TLS client configuration for libp2p. pub fn make_client_config(