Skip to content

Commit

Permalink
Merge pull request #79 from thearossman/main
Browse files Browse the repository at this point in the history
Minor TCP Reassembly and Packet Tracking Changes
  • Loading branch information
thearossman authored Dec 4, 2024
2 parents 5598165 + 6764ee8 commit 4cb8f72
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 102 deletions.
62 changes: 34 additions & 28 deletions core/src/conntrack/conn/conn_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
use crate::conntrack::pdu::L4Pdu;
use crate::filter::Actions;
use crate::lcore::CoreId;
use crate::protocols::packet::tcp::TCP_PROTOCOL;
use crate::protocols::stream::{
ConnData, ParseResult, ParserRegistry, ParsingState, ProbeRegistryResult,
};
use crate::subscription::{Subscription, Trackable};
use crate::{FiveTuple, Mbuf};
use crate::FiveTuple;

#[derive(Debug)]
pub(crate) struct ConnInfo<T>
Expand Down Expand Up @@ -50,6 +49,31 @@ where
self.actions = pkt_actions;
}

#[inline]
pub(crate) fn update_sdata(
&mut self,
pdu: &L4Pdu,
subscription: &Subscription<T::Subscribed>,
reassembled: bool,
) {
// Typically use for calculating connection metrics
if self.actions.update_pdu() {
self.sdata.update(pdu, reassembled);
}
// Used for non-terminal matches on packet datatypes (`PacketCache` action,
// pre-reassembly only) and for datatypes that require tracking packets
// (`PacketTrack`, pre- and/or post-reassembly).
if self.actions.buffer_packet(reassembled) {
self.sdata.buffer_packet(pdu, &self.actions, reassembled);
}
// Packet-level subscriptions ready for delivery should be
// delivered 1x (before reassembly).
if !reassembled && self.actions.packet_deliver() {
// Delivering all remaining packets in connection
subscription.deliver_packet(pdu.mbuf_ref(), &self.cdata, &self.sdata);
}
}

pub(crate) fn consume_pdu(
&mut self,
pdu: L4Pdu,
Expand All @@ -65,22 +89,7 @@ where
self.handle_parse(&pdu, subscription, registry);
}

// Post-reassembly `update`
if self.actions.update_pdu_reassembled() && pdu.ctxt.proto == TCP_PROTOCOL {
// Forward PDU to any subscriptions that require
// tracking ongoing connection data post-reassembly
self.sdata.update(&pdu, true);
}
if self.actions.packet_deliver() {
// Delivering all remaining packets in connection
subscription.deliver_packet(pdu.mbuf_ref(), &self.cdata, &self.sdata);
}
if self.actions.buffer_frame() {
// Track frame for (potential) future delivery
// Used when a filter has partially matched for a
// subscription that requests packets
self.sdata.track_packet(Mbuf::new_ref(pdu.mbuf_ref()));
}
self.update_sdata(&pdu, subscription, true);
}

fn handle_parse(
Expand Down Expand Up @@ -208,18 +217,15 @@ where
self.actions.clear();
}

// Helper used after filter updates
pub(crate) fn clear_packets(&mut self) {
self.sdata.drain_packets();
}

