Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
theangryangel committed Oct 18, 2024
1 parent cedcd1f commit c3bf921
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 621 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ missing_debug_implementations = "deny"
broken_intra_doc_links = "deny"

[workspace.dependencies]
async-trait = { version = "0.1.68" }
binrw = "0.14.0"
bitflags = "2.4.0"
bytes = "1.4.0"
Expand Down
15 changes: 0 additions & 15 deletions examples/low-level-async/Cargo.toml

This file was deleted.

5 changes: 0 additions & 5 deletions examples/low-level-async/README.md

This file was deleted.

157 changes: 0 additions & 157 deletions examples/low-level-async/src/main.rs

This file was deleted.

2 changes: 1 addition & 1 deletion examples/strobe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{net::SocketAddr, time::Duration};
use clap::Parser;
use insim::{
identifiers::{PlayerId, RequestId},
insim::{IsiFlags, LclFlags, Small, SmallType, Tiny, TinyType},
insim::{IsiFlags, LclFlags, Tiny, TinyType},
Packet, Result,
};
use tokio::time::interval;
Expand Down
10 changes: 7 additions & 3 deletions insim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ workspace = true
[features]
default = ["tokio", "blocking", "websocket"]
blocking = []
tokio = ["dep:tokio", "dep:async-trait"]
tokio = [
"dep:tokio",
]
serde = [
"dep:serde",
"insim_core/serde",
Expand All @@ -27,10 +29,12 @@ serde = [
]
pth = ["dep:insim_pth"]
smx = ["dep:insim_smx"]
websocket = ["tokio", "dep:tokio-tungstenite", "futures-util"]
websocket = [
"tokio", "dep:tokio-tungstenite",
"futures-util"
]

[dependencies]
async-trait = { workspace = true, optional = true }
bitflags = { workspace = true }
bytes = { workspace = true }
from_variants = { workspace = true }
Expand Down
64 changes: 42 additions & 22 deletions insim/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{fmt::Debug, net::SocketAddr, time::Duration};

#[cfg(feature = "blocking")]
use crate::net::blocking_impl::{Framed as BlockingFramed, FramedInner as BlockingFramedInner};
use crate::net::blocking_impl::Framed as BlockingFramed;
#[cfg(feature = "tokio")]
use crate::net::tokio_impl::{Framed as AsyncFramed, FramedInner as AsyncFramedInner};
use crate::net::tokio_impl::Framed as AsyncFramed;
use crate::{
identifiers::RequestId,
insim::{Isi, IsiFlags},
net::{Codec, Mode},
net::{Codec, Mode, DEFAULT_TIMEOUT_SECS},
relay::Sel,
result::Result,
};
Expand Down Expand Up @@ -256,36 +256,44 @@ impl Builder {
pub fn connect_blocking(&self) -> Result<BlockingFramed> {
use std::net::ToSocketAddrs;

use crate::LFSW_RELAY_ADDR;
use crate::{net::blocking_impl::UdpStream, LFSW_RELAY_ADDR};

match self.proto {
Proto::Tcp => {
let stream =
std::net::TcpStream::connect_timeout(&self.remote, self.connect_timeout)?;
stream.set_nodelay(self.tcp_nodelay)?;
stream.set_read_timeout(Some(Duration::from_secs(DEFAULT_TIMEOUT_SECS)))?;
stream.set_write_timeout(Some(Duration::from_secs(DEFAULT_TIMEOUT_SECS)))?;

let mut stream = BlockingFramedInner::new(stream, Codec::new(self.mode.clone()));
let mut stream =
BlockingFramed::new(Box::new(stream), Codec::new(self.mode.clone()));
stream.verify_version(self.verify_version);
stream.handshake(self.isi())?;

Ok(BlockingFramed::Tcp(stream))
Ok(stream)
},
Proto::Udp => {
let local = self.udp_local_address.unwrap_or("0.0.0.0:0".parse()?);

let stream = std::net::UdpSocket::bind(local)?;
stream.connect(self.remote)?;
stream.set_read_timeout(Some(Duration::from_secs(DEFAULT_TIMEOUT_SECS)))?;
stream.set_write_timeout(Some(Duration::from_secs(DEFAULT_TIMEOUT_SECS)))?;

let mut isi = self.isi();
if self.udp_local_address.is_none() {
isi.udpport = local.port();
}

let mut stream = BlockingFramedInner::new(stream, Codec::new(self.mode.clone()));
let mut stream = BlockingFramed::new(
Box::new(UdpStream::from(stream)),
Codec::new(self.mode.clone()),
);
stream.verify_version(self.verify_version);
stream.handshake(isi)?;

Ok(BlockingFramed::Udp(stream))
Ok(stream)
},
Proto::Relay => {
let addrs = LFSW_RELAY_ADDR.to_socket_addrs()?;
Expand All @@ -295,7 +303,11 @@ impl Builder {
self.connect_timeout,
)?;
stream.set_nodelay(self.tcp_nodelay)?;
let mut stream = BlockingFramedInner::new(stream, Codec::new(Mode::Uncompressed));
stream.set_read_timeout(Some(Duration::from_secs(DEFAULT_TIMEOUT_SECS)))?;
stream.set_write_timeout(Some(Duration::from_secs(DEFAULT_TIMEOUT_SECS)))?;

let mut stream =
BlockingFramed::new(Box::new(stream), Codec::new(Mode::Uncompressed));

if let Some(hostname) = &self.relay_select_host {
let packet = Sel {
Expand All @@ -313,10 +325,10 @@ impl Builder {
.to_owned(),
};

stream.write(packet.into())?;
stream.write(packet)?;
}

Ok(BlockingFramed::Tcp(stream))
Ok(stream)
},
}
}
Expand All @@ -326,7 +338,9 @@ impl Builder {
/// The `Builder` is not consumed and may be reused.
#[cfg(feature = "tokio")]
pub async fn connect_async(&self) -> Result<AsyncFramed> {
use tokio::{io::BufWriter, time::timeout};
use tokio::time::timeout;

use crate::net::tokio_impl::udp::UdpStream;

match self.proto {
Proto::Tcp => {
Expand All @@ -337,13 +351,11 @@ impl Builder {
.await??;
stream.set_nodelay(self.tcp_nodelay)?;

let stream = BufWriter::new(stream);

let mut stream = AsyncFramedInner::new(stream, Codec::new(self.mode.clone()));
let mut stream = AsyncFramed::new(Box::new(stream), Codec::new(self.mode.clone()));
stream.verify_version(self.verify_version);
stream.handshake(self.isi(), self.handshake_timeout).await?;

Ok(AsyncFramed::BufferedTcp(stream))
Ok(stream)
},
Proto::Udp => {
let local = self.udp_local_address.unwrap_or("0.0.0.0:0".parse()?);
Expand All @@ -356,11 +368,14 @@ impl Builder {
isi.udpport = local.port();
}

let mut stream = AsyncFramedInner::new(stream, Codec::new(self.mode.clone()));
let mut stream = AsyncFramed::new(
Box::new(UdpStream::from(stream)),
Codec::new(self.mode.clone()),
);
stream.verify_version(self.verify_version);
stream.handshake(isi, self.handshake_timeout).await?;

Ok(AsyncFramed::Udp(stream))
Ok(stream)
},
Proto::Relay => {
let mut stream = self._connect_relay().await?;
Expand Down Expand Up @@ -393,6 +408,8 @@ impl Builder {
async fn _connect_relay(&self) -> Result<AsyncFramed> {
use tokio::time::timeout;

use crate::net::tokio_impl::websocket::WebsocketStream;

#[cfg(feature = "websocket")]
if self.relay_websocket {
let stream = timeout(
Expand All @@ -401,9 +418,12 @@ impl Builder {
)
.await??;

let mut inner = AsyncFramedInner::new(stream, Codec::new(Mode::Uncompressed));
let mut inner = AsyncFramed::new(
Box::new(WebsocketStream::from(stream)),
Codec::new(Mode::Uncompressed),
);
inner.verify_version(self.verify_version);
return Ok(AsyncFramed::WebSocket(inner));
return Ok(inner);
}

let stream = timeout(
Expand All @@ -413,8 +433,8 @@ impl Builder {
.await??;
stream.set_nodelay(self.tcp_nodelay)?;

let mut inner = AsyncFramedInner::new(stream, Codec::new(Mode::Uncompressed));
let mut inner = AsyncFramed::new(Box::new(stream), Codec::new(Mode::Uncompressed));
inner.verify_version(self.verify_version);
Ok(AsyncFramed::Tcp(inner))
Ok(inner)
}
}
Loading

0 comments on commit c3bf921

Please sign in to comment.