Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add udp #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/app/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::{convert::TryFrom, sync::Arc, net::SocketAddr};
use std::{convert::TryFrom, sync::Arc, net::SocketAddr, io};

use anyhow::{
anyhow
};
use log::{debug, error, trace};
use tokio::{
net::{TcpStream, UdpSocket},
Expand All @@ -8,11 +11,11 @@ use tokio::{

use crate::{
config::Config,
proxy::{Address, Session, StreamWrapperTrait, TcpOutboundHandlerTrait},
proxy::{Address, Session, StreamWrapperTrait, TcpOutboundHandlerTrait, InboundDatagramTrait, AnyInboundDatagram, UdpOutboundHandlerTrait, AnyOutboundDatagram},
Context,
};

use super::{sniffer::Sniffer, DnsClient, OutboundManager, Router};
use super::{sniffer::Sniffer, DnsClient, OutboundManager, Router, udp_association_manager::UdpAssociationManager};

// 负责将请求分发给不同的 代理协议 处理
pub struct Dispatcher {
Expand Down Expand Up @@ -115,7 +118,21 @@ impl Dispatcher {
};
}

pub async fn dispatch_udp(&self, _socket: UdpSocket, _sess: Session) {}
pub async fn dispatch_udp(&self, sess: Session) -> anyhow::Result<AnyOutboundDatagram>{
let outbound_tag = match self.router.route(&sess) {
Some(x) => x,
None => {
return Err(anyhow!("no outbound found for {}", &sess.destination));
}
};
let handler = match self.outbound_manager.get_handler(outbound_tag.as_ref()) {
Some(h) => h,
None => {
return Err(anyhow!("no handler found for tag {}", &*outbound_tag))
}
};
UdpOutboundHandlerTrait::handle(handler.as_ref(), self.ctx.clone(), &sess).await
}

pub fn new(
context: Arc<Context>,
Expand All @@ -128,7 +145,7 @@ impl Dispatcher {
ctx: context,
dns_client,
outbound_manager: outbound_manager,
router,
router
}
}
}
4 changes: 2 additions & 2 deletions src/app/dns_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ impl DnsClient {
SocketAddr::V4(_v4) => {
// let bind_addr = get_default_ipv4_gateway()?;
let bind_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
create_bounded_udp_socket(bind_addr)?
create_bounded_udp_socket(SocketAddr::new(bind_addr, 0))?
}
SocketAddr::V6(_v6) => {
// let bind_addr = get_default_ipv6_gateway()?;
let bind_addr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0));
create_bounded_udp_socket(bind_addr)?
create_bounded_udp_socket(SocketAddr::new(bind_addr, 0))?
}
};
match socket.send_to(&*request, server).await {
Expand Down
9 changes: 5 additions & 4 deletions src/app/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ use crate::{
},
};

use super::{Dispatcher, InboundListener};
use super::{Dispatcher, InboundListener, UdpAssociationManager};
// 统一管理全部 inbound 协议
pub struct InboundManager {
handlers: HashMap<String, Arc<InboundHandler>>,
configs: Vec<Inbound>,
nat: Arc<UdpAssociationManager>,
}

impl InboundManager {
pub fn new(config: Vec<Inbound>) -> InboundManager {
pub fn new(config: Vec<Inbound>, nat: Arc<UdpAssociationManager>) -> InboundManager {
let mut handlers: HashMap<String, Arc<InboundHandler>> = HashMap::new();

// 迭代全部的inbound协议,并创建listener
Expand All @@ -42,6 +43,7 @@ impl InboundManager {
InboundManager {
handlers,
configs: config,
nat
}
}
pub fn listen(mut self, dispatcher: Arc<Dispatcher>) -> Result<BoxFuture<'static, ()>> {
Expand All @@ -63,15 +65,14 @@ impl InboundManager {
continue;
}
};
InboundListener::listen(dispatcher, handler.clone(), addr)?
InboundListener::listen(dispatcher, handler.clone(), addr, self.nat.clone())?
}
};
tasks.append(&mut future);
}
}
Ok(async {
futures::future::join_all(tasks).await;
let a = 1;
}.boxed())
}
}
104 changes: 83 additions & 21 deletions src/app/listener.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use log::{error, info};
use log::{debug, error, info};
use std::{io::Result, net::SocketAddr, sync::Arc};
use tokio::{
net::{TcpListener, UdpSocket},
sync::oneshot,
};

use crate::{
app::udp_association_manager::UdpPacket,
proxy::{
Address, AnyInboundHandler, InboundResult, Network, Session,
TcpInboundHandlerTrait,
Address, AnyInboundHandler, InboundResult, Network, Session, TcpInboundHandlerTrait,
UdpInboundHandlerTrait,
},
};