// Helper to be used after applying protocol or session filter
pub(crate) fn clear_stale_data(&mut self, new_actions: &Actions) {
if self.actions.buffer_frame() && !new_actions.buffer_frame() && !self.actions.drop() {
// No longer need tracked packets; delete to save memory
// Don't clear if all connection data may be about to be dropped
self.clear_packets();
assert!(!new_actions.buffer_frame());
if !self.actions.drop() {
if self.actions.cache_packet() && !new_actions.cache_packet() {
self.sdata.drain_cached_packets();
}
if self.actions.buffer_packet(true) && !new_actions.buffer_packet(true) {
self.sdata.drain_tracked_packets();
}
}
// Don't clear sessions, as SessionTrack is never
// a terminal action at the protocol stage
Expand Down
11 changes: 11 additions & 0 deletions core/src/conntrack/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ where
match &mut self.l4conn {
L4Conn::Tcp(tcp_conn) => {
tcp_conn.reassemble(pdu, &mut self.info, subscription, registry);
// Check if, after actions update, the framework/subscriptions no longer require
// receiving reassembled traffic
if !self.info.actions.reassemble() {
// Safe to discard out-of-order buffers
if tcp_conn.ctos.ooo_buf.len() != 0 {
tcp_conn.ctos.ooo_buf.buf.clear();
}
if tcp_conn.stoc.ooo_buf.len() != 0 {
tcp_conn.stoc.ooo_buf.buf.clear();
}
}
}
L4Conn::Udp(_udp_conn) => self.info.consume_pdu(pdu, subscription, registry),
}
Expand Down
13 changes: 7 additions & 6 deletions core/src/conntrack/conn/tcp_conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ impl TcpConn {
pub(crate) fn new_on_syn(ctxt: L4Context, max_ooo: usize) -> Self {
let flags = ctxt.flags;
let next_seq = ctxt.seq_no.wrapping_add(1 + ctxt.length as u32);
let ack = ctxt.ack_no;
TcpConn {
ctos: TcpFlow::new(max_ooo, next_seq, flags),
ctos: TcpFlow::new(max_ooo, next_seq, flags, ack),
stoc: TcpFlow::default(max_ooo),
}
}
Expand All @@ -43,11 +44,11 @@ impl TcpConn {
/// Returns `true` if the connection should be terminated
#[inline]
pub(crate) fn is_terminated(&self) -> bool {
// Both sides have sent FIN, or a RST has been sent
(self.ctos.consumed_flags & self.stoc.consumed_flags & FIN
| self.ctos.consumed_flags & RST
| self.stoc.consumed_flags & RST)
!= 0
// Both sides have sent, reassembled, and acknowledged FIN, or RST has been sent
(self.ctos.consumed_flags & self.stoc.consumed_flags & FIN != 0
&& self.ctos.last_ack == self.stoc.next_seq
&& self.stoc.last_ack == self.ctos.next_seq)
|| (self.ctos.consumed_flags & RST | self.stoc.consumed_flags & RST) != 0
}

/// Updates connection termination flags
Expand Down
18 changes: 16 additions & 2 deletions core/src/conntrack/conn/tcp_conn/reassembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::collections::VecDeque;
pub(crate) struct TcpFlow {
/// Expected sequence number of next segment
pub(super) next_seq: Option<u32>,
/// Last-seen ack number for peer's flow
pub(crate) last_ack: Option<u32>,
/// Flow status for consumed control packets.
/// Matches TCP flag bits.
pub(super) consumed_flags: u8,
Expand All @@ -26,6 +28,7 @@ impl TcpFlow {
pub(super) fn default(capacity: usize) -> Self {
TcpFlow {
next_seq: None,
last_ack: None,
consumed_flags: 0,
ooo_buf: OutOfOrderBuffer::new(capacity),
}
Expand All @@ -34,9 +37,10 @@ impl TcpFlow {
/// Creates a new TCP flow with given next sequence number, flags,
/// and out-of-order buffer
#[inline]
pub(super) fn new(capacity: usize, next_seq: u32, flags: u8) -> Self {
pub(super) fn new(capacity: usize, next_seq: u32, flags: u8, ack: u32) -> Self {
TcpFlow {
next_seq: Some(next_seq),
last_ack: Some(ack),
consumed_flags: flags,
ooo_buf: OutOfOrderBuffer::new(capacity),
}
Expand Down Expand Up @@ -68,6 +72,7 @@ impl TcpFlow {
if segment.flags() & FIN != 0 {
expected_seq = cur_seq.wrapping_add(1);
}
self.last_ack = Some(segment.ack_no());
info.consume_pdu(segment, subscription, registry);
self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
} else if wrapping_lt(next_seq, cur_seq) {
Expand All @@ -76,6 +81,7 @@ impl TcpFlow {
} else if let Some(expected_seq) = overlap(&mut segment, next_seq) {
// Segment starts before the next expected segment but has new data
self.consumed_flags |= segment.flags();
self.last_ack = Some(segment.ack_no());
info.consume_pdu(segment, subscription, registry);
self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
} else {
Expand All @@ -93,6 +99,7 @@ impl TcpFlow {
let expected_seq = cur_seq.wrapping_add(1 + length);
self.next_seq = Some(expected_seq);
self.consumed_flags |= segment.flags();
self.last_ack = Some(segment.ack_no());
info.consume_pdu(segment, subscription, registry);
self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
} else {
Expand Down Expand Up @@ -128,6 +135,7 @@ impl TcpFlow {
}
let next_seq = self.ooo_buf.flush_ordered::<T>(
expected_seq,
&mut self.last_ack,
&mut self.consumed_flags,
info,
subscription,
Expand Down Expand Up @@ -177,6 +185,7 @@ impl OutOfOrderBuffer {
fn flush_ordered<T: Trackable>(
&mut self,
expected_seq: u32,
last_ack: &mut Option<u32>,
consumed_flags: &mut u8,
info: &mut ConnInfo<T>,
subscription: &Subscription<T::Subscribed>,
Expand Down Expand Up @@ -204,6 +213,7 @@ impl OutOfOrderBuffer {
if segment.flags() & FIN != 0 {
next_seq = next_seq.wrapping_add(1);
}
*last_ack = Some(segment.ack_no());
info.consume_pdu(segment, subscription, registry);
index = 0;
} else if wrapping_lt(next_seq, cur_seq) {
Expand All @@ -213,6 +223,7 @@ impl OutOfOrderBuffer {
if let Some(update_seq) = overlap(&mut segment, next_seq) {
next_seq = update_seq;
*consumed_flags |= segment.flags();
*last_ack = Some(segment.ack_no());
info.consume_pdu(segment, subscription, registry);
index = 0;
} else {
Expand Down Expand Up @@ -242,7 +253,10 @@ pub fn wrapping_lt(lhs: u32, rhs: u32) -> bool {
fn overlap(segment: &mut L4Pdu, expected_seq: u32) -> Option<u32> {
let length = segment.length();
let cur_seq = segment.seq_no();
let end_seq = cur_seq.wrapping_add(length as u32);
let mut end_seq = cur_seq.wrapping_add(length as u32);
if segment.flags() & FIN != 0 {
end_seq = end_seq.wrapping_add(1);
}

if wrapping_lt(expected_seq, end_seq) {
// contains new data
Expand Down
14 changes: 7 additions & 7 deletions core/src/conntrack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ where
return;
}
let pdu = L4Pdu::new(mbuf, ctxt, dir);
if conn.info.actions.update_pdu() {
conn.info.sdata.update(&pdu, false);
}
if conn.info.actions.update_conn() {
conn.info.update_sdata(&pdu, subscription, false);
// Consume PDU for reassembly or parsing
if conn.info.actions.reassemble() {
conn.update(pdu, subscription, &self.registry);
} else {
// Ensure FIN is handled, if appl.
conn.update_tcp_flags(pdu.flags(), pdu.dir);
}

Expand Down Expand Up @@ -138,10 +138,10 @@ where
};
if let Ok(mut conn) = conn {
conn.info.filter_first_packet(&pdu, subscription);
if conn.info.actions.update_pdu() {
conn.info.sdata.update(&pdu, false);
conn.info.update_sdata(&pdu, subscription, false);
if conn.info.actions.reassemble() {
conn.info.consume_pdu(pdu, subscription, &self.registry);
}
conn.info.consume_pdu(pdu, subscription, &self.registry);
if !conn.remove_from_table() {
self.timerwheel.insert(
&conn_id,
Expand Down
11 changes: 11 additions & 0 deletions core/src/conntrack/pdu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ impl L4Pdu {
self.ctxt.seq_no
}

#[inline]
pub fn ack_no(&self) -> u32 {
self.ctxt.ack_no
}

#[inline]
pub fn flags(&self) -> u8 {
self.ctxt.flags
Expand All @@ -72,6 +77,8 @@ pub struct L4Context {
pub length: usize,
/// Raw sequence number of segment.
pub seq_no: u32,
/// Raw acknowledgment number of segment.
pub ack_no: u32,
/// TCP flags.
pub flags: u8,
}
Expand All @@ -91,6 +98,7 @@ impl L4Context {
offset: tcp.next_header_offset(),
length: payload_size,
seq_no: tcp.seq_no(),
ack_no: tcp.ack_no(),
flags: tcp.flags(),
})
} else {
Expand All @@ -107,6 +115,7 @@ impl L4Context {
offset: udp.next_header_offset(),
length: payload_size,
seq_no: 0,
ack_no: 0,
flags: 0,
})
} else {
Expand All @@ -127,6 +136,7 @@ impl L4Context {
offset: tcp.next_header_offset(),
length: payload_size,
seq_no: tcp.seq_no(),
ack_no: tcp.ack_no(),
flags: tcp.flags(),
})
} else {
Expand All @@ -143,6 +153,7 @@ impl L4Context {
offset: udp.next_header_offset(),
length: payload_size,
seq_no: 0,
ack_no: 0,
flags: 0,
})
} else {
Expand Down
Loading

0 comments on commit 4cb8f72

Please sign in to comment.