diff --git a/src/app/dispatcher.rs b/src/app/dispatcher.rs index 28de835..3feb7a7 100644 --- a/src/app/dispatcher.rs +++ b/src/app/dispatcher.rs @@ -1,5 +1,8 @@ -use std::{convert::TryFrom, sync::Arc, net::SocketAddr}; +use std::{convert::TryFrom, sync::Arc, net::SocketAddr, io}; +use anyhow::{ + anyhow +}; use log::{debug, error, trace}; use tokio::{ net::{TcpStream, UdpSocket}, @@ -8,11 +11,11 @@ use tokio::{ use crate::{ config::Config, - proxy::{Address, Session, StreamWrapperTrait, TcpOutboundHandlerTrait}, + proxy::{Address, Session, StreamWrapperTrait, TcpOutboundHandlerTrait, InboundDatagramTrait, AnyInboundDatagram, UdpOutboundHandlerTrait, AnyOutboundDatagram}, Context, }; -use super::{sniffer::Sniffer, DnsClient, OutboundManager, Router}; +use super::{sniffer::Sniffer, DnsClient, OutboundManager, Router, udp_association_manager::UdpAssociationManager}; // 负责将请求分发给不同的 代理协议 处理 pub struct Dispatcher { @@ -115,7 +118,21 @@ impl Dispatcher { }; } - pub async fn dispatch_udp(&self, _socket: UdpSocket, _sess: Session) {} + pub async fn dispatch_udp(&self, sess: Session) -> anyhow::Result{ + let outbound_tag = match self.router.route(&sess) { + Some(x) => x, + None => { + return Err(anyhow!("no outbound found for {}", &sess.destination)); + } + }; + let handler = match self.outbound_manager.get_handler(outbound_tag.as_ref()) { + Some(h) => h, + None => { + return Err(anyhow!("no handler found for tag {}", &*outbound_tag)) + } + }; + UdpOutboundHandlerTrait::handle(handler.as_ref(), self.ctx.clone(), &sess).await + } pub fn new( context: Arc, @@ -128,7 +145,7 @@ impl Dispatcher { ctx: context, dns_client, outbound_manager: outbound_manager, - router, + router } } } diff --git a/src/app/dns_client.rs b/src/app/dns_client.rs index 83fb962..e154653 100644 --- a/src/app/dns_client.rs +++ b/src/app/dns_client.rs @@ -136,12 +136,12 @@ impl DnsClient { SocketAddr::V4(_v4) => { // let bind_addr = get_default_ipv4_gateway()?; let bind_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); - create_bounded_udp_socket(bind_addr)? + create_bounded_udp_socket(SocketAddr::new(bind_addr, 0))? } SocketAddr::V6(_v6) => { // let bind_addr = get_default_ipv6_gateway()?; let bind_addr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)); - create_bounded_udp_socket(bind_addr)? + create_bounded_udp_socket(SocketAddr::new(bind_addr, 0))? } }; match socket.send_to(&*request, server).await { diff --git a/src/app/inbound.rs b/src/app/inbound.rs index 6da3926..1cd4895 100644 --- a/src/app/inbound.rs +++ b/src/app/inbound.rs @@ -13,15 +13,16 @@ use crate::{ }, }; -use super::{Dispatcher, InboundListener}; +use super::{Dispatcher, InboundListener, UdpAssociationManager}; // 统一管理全部 inbound 协议 pub struct InboundManager { handlers: HashMap>, configs: Vec, + nat: Arc, } impl InboundManager { - pub fn new(config: Vec) -> InboundManager { + pub fn new(config: Vec, nat: Arc) -> InboundManager { let mut handlers: HashMap> = HashMap::new(); // 迭代全部的inbound协议,并创建listener @@ -42,6 +43,7 @@ impl InboundManager { InboundManager { handlers, configs: config, + nat } } pub fn listen(mut self, dispatcher: Arc) -> Result> { @@ -63,7 +65,7 @@ impl InboundManager { continue; } }; - InboundListener::listen(dispatcher, handler.clone(), addr)? + InboundListener::listen(dispatcher, handler.clone(), addr, self.nat.clone())? } }; tasks.append(&mut future); @@ -71,7 +73,6 @@ impl InboundManager { } Ok(async { futures::future::join_all(tasks).await; - let a = 1; }.boxed()) } } diff --git a/src/app/listener.rs b/src/app/listener.rs index 2283020..b1e2e6c 100644 --- a/src/app/listener.rs +++ b/src/app/listener.rs @@ -1,18 +1,20 @@ use futures_util::{future::BoxFuture, FutureExt, StreamExt}; -use log::{error, info}; +use log::{debug, error, info}; use std::{io::Result, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, UdpSocket}, + sync::oneshot, }; use crate::{ + app::udp_association_manager::UdpPacket, proxy::{ - Address, AnyInboundHandler, InboundResult, Network, Session, - TcpInboundHandlerTrait, + Address, AnyInboundHandler, InboundResult, Network, Session, TcpInboundHandlerTrait, + UdpInboundHandlerTrait, }, }; -use super::dispatcher::Dispatcher; +use super::{dispatcher::Dispatcher, UdpAssociationManager}; pub struct InboundListener {} type TaskFuture = BoxFuture<'static, ()>; @@ -21,6 +23,7 @@ impl InboundListener { dispatcher: Arc, handler: AnyInboundHandler, addr: SocketAddr, + nat: Arc, ) -> Result> { let mut tasks: Vec = vec![]; if handler.has_tcp() { @@ -33,13 +36,11 @@ impl InboundListener { // 2. tcp_listener 不能依赖self,listen调用 dispatcher.clone() 后将 cloned dispatcher 传给 tcp_listener // 这就要求 tcp_listener 改为 InboundListener // 实在不想在 listen 糅合一堆代码,我在这里采用 2 - let f = - InboundListener::tcp_listener(handler.clone(), dispatcher.clone(), addr); + let f = InboundListener::tcp_listener(handler.clone(), dispatcher.clone(), addr); tasks.push(f); } if handler.has_udp() { - let f = - InboundListener::udp_listener(handler.clone(), dispatcher.clone(), addr); + let f = InboundListener::udp_listener(handler.clone(), dispatcher.clone(), addr, nat); tasks.push(f); } Ok(tasks) @@ -64,16 +65,16 @@ impl InboundListener { destination: Address::Ip(addr), network: Network::TCP, local_peer: local, - peer_address: conn.peer_addr().expect("peer") + peer_address: conn.peer_addr().expect("peer"), }; match TcpInboundHandlerTrait::handle(&*handler, session, conn).await { Ok(InboundResult::Stream(stream, mut sess)) => { dispatcher.dispatch_tcp(stream, &mut sess).await; } - Ok(InboundResult::Datagram(socket, sess)) => { - dispatcher.dispatch_udp(socket, sess).await; + Ok(InboundResult::Datagram(socket)) => { + // dispatcher.dispatch_udp(socket, sess).await; } - Ok(InboundResult::NOT_SUPPORTED) => { + Ok(InboundResult::NotSupported) => { error!("not supported"); } Err(err) => { @@ -87,22 +88,83 @@ impl InboundListener { return; } } - } - }.boxed(); + } + .boxed(); task } fn udp_listener( - _handler: AnyInboundHandler, - _dispatcher: Arc, + handler: AnyInboundHandler, + dispatcher: Arc, addr: SocketAddr, + nat: Arc, ) -> TaskFuture { let future = async move { - let _listener = UdpSocket::bind(addr).await.unwrap(); - info!("Udp listen at {}", addr); - () - }.boxed(); - // todo!("udp listener") + let socket = UdpSocket::bind(addr).await.unwrap(); + info!("Udp listening at {}", addr); + match UdpInboundHandlerTrait::handle(handler.as_ref(), socket).await { + Ok(res) => { + match res { + InboundResult::Datagram(send_recv_socket) => { + tokio::spawn(async move { + let mut buf = vec![0u8; 1024]; + let (source_addr, real_addr) = + match send_recv_socket.recv_from(&mut buf).await { + Ok(x) => x, + Err(err) => { + debug!("{}", err); + return; + } + }; + let (sender, mut receiver) = + tokio::sync::mpsc::channel::>(100); + let send_recv_socket1 = send_recv_socket.clone(); + tokio::spawn(async move { + // inbound <- send <- tunnel <- recv <- outbound + loop { + let res = match receiver.recv().await { + Some(x) => x, + None => { + // closed + debug!("{} closed channel", &source_addr); + return; + } + }; + match send_recv_socket1 + .send_to(res.as_ref(), source_addr.clone()) + .await + { + Ok(_) => {} + Err(err) => { + debug!("{}", err); + return; + } + } + } + }); + let packet = UdpPacket { + data: buf, + dest: real_addr.clone(), + }; + match nat + .send_packet(real_addr, source_addr, packet, sender) + .await + { + Ok(_) => {} + Err(err) => { + debug!("{}", err); + } + } + }); + } + InboundResult::Stream(stream, sess) => {} + InboundResult::NotSupported => {} + } + } + Err(err) => {} + } + } + .boxed(); future } } diff --git a/src/app/mod.rs b/src/app/mod.rs index e9701d1..cc6089d 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -23,3 +23,6 @@ pub use sniffer::Sniffer; mod router; pub use router::Router; + +mod udp_association_manager; +pub use self::udp_association_manager::UdpAssociationManager; \ No newline at end of file diff --git a/src/app/udp_association_manager.rs b/src/app/udp_association_manager.rs new file mode 100644 index 0000000..732969b --- /dev/null +++ b/src/app/udp_association_manager.rs @@ -0,0 +1,127 @@ +use std::{collections::HashMap, io, net::SocketAddr, sync::Arc}; + +use log::{debug, error, trace}; +use tokio::sync::{ + mpsc::{self, Sender}, + Mutex, +}; + +use crate::proxy::{Address, Network, Session}; + +use super::Dispatcher; + +pub struct UdpPacket { + pub data: Vec, + pub dest: Address, +} +type SyncMap = Arc)>>>; +pub struct UdpAssociationManager { + store: SyncMap, + dispatcher: Arc, +} + +impl UdpAssociationManager { + pub fn new(dispatcher: Arc) -> Self { + Self { + store: Arc::new(Mutex::new(HashMap::new())), + dispatcher, + } + } + + pub async fn send_packet( + &self, + dest: Address, + source_addr: SocketAddr, + data: UdpPacket, + local_sender: Sender>, + ) -> anyhow::Result<()> { + let map = self.store.lock().await; + if map.contains_key(&source_addr) { + self.do_send(&source_addr, data).await; + return Ok(()); + } + let sess = Session { + destination: dest, + network: Network::UDP, + peer_address: source_addr, + ..Default::default() + }; + self.add_association(sess, local_sender).await?; + self.do_send(&source_addr, data).await; + Ok(()) + } + async fn add_association( + &self, + sess: Session, + local_sender: Sender>, + ) -> anyhow::Result<()> { + let (mut remote_socket_sender, mut remote_socket_receiver) = + mpsc::channel::(100); + let socket = self.dispatcher.dispatch_udp(sess.clone()).await?; + let socket1 = socket.clone(); + tokio::spawn(async move { + loop { + // inbound -> send -> tunnel -> recv -> outbound + match remote_socket_receiver.recv().await { + Some(msg) => { + let UdpPacket { ref data, dest } = msg; + match socket1.send_to(data, dest).await { + Ok(res) => {} + Err(err) => { + debug!("{}", err) + } + } + } + None => { + debug!("received none, closed"); + //closed + return; + } + }; + } + }); + let socket2 = socket.clone(); + tokio::spawn(async move { + // inbound <- send <- tunnel <- recv <- outbound + let mut buf = vec![0u8; 1024]; + loop { + match socket2.recv_from(&mut buf).await { + Ok(x) => match local_sender.try_send(buf.clone()) { + Ok(_) => {} + Err(err) => { + debug!("{}", err); + return; + } + }, + Err(err) => { + trace!("{}", err); + } + } + } + }); + let mut map = self.store.lock().await; + map.insert(sess.peer_address, remote_socket_sender); + Ok(()) + } + async fn do_send(&self, source_addr: &SocketAddr, packet: UdpPacket) { + let map = self.store.lock().await; + let sender = match map.get(source_addr) { + Some(x) => x, + None => { + error!("no sender for {} found", &source_addr); + return; + } + }; + let sender = sender.clone(); + tokio::spawn(async move { + // inbound -> send -> tunnel -> recv -> outbound + match sender.try_send(packet) { + Ok(_) => {} + Err(err) => { + debug!("send to remote should success but failed with err {}", err); + return; + } + }; + }); + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 8daba25..3eb3706 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,2 +1,9 @@ #[cfg(target_os = "linux")] pub mod linux; +use std::net::SocketAddr; + +use lazy_static::lazy_static; + +lazy_static!{ + pub static ref UNKNOWN_SOCKET_ADDR: SocketAddr = "0.0.0.0:0".parse::().unwrap(); +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 399145c..c465711 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,23 +1,21 @@ +pub mod app; mod common; -mod net; pub mod config; -pub mod app; +mod net; pub mod proxy; -use std::{sync::{Arc, Once}}; +use std::sync::{Arc, Once}; -use app::{Dispatcher, DnsClient, InboundManager, OutboundManager, Router}; +use app::{Dispatcher, DnsClient, InboundManager, OutboundManager, Router, UdpAssociationManager}; use futures::future::BoxFuture; +use anyhow::anyhow; use log4rs::{ append::console::ConsoleAppender, config::{Appender, Logger, Root}, encode::pattern::PatternEncoder, }; -use anyhow::{ - anyhow -}; -use tokio::{sync::{RwLock}}; +use tokio::sync::RwLock; pub use self::config::{load_from_file, parse_from_str}; @@ -32,45 +30,51 @@ impl Context { } pub fn newRuntime() -> tokio::runtime::Runtime { - let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); runtime } -pub fn start(config: config::Config, shutdown_handler: BoxFuture<'static, ()>) -> anyhow::Result<()> { +pub fn start( + config: config::Config, + shutdown_handler: BoxFuture<'static, ()>, +) -> anyhow::Result<()> { let mut tasks = Vec::new(); let stdout_logger = ConsoleAppender::builder() - .encoder(Box::new(PatternEncoder::new( - "{d} {h({l})} {f}:{L} {m} {n}", - ))) + .encoder(Box::new(PatternEncoder::new( + "{d} {h({l})} {f}:{L} {m} {n}", + ))) .build(); - let logger_config = log4rs::Config::builder() + let logger_config = log4rs::Config::builder() .appender(Appender::builder().build("stdout", Box::new(stdout_logger))) .logger(Logger::builder().build("tunnel", log::LevelFilter::Trace)) .build( Root::builder() - .appender("stdout") - .build(log::LevelFilter::Error), + .appender("stdout") + .build(log::LevelFilter::Error), ) .unwrap(); - static ONCE: Once = Once::new(); - ONCE.call_once(|| { - let _handler = log4rs::init_config(logger_config).unwrap(); - }); - - let inbound_manager = InboundManager::new(config.inbounds.clone()); - let outbound_manager = Arc::new(OutboundManager::new(config.outbounds.clone())?); - let router = Arc::new(Router::new(config.routes.clone())); - let dns_client = Arc::new(RwLock::new(DnsClient::new(config.clone()))); - let context = Arc::new(Context::new(dns_client.clone())); - - let dispatcher = Arc::new(Dispatcher::new( - context.clone(), - router.clone(), - dns_client.clone(), - outbound_manager.clone(), - config.clone(), - )); - + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + let _handler = log4rs::init_config(logger_config).unwrap(); + }); + let outbound_manager = Arc::new(OutboundManager::new(config.outbounds.clone())?); + let router = Arc::new(Router::new(config.routes.clone())); + let dns_client = Arc::new(RwLock::new(DnsClient::new(config.clone()))); + let context = Arc::new(Context::new(dns_client.clone())); + let dispatcher = Arc::new(Dispatcher::new( + context.clone(), + router.clone(), + dns_client.clone(), + outbound_manager.clone(), + config.clone(), + )); + + let nat = UdpAssociationManager::new(dispatcher.clone()); + let inbound_manager = InboundManager::new(config.inbounds.clone(), Arc::new(nat)); + let inbound_futures = match inbound_manager.listen(dispatcher.clone()) { Ok(x) => x, Err(err) => { diff --git a/src/proxy/direct/mod.rs b/src/proxy/direct/mod.rs index 9bee5a9..2396006 100644 --- a/src/proxy/direct/mod.rs +++ b/src/proxy/direct/mod.rs @@ -5,7 +5,7 @@ use tokio::net::{TcpStream, UdpSocket}; use crate::Context; -use super::{TcpOutboundHandlerTrait, Session, UdpOutboundHandlerTrait, connect_to_remote_tcp, connect_to_remote_udp}; +use super::{TcpOutboundHandlerTrait, Session, UdpOutboundHandlerTrait, connect_to_remote_tcp, connect_to_remote_udp, AnyInboundDatagram, SimpleOutboundSocket, AnyOutboundDatagram}; pub struct TcpOutboundHandler{} @@ -20,8 +20,10 @@ pub struct UdpOutboundHandler{} #[async_trait] impl UdpOutboundHandlerTrait for UdpOutboundHandler { - async fn handle(&self, ctx: Arc, sess: &Session) -> anyhow::Result { - connect_to_remote_udp(ctx.dns_client.clone(), sess.local_peer, sess.destination.clone() - ).await + async fn handle(&self, ctx: Arc, sess: &Session) -> anyhow::Result { + let socket = connect_to_remote_udp(ctx.dns_client.clone(), sess.local_peer).await?; + Ok(Arc::new(SimpleOutboundSocket{ + socket + })) } } \ No newline at end of file diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index d964dc8..14e2db0 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -16,7 +16,7 @@ use tokio::{ net::{TcpSocket, UdpSocket, TcpStream}, sync::RwLock, }; -use crate::{app::DnsClient, Context}; +use crate::{app::DnsClient, Context, common::UNKNOWN_SOCKET_ADDR}; #[cfg(target_os = "unix")] mod tun; @@ -59,6 +59,12 @@ impl Address { } +impl Default for Address { + fn default() -> Self { + Address::Ip(*UNKNOWN_SOCKET_ADDR) + } +} + impl Display for Address { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let str = match self { @@ -112,6 +118,7 @@ pub struct Session { // 真正要连接的 remote pub destination: Address, // 连接到本地代理服务器的remote + // listen之后socket#local_addr() // local_peer <=> tunnel pub local_peer: SocketAddr, // 连接到本地的对端socket @@ -119,6 +126,16 @@ pub struct Session { pub network: Network } +impl Default for Session { + fn default() -> Self { + Self { + local_peer: *UNKNOWN_SOCKET_ADDR, + network: Network::TCP, + peer_address: *UNKNOWN_SOCKET_ADDR, + .. Default::default() + } + } +} impl Session { pub fn port (&self) -> u16{ match self.destination { @@ -128,13 +145,13 @@ impl Session { } } -pub fn create_bounded_udp_socket(addr: IpAddr) -> io::Result { +pub fn create_bounded_udp_socket(addr: SocketAddr) -> io::Result { let socket = match addr { - IpAddr::V4(..) => Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?, - IpAddr::V6(..) => Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?, + SocketAddr::V4(..) => Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?, + SocketAddr::V6(..) => Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?, }; // let s: SockAddr = ; - match socket.bind(&SockAddr::from(SocketAddr::new(addr, 0))) { + match socket.bind(&addr.into()) { Ok(..) => {}, Err(err) => { log::error!("failed to bind socket {}", err.to_string()) @@ -163,10 +180,54 @@ pub fn create_bounded_tcp_socket(addr: SocketAddr) -> io::Result { // ---------------------------- // INBOUND + +#[async_trait] +pub trait InboundDatagramTrait: Sync + Send + Unpin{ + // buf, source addr, real socket addr + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(SocketAddr, Address)>; + async fn send_to(&self, buf: &[u8], dest: SocketAddr) -> io::Result; +} + +#[async_trait] +pub trait OutboundDatagramTrait: Sync + Send + Unpin { + async fn send_to(&self, buf: &[u8], dest: Address) -> io::Result; + async fn recv_from(&self, buf: &mut [u8]) -> io::Result
; +} +pub struct SimpleOutboundSocket { + socket: UdpSocket +} +impl From for SimpleOutboundSocket { + fn from(socket: UdpSocket) -> Self { + Self { + socket + } + } +} + +#[async_trait] +impl OutboundDatagramTrait for SimpleOutboundSocket { + async fn recv_from(&self, mut buf: &mut [u8]) -> io::Result
{ + let (n, dest) = self.socket.recv_from(&mut buf).await?; + Ok(Address::Ip(dest)) + } + async fn send_to(&self, buf: &[u8], dest: Address) -> io::Result { + let addr = match dest { + Address::Ip(x) =>x, + _ => { + return Err(io::Error::new(io::ErrorKind::Other, "simple datagram destination shouldn't be domain name")); + } + }; + self.socket.send_to(buf.as_ref(), addr).await + } +} + + +pub type AnyInboundDatagram = Arc; +pub type AnyOutboundDatagram = Arc; pub enum InboundResult { Stream(TcpStream, Session), - Datagram(UdpSocket, Session), - NOT_SUPPORTED + Datagram(AnyInboundDatagram), + NotSupported } pub type AnyTcpInboundHandler = Arc; @@ -198,16 +259,16 @@ impl TcpInboundHandlerTrait for InboundHandler { if let Some(handler) = &self.tcp_handler { return handler.handle(sess, stream).await; } - Ok(InboundResult::NOT_SUPPORTED) + Ok(InboundResult::NotSupported) } } #[async_trait] impl UdpInboundHandlerTrait for InboundHandler { - async fn handle(&self, sess: Session, socket: UdpSocket) -> io::Result { + async fn handle(&self, socket: UdpSocket) -> io::Result { if let Some(handler) = &self.udp_handler { - return handler.handle(sess, socket).await; + return handler.handle(socket).await; } - Ok(InboundResult::NOT_SUPPORTED) + Ok(InboundResult::NotSupported) } } @@ -229,7 +290,7 @@ pub trait TcpInboundHandlerTrait: Sync + Send + Unpin { #[async_trait] pub trait UdpInboundHandlerTrait: Sync + Send + Unpin { - async fn handle(&self, session: Session, socket: tokio::net::UdpSocket) -> io::Result; + async fn handle(&self, socket: tokio::net::UdpSocket) -> io::Result; } // OUTBOUND @@ -260,7 +321,7 @@ pub enum Error { #[async_trait] pub trait UdpOutboundHandlerTrait: Send + Sync + Unpin { - async fn handle(&self, ctx: Arc, sess: &Session) -> anyhow::Result; + async fn handle(&self, ctx: Arc, sess: &Session) -> anyhow::Result; } pub type AnyTcpOutboundHandler = Arc; @@ -279,6 +340,20 @@ impl OutboundHandler { OutboundHandler { tag , tcp_handler: tcp, udp_handler: udp } } } +#[async_trait] +impl UdpOutboundHandlerTrait for OutboundHandler { + async fn handle(&self, ctx: Arc, sess: &Session) -> anyhow::Result { + + todo!() + } +} + +#[async_trait] +impl TcpOutboundHandlerTrait for OutboundHandler { + async fn handle(&self, ctx: Arc, sess: &Session) -> anyhow::Result { + todo!() + } +} pub trait StreamWrapperTrait: AsyncRead + AsyncWrite + Send + Sync + Unpin{} impl StreamWrapperTrait for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin {} @@ -292,6 +367,13 @@ pub async fn connect_to_remote_tcp(dns_client:Arc>, addr: Addr Err(err.into()) }) } +pub async fn connect_to_remote_udp(dns_client: Arc>, source_addr: SocketAddr) -> anyhow::Result { + let any_addr = match source_addr { + SocketAddr::V4(_) => "0.0.0.0:0".parse::().unwrap(), + SocketAddr::V6(_) => "[::]:0".parse::().unwrap(), + }; + create_bounded_udp_socket(any_addr).map_err(|x|anyhow!("create bounded udp socket failed")) +} pub async fn name_to_socket_addr(dns_client: Arc>, addr: Address) -> anyhow::Result { let socket_addr = match addr { @@ -314,11 +396,4 @@ pub async fn name_to_socket_addr(dns_client: Arc>, addr: Addre Address::Ip(addr) => addr }; Ok(socket_addr) -} - -pub async fn connect_to_remote_udp(dns_client: Arc>, local: SocketAddr, peer: Address) -> anyhow::Result { - let socket = UdpSocket::bind(local).await?; - let socket_addr = name_to_socket_addr(dns_client, peer).await?; - UdpSocket::connect(&socket, socket_addr).await?; - Ok(socket) } \ No newline at end of file diff --git a/src/proxy/shadowsocks/mod.rs b/src/proxy/shadowsocks/mod.rs index 6056d61..8f4759b 100644 --- a/src/proxy/shadowsocks/mod.rs +++ b/src/proxy/shadowsocks/mod.rs @@ -259,6 +259,7 @@ pub struct ShadowsocksDatagram { // <-target_address || forwarded data-> // nonce 都从1开始,但由于UDP只使用一次,所以nonce一直都是 1 ? // https://github.com/v2fly/v2ray-core/blob/3ef7feaeaf737d05c5a624c580633b7ce0f0f1be/common/crypto/auth.go#L73 +// 不过也没关系了,毕竟每次salt都是随机的 // Since shadowsocks.org offline, I can't found original UDP spec for shadowsocks ever. // following spec copy from https://github-wiki-see.page/m/shadowsocks/shadowsocks-org/wiki/AEAD-Ciphers diff --git a/src/proxy/socks/inbound.rs b/src/proxy/socks/inbound.rs index 6967241..cb5d568 100644 --- a/src/proxy/socks/inbound.rs +++ b/src/proxy/socks/inbound.rs @@ -1,10 +1,10 @@ use log::error; -use std::io; +use std::{io, net::SocketAddr, sync::Arc}; use crate::{ proxy::{ socks::handshake_as_server, Session, InboundResult, TcpInboundHandlerTrait, - UdpInboundHandlerTrait, + UdpInboundHandlerTrait, InboundDatagramTrait, Address, }, }; use async_trait::async_trait; @@ -30,7 +30,7 @@ pub struct UdpInboundHandler; #[async_trait] impl UdpInboundHandlerTrait for UdpInboundHandler { - async fn handle(&self, conn: Session, socket: UdpSocket) -> io::Result { + async fn handle(&self, socket: UdpSocket) -> io::Result { // socks5 对 udp 会有单独的连接流程 // 由于 udp 的connectionless 特性,所以 client 只发送一次,header, data 都包含在其中 // https://datatracker.ietf.org/doc/html/rfc1928#section-7 @@ -41,6 +41,40 @@ impl UdpInboundHandlerTrait for UdpInboundHandler { // does not support fragmentation MUST drop any datagram whose FRAG // field is other than X'00'. // https://github.com/iamwwc/v2ray-core/blob/02f251ebecbf21095c7b74cb3f0feaed0927d3f9/proxy/socks/protocol.go#L321 - Ok(InboundResult::Datagram(socket, conn)) + let udp = Socks5Datagram { + socket + }; + Ok(InboundResult::Datagram(Arc::new(udp))) + } +} + +struct Socks5Datagram { + socket: UdpSocket +} + +#[async_trait] +impl InboundDatagramTrait for Socks5Datagram { + async fn recv_from(&self, buf: &mut [u8]) ->io::Result<(SocketAddr, Address)> { + let mut buf = vec![0u8; 1024]; + match self.socket.recv_from(&mut buf).await { + Ok((n, source_addr)) => { + + }, + Err(err) => { + + } + } + todo!() + } + async fn send_to(&self, buf: &[u8], dest: SocketAddr) ->io::Result { + match self.socket.send_to(&buf, dest).await { + Ok(n) => { + + } + Err(err) => { + + } + } + todo!() } } diff --git a/src/proxy/socks/outbound.rs b/src/proxy/socks/outbound.rs index 39b723f..e906e4d 100644 --- a/src/proxy/socks/outbound.rs +++ b/src/proxy/socks/outbound.rs @@ -7,7 +7,7 @@ use tokio::{net::{TcpStream, UdpSocket}}; use crate::{ proxy::{ connect_to_remote_tcp, Address, Session, TcpOutboundHandlerTrait, - UdpOutboundHandlerTrait, + UdpOutboundHandlerTrait, AnyInboundDatagram, AnyOutboundDatagram, }, Context, }; @@ -40,7 +40,7 @@ pub struct UdpOutboundHandler { #[async_trait] impl UdpOutboundHandlerTrait for UdpOutboundHandler { - async fn handle(&self, _ctx: Arc, _session: &Session) -> anyhow::Result { + async fn handle(&self, _ctx: Arc, _session: &Session) -> anyhow::Result { todo!() } }