use super::dispatcher::Dispatcher;
use super::{dispatcher::Dispatcher, UdpAssociationManager};

pub struct InboundListener {}
type TaskFuture = BoxFuture<'static, ()>;
Expand All @@ -21,6 +23,7 @@ impl InboundListener {
dispatcher: Arc<Dispatcher>,
handler: AnyInboundHandler,
addr: SocketAddr,
nat: Arc<UdpAssociationManager>,
) -> Result<Vec<TaskFuture>> {
let mut tasks: Vec<TaskFuture> = vec![];
if handler.has_tcp() {
Expand All @@ -33,13 +36,11 @@ impl InboundListener {
// 2. tcp_listener 不能依赖self,listen调用 dispatcher.clone() 后将 cloned dispatcher 传给 tcp_listener
// 这就要求 tcp_listener 改为 InboundListener
// 实在不想在 listen 糅合一堆代码,我在这里采用 2
let f =
InboundListener::tcp_listener(handler.clone(), dispatcher.clone(), addr);
let f = InboundListener::tcp_listener(handler.clone(), dispatcher.clone(), addr);
tasks.push(f);
}
if handler.has_udp() {
let f =
InboundListener::udp_listener(handler.clone(), dispatcher.clone(), addr);
let f = InboundListener::udp_listener(handler.clone(), dispatcher.clone(), addr, nat);
tasks.push(f);
}
Ok(tasks)
Expand All @@ -64,16 +65,16 @@ impl InboundListener {
destination: Address::Ip(addr),
network: Network::TCP,
local_peer: local,
peer_address: conn.peer_addr().expect("peer")
peer_address: conn.peer_addr().expect("peer"),
};
match TcpInboundHandlerTrait::handle(&*handler, session, conn).await {
Ok(InboundResult::Stream(stream, mut sess)) => {
dispatcher.dispatch_tcp(stream, &mut sess).await;
}
Ok(InboundResult::Datagram(socket, sess)) => {
dispatcher.dispatch_udp(socket, sess).await;
Ok(InboundResult::Datagram(socket)) => {
// dispatcher.dispatch_udp(socket, sess).await;
}
Ok(InboundResult::NOT_SUPPORTED) => {
Ok(InboundResult::NotSupported) => {
error!("not supported");
}
Err(err) => {
Expand All @@ -87,22 +88,83 @@ impl InboundListener {
return;
}
}

}
}.boxed();
}
.boxed();
task
}
fn udp_listener(
_handler: AnyInboundHandler,
_dispatcher: Arc<Dispatcher>,
handler: AnyInboundHandler,
dispatcher: Arc<Dispatcher>,
addr: SocketAddr,
nat: Arc<UdpAssociationManager>,
) -> TaskFuture {
let future = async move {
let _listener = UdpSocket::bind(addr).await.unwrap();
info!("Udp listen at {}", addr);
()
}.boxed();
// todo!("udp listener")
let socket = UdpSocket::bind(addr).await.unwrap();
info!("Udp listening at {}", addr);
match UdpInboundHandlerTrait::handle(handler.as_ref(), socket).await {
Ok(res) => {
match res {
InboundResult::Datagram(send_recv_socket) => {
tokio::spawn(async move {
let mut buf = vec![0u8; 1024];
let (source_addr, real_addr) =
match send_recv_socket.recv_from(&mut buf).await {
Ok(x) => x,
Err(err) => {
debug!("{}", err);
return;
}
};
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<Vec<u8>>(100);
let send_recv_socket1 = send_recv_socket.clone();
tokio::spawn(async move {
// inbound <- send <- tunnel <- recv <- outbound
loop {
let res = match receiver.recv().await {
Some(x) => x,
None => {
// closed
debug!("{} closed channel", &source_addr);
return;
}
};
match send_recv_socket1
.send_to(res.as_ref(), source_addr.clone())
.await
{
Ok(_) => {}
Err(err) => {
debug!("{}", err);
return;
}
}
}
});
let packet = UdpPacket {
data: buf,
dest: real_addr.clone(),
};
match nat
.send_packet(real_addr, source_addr, packet, sender)
.await
{
Ok(_) => {}
Err(err) => {
debug!("{}", err);
}
}
});
}
InboundResult::Stream(stream, sess) => {}
InboundResult::NotSupported => {}
}
}
Err(err) => {}
}
}
.boxed();
future
}
}
3 changes: 3 additions & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ pub use sniffer::Sniffer;

mod router;
pub use router::Router;

mod udp_association_manager;
pub use self::udp_association_manager::UdpAssociationManager;
Loading