From f629524f464bb22b8c098a548902e00d428d7cfa Mon Sep 17 00:00:00 2001 From: Thea Rossman Date: Thu, 7 Nov 2024 20:44:51 -0800 Subject: [PATCH 1/5] Perform pkt delivery/tracking without reassembly when possible --- core/src/conntrack/conn/conn_info.rs | 27 +++++++++++++++++---------- core/src/conntrack/mod.rs | 15 +++++++++++++-- core/src/filter/actions.rs | 10 ---------- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/core/src/conntrack/conn/conn_info.rs b/core/src/conntrack/conn/conn_info.rs index cbf25acc..0740a9eb 100644 --- a/core/src/conntrack/conn/conn_info.rs +++ b/core/src/conntrack/conn/conn_info.rs @@ -50,6 +50,21 @@ where self.actions = pkt_actions; } + #[inline] + pub(crate) fn release_mbuf(&mut self, mbuf: Mbuf, + subscription: &Subscription) { + if self.actions.packet_deliver() { + // Delivering all remaining packets in connection + subscription.deliver_packet(&mbuf, &self.cdata, &self.sdata); + } + if self.actions.buffer_frame() { + // Track frame for (potential) future delivery + // Used when a filter has partially matched for a + // subscription that requests packets + self.sdata.track_packet(mbuf); + } + } + pub(crate) fn consume_pdu( &mut self, pdu: L4Pdu, @@ -71,16 +86,8 @@ where // tracking ongoing connection data post-reassembly self.sdata.update(&pdu, true); } - if self.actions.packet_deliver() { - // Delivering all remaining packets in connection - subscription.deliver_packet(pdu.mbuf_ref(), &self.cdata, &self.sdata); - } - if self.actions.buffer_frame() { - // Track frame for (potential) future delivery - // Used when a filter has partially matched for a - // subscription that requests packets - self.sdata.track_packet(Mbuf::new_ref(pdu.mbuf_ref())); - } + + self.release_mbuf(pdu.mbuf_own(), subscription); } fn handle_parse( diff --git a/core/src/conntrack/mod.rs b/core/src/conntrack/mod.rs index 4f94195b..6457279b 100644 --- a/core/src/conntrack/mod.rs +++ b/core/src/conntrack/mod.rs @@ -103,10 +103,15 @@ where if conn.info.actions.update_pdu() { conn.info.sdata.update(&pdu, false); } - if conn.info.actions.update_conn() { + // Consume PDU for reassembly or parsing + if conn.info.actions.parse_any() || + conn.info.actions.update_pdu_reassembled() { conn.update(pdu, subscription, &self.registry); } else { + // Ensure FIN is handled, if appl. conn.update_tcp_flags(pdu.flags(), pdu.dir); + // Handle packet delivery to subscription, if appl. + conn.info.release_mbuf(pdu.mbuf_own(), subscription); } // Delete stale data for connections no longer matching @@ -141,7 +146,13 @@ where if conn.info.actions.update_pdu() { conn.info.sdata.update(&pdu, false); } - conn.info.consume_pdu(pdu, subscription, &self.registry); + if conn.info.actions.parse_any() || + conn.info.actions.update_pdu_reassembled() { + conn.info.consume_pdu(pdu, subscription, &self.registry); + } else { + // Handle packet delivery, if applicable + conn.info.release_mbuf(pdu.mbuf_own(), subscription); + } if !conn.remove_from_table() { self.timerwheel.insert( &conn_id, diff --git a/core/src/filter/actions.rs b/core/src/filter/actions.rs index fafeb8dc..0ba8f773 100644 --- a/core/src/filter/actions.rs +++ b/core/src/filter/actions.rs @@ -143,16 +143,6 @@ impl Actions { ) } - /// True if the framework should consume PDUs for reassembly (TCP), - /// parsing, or operations that require ownership of packets. - #[inline] - pub(crate) fn update_conn(&self) -> bool { - self.parse_any() - || self.update_pdu_reassembled() - || self.buffer_frame() - || self.packet_deliver() - } - /// True if nothing except delivery is required /// Allows delivering and dropping the connection to happen early #[inline] From 5ba4e9b1bf8f0049a0d3388d89e85e6c3f53ff2d Mon Sep 17 00:00:00 2001 From: Thea Rossman Date: Fri, 8 Nov 2024 09:51:05 -0800 Subject: [PATCH 2/5] Update TCP termination heuristic to get last ACK We shouldn't consider a TCP connection complete until both sides have acknowledged the other's FIN. This introduces slight overhead (tracking acknowledgement numbers), but is needed for some packet analysis (e.g., TCP flags) use cases. --- core/src/conntrack/conn/tcp_conn/mod.rs | 11 +++++++---- core/src/conntrack/conn/tcp_conn/reassembly.rs | 18 ++++++++++++++++-- core/src/conntrack/pdu.rs | 11 +++++++++++ 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/core/src/conntrack/conn/tcp_conn/mod.rs b/core/src/conntrack/conn/tcp_conn/mod.rs index 393a2eb8..7e02734c 100644 --- a/core/src/conntrack/conn/tcp_conn/mod.rs +++ b/core/src/conntrack/conn/tcp_conn/mod.rs @@ -16,8 +16,9 @@ impl TcpConn { pub(crate) fn new_on_syn(ctxt: L4Context, max_ooo: usize) -> Self { let flags = ctxt.flags; let next_seq = ctxt.seq_no.wrapping_add(1 + ctxt.length as u32); + let ack = ctxt.ack_no; TcpConn { - ctos: TcpFlow::new(max_ooo, next_seq, flags), + ctos: TcpFlow::new(max_ooo, next_seq, flags, ack), stoc: TcpFlow::default(max_ooo), } } @@ -43,9 +44,11 @@ impl TcpConn { /// Returns `true` if the connection should be terminated #[inline] pub(crate) fn is_terminated(&self) -> bool { - // Both sides have sent FIN, or a RST has been sent - (self.ctos.consumed_flags & self.stoc.consumed_flags & FIN - | self.ctos.consumed_flags & RST + // Both sides have sent, reassembled, and acknowledged FIN, or RST has been sent + (self.ctos.consumed_flags & self.stoc.consumed_flags & FIN != 0 && + self.ctos.last_ack == self.stoc.next_seq && + self.stoc.last_ack == self.ctos.next_seq) || + (self.ctos.consumed_flags & RST | self.stoc.consumed_flags & RST) != 0 } diff --git a/core/src/conntrack/conn/tcp_conn/reassembly.rs b/core/src/conntrack/conn/tcp_conn/reassembly.rs index acff6c1c..ae270f25 100644 --- a/core/src/conntrack/conn/tcp_conn/reassembly.rs +++ b/core/src/conntrack/conn/tcp_conn/reassembly.rs @@ -13,6 +13,8 @@ use std::collections::VecDeque; pub(crate) struct TcpFlow { /// Expected sequence number of next segment pub(super) next_seq: Option, + /// Last-seen ack number for peer's flow + pub(crate) last_ack: Option, /// Flow status for consumed control packets. /// Matches TCP flag bits. pub(super) consumed_flags: u8, @@ -26,6 +28,7 @@ impl TcpFlow { pub(super) fn default(capacity: usize) -> Self { TcpFlow { next_seq: None, + last_ack: None, consumed_flags: 0, ooo_buf: OutOfOrderBuffer::new(capacity), } @@ -34,9 +37,10 @@ impl TcpFlow { /// Creates a new TCP flow with given next sequence number, flags, /// and out-of-order buffer #[inline] - pub(super) fn new(capacity: usize, next_seq: u32, flags: u8) -> Self { + pub(super) fn new(capacity: usize, next_seq: u32, flags: u8, ack: u32) -> Self { TcpFlow { next_seq: Some(next_seq), + last_ack: Some(ack), consumed_flags: flags, ooo_buf: OutOfOrderBuffer::new(capacity), } @@ -68,6 +72,7 @@ impl TcpFlow { if segment.flags() & FIN != 0 { expected_seq = cur_seq.wrapping_add(1); } + self.last_ack = Some(segment.ack_no()); info.consume_pdu(segment, subscription, registry); self.flush_ooo_buffer::(expected_seq, info, subscription, registry); } else if wrapping_lt(next_seq, cur_seq) { @@ -76,6 +81,7 @@ impl TcpFlow { } else if let Some(expected_seq) = overlap(&mut segment, next_seq) { // Segment starts before the next expected segment but has new data self.consumed_flags |= segment.flags(); + self.last_ack = Some(segment.ack_no()); info.consume_pdu(segment, subscription, registry); self.flush_ooo_buffer::(expected_seq, info, subscription, registry); } else { @@ -93,6 +99,7 @@ impl TcpFlow { let expected_seq = cur_seq.wrapping_add(1 + length); self.next_seq = Some(expected_seq); self.consumed_flags |= segment.flags(); + self.last_ack = Some(segment.ack_no()); info.consume_pdu(segment, subscription, registry); self.flush_ooo_buffer::(expected_seq, info, subscription, registry); } else { @@ -128,6 +135,7 @@ impl TcpFlow { } let next_seq = self.ooo_buf.flush_ordered::( expected_seq, + &mut self.last_ack, &mut self.consumed_flags, info, subscription, @@ -177,6 +185,7 @@ impl OutOfOrderBuffer { fn flush_ordered( &mut self, expected_seq: u32, + last_ack: &mut Option, consumed_flags: &mut u8, info: &mut ConnInfo, subscription: &Subscription, @@ -204,6 +213,7 @@ impl OutOfOrderBuffer { if segment.flags() & FIN != 0 { next_seq = next_seq.wrapping_add(1); } + *last_ack = Some(segment.ack_no()); info.consume_pdu(segment, subscription, registry); index = 0; } else if wrapping_lt(next_seq, cur_seq) { @@ -213,6 +223,7 @@ impl OutOfOrderBuffer { if let Some(update_seq) = overlap(&mut segment, next_seq) { next_seq = update_seq; *consumed_flags |= segment.flags(); + *last_ack = Some(segment.ack_no()); info.consume_pdu(segment, subscription, registry); index = 0; } else { @@ -242,7 +253,10 @@ pub fn wrapping_lt(lhs: u32, rhs: u32) -> bool { fn overlap(segment: &mut L4Pdu, expected_seq: u32) -> Option { let length = segment.length(); let cur_seq = segment.seq_no(); - let end_seq = cur_seq.wrapping_add(length as u32); + let mut end_seq = cur_seq.wrapping_add(length as u32); + if segment.flags() & FIN != 0 { + end_seq = end_seq.wrapping_add(1); + } if wrapping_lt(expected_seq, end_seq) { // contains new data diff --git a/core/src/conntrack/pdu.rs b/core/src/conntrack/pdu.rs index 5f91805c..6060c7f5 100644 --- a/core/src/conntrack/pdu.rs +++ b/core/src/conntrack/pdu.rs @@ -51,6 +51,11 @@ impl L4Pdu { self.ctxt.seq_no } + #[inline] + pub fn ack_no(&self) -> u32 { + self.ctxt.ack_no + } + #[inline] pub fn flags(&self) -> u8 { self.ctxt.flags @@ -72,6 +77,8 @@ pub struct L4Context { pub length: usize, /// Raw sequence number of segment. pub seq_no: u32, + /// Raw acknowledgment number of segment. + pub ack_no: u32, /// TCP flags. pub flags: u8, } @@ -91,6 +98,7 @@ impl L4Context { offset: tcp.next_header_offset(), length: payload_size, seq_no: tcp.seq_no(), + ack_no: tcp.ack_no(), flags: tcp.flags(), }) } else { @@ -107,6 +115,7 @@ impl L4Context { offset: udp.next_header_offset(), length: payload_size, seq_no: 0, + ack_no: 0, flags: 0, }) } else { @@ -127,6 +136,7 @@ impl L4Context { offset: tcp.next_header_offset(), length: payload_size, seq_no: tcp.seq_no(), + ack_no: tcp.ack_no(), flags: tcp.flags(), }) } else { @@ -143,6 +153,7 @@ impl L4Context { offset: udp.next_header_offset(), length: payload_size, seq_no: 0, + ack_no: 0, flags: 0, }) } else { From 2fec59b0691f11496e3c7ae72260d792e9f5c621 Mon Sep 17 00:00:00 2001 From: Thea Rossman Date: Fri, 8 Nov 2024 10:35:41 -0800 Subject: [PATCH 3/5] discard OOO buffers when no longer needed May provide modest mempool utilization improvement for certain connection-tracking applications on lossy networks. --- core/src/conntrack/conn/mod.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/conntrack/conn/mod.rs b/core/src/conntrack/conn/mod.rs index 2650ab2c..bffe1a9c 100644 --- a/core/src/conntrack/conn/mod.rs +++ b/core/src/conntrack/conn/mod.rs @@ -96,6 +96,18 @@ where match &mut self.l4conn { L4Conn::Tcp(tcp_conn) => { tcp_conn.reassemble(pdu, &mut self.info, subscription, registry); + // Check if, after actions update, the framework/subscriptions no longer require + // receiving reassembled traffic + if !self.info.actions.parse_any() && + !self.info.actions.update_pdu_reassembled() { + // Safe to discard out-of-order buffers + if tcp_conn.ctos.ooo_buf.len() != 0 { + tcp_conn.ctos.ooo_buf.buf.clear(); + } + if tcp_conn.stoc.ooo_buf.len() != 0 { + tcp_conn.stoc.ooo_buf.buf.clear(); + } + } } L4Conn::Udp(_udp_conn) => self.info.consume_pdu(pdu, subscription, registry), } From 46c95d125737f9a50f15caa37afa45fb415e01de Mon Sep 17 00:00:00 2001 From: Thea Rossman Date: Tue, 12 Nov 2024 18:40:30 -0800 Subject: [PATCH 4/5] Drop tracked packets quickly; combine `update sdata` actions - Distinguish between `update` (metadata) and `track_packets` (presumed to be less common and more expensive). Also distinguish between `cache_packets` (storing raw mbufs in case of filter match on packet-level subscription) and `track_packets` (tracking for duration of connection). tl;dr - try to only use mbufs if needed and release them quickly. - Remove `update_reassembled` and `track_packets_reassembled`. The expensive operation is reassembly itself, not potentially invoking no-op functions. Replace with a single `reassemble` action. --- core/src/conntrack/conn/conn_info.rs | 57 +++++++++++++--------------- core/src/conntrack/conn/mod.rs | 3 +- core/src/conntrack/mod.rs | 19 ++-------- core/src/filter/actions.rs | 40 +++++++++++++------ core/src/filter/datatypes.rs | 44 +++++++++++++-------- core/src/filter/ptree.rs | 4 +- core/src/subscription/mod.rs | 14 +++++-- datatypes/src/lib.rs | 2 +- datatypes/src/packet_list.rs | 20 +++++----- datatypes/src/typedefs.rs | 3 +- filtergen/src/data.rs | 32 ++++++++++++++-- 11 files changed, 142 insertions(+), 96 deletions(-) diff --git a/core/src/conntrack/conn/conn_info.rs b/core/src/conntrack/conn/conn_info.rs index 0740a9eb..1972515d 100644 --- a/core/src/conntrack/conn/conn_info.rs +++ b/core/src/conntrack/conn/conn_info.rs @@ -7,12 +7,11 @@ use crate::conntrack::pdu::L4Pdu; use crate::filter::Actions; use crate::lcore::CoreId; -use crate::protocols::packet::tcp::TCP_PROTOCOL; use crate::protocols::stream::{ ConnData, ParseResult, ParserRegistry, ParsingState, ProbeRegistryResult, }; use crate::subscription::{Subscription, Trackable}; -use crate::{FiveTuple, Mbuf}; +use crate::FiveTuple; #[derive(Debug)] pub(crate) struct ConnInfo @@ -51,17 +50,25 @@ where } #[inline] - pub(crate) fn release_mbuf(&mut self, mbuf: Mbuf, - subscription: &Subscription) { - if self.actions.packet_deliver() { + pub(crate) fn update_sdata(&mut self, pdu: &L4Pdu, + subscription: &Subscription, + reassembled: bool) { + // Typically use for calculating connection metrics + if self.actions.update_pdu() { + self.sdata.update(&pdu, reassembled); + } + // Used for non-terminal matches on packet datatypes (`PacketCache` action, + // pre-reassembly only) and for datatypes that require tracking packets + // (`PacketTrack`, pre- and/or post-reassembly). + if self.actions.buffer_packet(reassembled) { + self.sdata.buffer_packet(pdu, &self.actions, reassembled); + } + // Packet-level subscriptions ready for delivery should be + // delivered 1x (before reassembly). + if !reassembled && self.actions.packet_deliver() { // Delivering all remaining packets in connection - subscription.deliver_packet(&mbuf, &self.cdata, &self.sdata); - } - if self.actions.buffer_frame() { - // Track frame for (potential) future delivery - // Used when a filter has partially matched for a - // subscription that requests packets - self.sdata.track_packet(mbuf); + subscription.deliver_packet(pdu.mbuf_ref(), + &self.cdata, &self.sdata); } } @@ -80,14 +87,7 @@ where self.handle_parse(&pdu, subscription, registry); } - // Post-reassembly `update` - if self.actions.update_pdu_reassembled() && pdu.ctxt.proto == TCP_PROTOCOL { - // Forward PDU to any subscriptions that require - // tracking ongoing connection data post-reassembly - self.sdata.update(&pdu, true); - } - - self.release_mbuf(pdu.mbuf_own(), subscription); + self.update_sdata(&pdu, subscription, true); } fn handle_parse( @@ -215,18 +215,15 @@ where self.actions.clear(); } - // Helper used after filter updates - pub(crate) fn clear_packets(&mut self) { - self.sdata.drain_packets(); - } - // Helper to be used after applying protocol or session filter pub(crate) fn clear_stale_data(&mut self, new_actions: &Actions) { - if self.actions.buffer_frame() && !new_actions.buffer_frame() && !self.actions.drop() { - // No longer need tracked packets; delete to save memory - // Don't clear if all connection data may be about to be dropped - self.clear_packets(); - assert!(!new_actions.buffer_frame()); + if !self.actions.drop() { + if self.actions.cache_packet() && !new_actions.cache_packet() { + self.sdata.drain_cached_packets(); + } + if self.actions.buffer_packet(true) && !new_actions.buffer_packet(true) { + self.sdata.drain_tracked_packets(); + } } // Don't clear sessions, as SessionTrack is never // a terminal action at the protocol stage diff --git a/core/src/conntrack/conn/mod.rs b/core/src/conntrack/conn/mod.rs index bffe1a9c..dadaf4a0 100644 --- a/core/src/conntrack/conn/mod.rs +++ b/core/src/conntrack/conn/mod.rs @@ -98,8 +98,7 @@ where tcp_conn.reassemble(pdu, &mut self.info, subscription, registry); // Check if, after actions update, the framework/subscriptions no longer require // receiving reassembled traffic - if !self.info.actions.parse_any() && - !self.info.actions.update_pdu_reassembled() { + if !self.info.actions.reassemble() { // Safe to discard out-of-order buffers if tcp_conn.ctos.ooo_buf.len() != 0 { tcp_conn.ctos.ooo_buf.buf.clear(); diff --git a/core/src/conntrack/mod.rs b/core/src/conntrack/mod.rs index 6457279b..4e350a0b 100644 --- a/core/src/conntrack/mod.rs +++ b/core/src/conntrack/mod.rs @@ -100,18 +100,13 @@ where return; } let pdu = L4Pdu::new(mbuf, ctxt, dir); - if conn.info.actions.update_pdu() { - conn.info.sdata.update(&pdu, false); - } + conn.info.update_sdata(&pdu, subscription, false); // Consume PDU for reassembly or parsing - if conn.info.actions.parse_any() || - conn.info.actions.update_pdu_reassembled() { + if conn.info.actions.reassemble() { conn.update(pdu, subscription, &self.registry); } else { // Ensure FIN is handled, if appl. conn.update_tcp_flags(pdu.flags(), pdu.dir); - // Handle packet delivery to subscription, if appl. - conn.info.release_mbuf(pdu.mbuf_own(), subscription); } // Delete stale data for connections no longer matching @@ -143,15 +138,9 @@ where }; if let Ok(mut conn) = conn { conn.info.filter_first_packet(&pdu, subscription); - if conn.info.actions.update_pdu() { - conn.info.sdata.update(&pdu, false); - } - if conn.info.actions.parse_any() || - conn.info.actions.update_pdu_reassembled() { + conn.info.update_sdata(&pdu, subscription, false); + if conn.info.actions.reassemble() { conn.info.consume_pdu(pdu, subscription, &self.registry); - } else { - // Handle packet delivery, if applicable - conn.info.release_mbuf(pdu.mbuf_own(), subscription); } if !conn.remove_from_table() { self.timerwheel.insert( diff --git a/core/src/filter/actions.rs b/core/src/filter/actions.rs index 0ba8f773..5dee6edc 100644 --- a/core/src/filter/actions.rs +++ b/core/src/filter/actions.rs @@ -28,8 +28,12 @@ pub enum ActionData { /// - All other filters: post-reassembly PacketDeliver, - /// Store future packets in this connection in tracked data - /// TCP packets are tracked post-reassembly + /// Store packets in this connection in tracked data for + /// potential future delivery. Used on a non-terminal match + /// for a packet-level datatype. + PacketCache, + /// Store packets in this connection in tracked data for a + /// datatype that requires tracking and delivering packets. PacketTrack, // Connection/session actions // @@ -48,8 +52,9 @@ pub enum ActionData { /// The subscribable type "update" methods should be invoked (for TCP: pre-reassembly) UpdatePDU, + /// The subscribable type "update" methods should be invoked post-reassembly (TCP only) - ReassembledUpdatePDU, + Reassemble, /// Deliver connection data (via the ConnectionDelivery filter) when it terminates ConnDeliver, @@ -120,15 +125,26 @@ impl Actions { self.data.intersects(ActionData::UpdatePDU) } - /// Conn tracker must deliver PDU to tracked data after reassembly - pub(crate) fn update_pdu_reassembled(&self) -> bool { - self.data.intersects(ActionData::ReassembledUpdatePDU) + /// True if the connection needs to be reassembled + pub(crate) fn reassemble(&self) -> bool { + self.data.intersects(ActionData::Reassemble) || self.parse_any() + } + + /// True if the framework should buffer mbufs for this connection, + /// either for future delivery (Cache) or for a datatype that requires + /// tracking packets. + #[inline] + pub(crate) fn buffer_packet(&self, reassembled: bool) -> bool { + match reassembled { + true => self.data.intersects(ActionData::PacketTrack), + false => self.data.intersects(ActionData::PacketTrack | ActionData::PacketCache) + } + } - /// True if the framework should track (buffer) mbufs for this connection #[inline] - pub(crate) fn buffer_frame(&self) -> bool { - self.data.intersects(ActionData::PacketTrack) + pub(crate) fn cache_packet(&self) -> bool { + self.data.intersects(ActionData::PacketCache) } /// True if application-layer probing or parsing should be applied @@ -283,8 +299,9 @@ impl FromStr for ActionData { "SessionDeliver" => Ok(ActionData::SessionDeliver), "SessionTrack" => Ok(ActionData::SessionTrack), "UpdatePDU" => Ok(ActionData::UpdatePDU), - "ReassembledUpdatePDU" => Ok(ActionData::ReassembledUpdatePDU), + "Reassemble" => Ok(ActionData::Reassemble), "PacketTrack" => Ok(ActionData::PacketTrack), + "PacketCache" => Ok(ActionData::PacketCache), "ConnDeliver" => Ok(ActionData::ConnDeliver), _ => Result::Err(core::fmt::Error), } @@ -302,8 +319,9 @@ impl fmt::Display for ActionData { ActionData::SessionDeliver => "SessionDeliver", ActionData::SessionTrack => "SessionTrack", ActionData::UpdatePDU => "UpdatePDU", - ActionData::ReassembledUpdatePDU => "ReassembledUpdatePDU", + ActionData::Reassemble => "Reassemble", ActionData::PacketTrack => "PacketTrack", + ActionData::PacketCache => "PacketCache", ActionData::ConnDeliver => "ConnDeliver", _ => panic!("Unknown ActionData"), }; diff --git a/core/src/filter/datatypes.rs b/core/src/filter/datatypes.rs index d9876199..e9a98d3e 100644 --- a/core/src/filter/datatypes.rs +++ b/core/src/filter/datatypes.rs @@ -55,8 +55,10 @@ pub struct DataType { pub track_sessions: bool, /// True if the datatype requires invoking `update` method before reassembly pub needs_update: bool, - /// True if the datatype requires invoking `update` method after reassembly - pub needs_update_reassembled: bool, + /// True if the datatype requires reassembly (for `track_packets` or `update`) + pub needs_reassembly: bool, + /// True if the datatype requires tracking packets + pub needs_packet_track: bool, /// A vector of the application-layer parsers required by this datatype /// Retina loads the union of parsers required by all datatypes and filters pub stream_protos: Vec<&'static str>, @@ -73,7 +75,8 @@ impl DataType { needs_parse: false, track_sessions: false, needs_update: true, - needs_update_reassembled: false, + needs_reassembly: false, + needs_packet_track: false, stream_protos: vec![], as_str, } @@ -87,7 +90,8 @@ impl DataType { needs_parse: true, track_sessions: false, needs_update: false, - needs_update_reassembled: false, + needs_reassembly: false, + needs_packet_track: false, stream_protos, as_str, } @@ -101,7 +105,8 @@ impl DataType { needs_parse: false, track_sessions: false, needs_update: false, - needs_update_reassembled: false, + needs_reassembly: false, + needs_packet_track: false, stream_protos: vec![], as_str, } @@ -115,19 +120,21 @@ impl DataType { needs_parse: false, track_sessions: false, needs_update: false, - needs_update_reassembled: false, + needs_reassembly: false, + needs_packet_track: false, stream_protos: vec![], as_str, } } - pub fn new_default_pktlist(as_str: &'static str, reassembly: bool) -> Self { + pub fn new_default_pktlist(as_str: &'static str, needs_reassembly: bool) -> Self { DataType { level: Level::Connection, needs_parse: false, track_sessions: false, - needs_update: !reassembly, - needs_update_reassembled: reassembly, + needs_update: false, + needs_reassembly, + needs_packet_track: true, stream_protos: vec![], as_str, } @@ -207,10 +214,15 @@ impl DataType { actions.if_matched.terminal_actions |= ActionData::UpdatePDU; actions.if_matching.data |= ActionData::UpdatePDU; } - if self.needs_update_reassembled { - actions.if_matched.data |= ActionData::ReassembledUpdatePDU; - actions.if_matched.terminal_actions |= ActionData::ReassembledUpdatePDU; - actions.if_matching.data |= ActionData::ReassembledUpdatePDU; + if self.needs_reassembly { + actions.if_matched.data |= ActionData::Reassemble; + actions.if_matched.terminal_actions |= ActionData::Reassemble; + actions.if_matching.data |= ActionData::Reassemble; + } + if self.needs_packet_track { + actions.if_matched.data |= ActionData::PacketTrack; + actions.if_matched.terminal_actions |= ActionData::PacketTrack; + actions.if_matching.data |= ActionData::PacketTrack; } } @@ -242,7 +254,7 @@ impl DataType { // and (2) tracked until then. if matches!(self.level, Level::Packet) { assert!(matches!(sub_level, Level::Packet)); - actions.if_matching.data |= ActionData::PacketTrack; + actions.if_matching.data |= ActionData::PacketCache; // Matched packet-level subscription is delivered in filter } @@ -283,7 +295,7 @@ impl DataType { actions.if_matched.data |= ActionData::PacketDeliver; actions.if_matched.terminal_actions |= ActionData::PacketDeliver; // Track in case of match in next filter - actions.if_matching.data |= ActionData::PacketTrack; + actions.if_matching.data |= ActionData::PacketCache; } // Connection- and session-level subscriptions depend on the actions required @@ -591,6 +603,6 @@ mod tests { let mut spec = SubscriptionSpec::new(String::from(""), String::from("cb")); spec.add_datatype(DataType::new_default_packet("Packet")); assert!(spec.proto_filter().if_matched.packet_deliver()); - assert!(spec.proto_filter().if_matching.buffer_frame()); + assert!(spec.proto_filter().if_matching.cache_packet()); } } diff --git a/core/src/filter/ptree.rs b/core/src/filter/ptree.rs index a6228eb7..6b461985 100644 --- a/core/src/filter/ptree.rs +++ b/core/src/filter/ptree.rs @@ -982,7 +982,7 @@ mod tests { // Remaining: eth -> tcp, udp assert!(ptree.size == 4); expected_actions.clear(); - expected_actions.data = ActionData::ProtoFilter | ActionData::PacketTrack; + expected_actions.data = ActionData::ProtoFilter | ActionData::PacketCache; assert!(ptree.actions == expected_actions); let mut ptree = PTree::new_empty(FilterLayer::PacketContinue); @@ -1003,7 +1003,7 @@ mod tests { let mut ptree = PTree::new_empty(FilterLayer::Packet); ptree.add_filter(&filter.get_patterns_flat(), &datatype, &DELIVER); let mut expected_actions = Actions::new(); - expected_actions.data |= ActionData::PacketTrack | ActionData::ProtoFilter; + expected_actions.data |= ActionData::PacketCache | ActionData::ProtoFilter; assert!(ptree.actions == expected_actions); } diff --git a/core/src/subscription/mod.rs b/core/src/subscription/mod.rs index e1c709c8..38de90e6 100644 --- a/core/src/subscription/mod.rs +++ b/core/src/subscription/mod.rs @@ -29,13 +29,19 @@ pub trait Trackable { fn track_session(&mut self, session: Session); /// Store packets for (possible) future delivery - fn track_packet(&mut self, mbuf: Mbuf); + fn buffer_packet(&mut self, pdu: &L4Pdu, actions: &Actions, reassembled: bool); - /// Get reference to stored packets + /// Get reference to stored packets (those buffered for delivery) fn packets(&self) -> &Vec; - /// Drain vector of mbufs - fn drain_packets(&mut self); + /// Drain data from all types that require storing packets + /// Can help free mbufs for future use + fn drain_tracked_packets(&mut self); + + /// Drain data from packets cached for future potential delivery + /// Used after these packets have been delivered or when associated + /// subscription fails to match + fn drain_cached_packets(&mut self); /// Return the core ID that this tracked conn. is on fn core_id(&self) -> &CoreId; diff --git a/datatypes/src/lib.rs b/datatypes/src/lib.rs index 687df939..c0d49882 100644 --- a/datatypes/src/lib.rs +++ b/datatypes/src/lib.rs @@ -103,7 +103,7 @@ pub trait PacketList { /// New packet in connection received (or reassembled, if reassembled=true) /// Note this may be invoked both pre- and post-reassembly; types /// should check `reassembled` to avoid double-counting. - fn update(&mut self, pdu: &L4Pdu, reassembled: bool); + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool); /// Clear internal data; called if connection no longer matches filter /// that requires the Tracked type. fn clear(&mut self); diff --git a/datatypes/src/packet_list.rs b/datatypes/src/packet_list.rs index 59867edb..bb557b74 100644 --- a/datatypes/src/packet_list.rs +++ b/datatypes/src/packet_list.rs @@ -108,7 +108,7 @@ impl PacketList for BidirPktStream { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if !reassembled { self.push(pdu); } @@ -154,7 +154,7 @@ impl PacketList for OrigPktStream { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if pdu.dir && !reassembled { self.push(pdu); } @@ -201,7 +201,7 @@ impl PacketList for RespPktStream { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if !pdu.dir && !reassembled { self.push(pdu); } @@ -245,7 +245,7 @@ impl PacketList for OrigPktsReassembled { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if pdu.dir && reassembled { self.push(pdu); } @@ -289,7 +289,7 @@ impl PacketList for RespPktsReassembled { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if !pdu.dir && reassembled { self.push(pdu); } @@ -314,7 +314,7 @@ impl PacketList for BidirZcPktStream { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if !reassembled { self.packets.push(Mbuf::new_ref(&pdu.mbuf)); } @@ -340,7 +340,7 @@ impl PacketList for OrigZcPktStream { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if !reassembled && pdu.dir { self.packets.push(Mbuf::new_ref(&pdu.mbuf)); } @@ -368,7 +368,7 @@ impl PacketList for RespZcPktStream { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if !reassembled && !pdu.dir { self.packets.push(Mbuf::new_ref(&pdu.mbuf)); } @@ -392,7 +392,7 @@ impl PacketList for OrigZcPktsReassembled { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if reassembled && pdu.dir { self.packets.push(Mbuf::new_ref(&pdu.mbuf)); } @@ -416,7 +416,7 @@ impl PacketList for RespZcPktsReassembled { } } - fn update(&mut self, pdu: &L4Pdu, reassembled: bool) { + fn track_packet(&mut self, pdu: &L4Pdu, reassembled: bool) { if reassembled && !pdu.dir { self.packets.push(Mbuf::new_ref(&pdu.mbuf)); } diff --git a/datatypes/src/typedefs.rs b/datatypes/src/typedefs.rs index ac04aefb..a07e2ffb 100644 --- a/datatypes/src/typedefs.rs +++ b/datatypes/src/typedefs.rs @@ -56,7 +56,8 @@ lazy_static! { needs_parse: true, track_sessions: true, needs_update: false, - needs_update_reassembled: false, + needs_reassembly: false, + needs_packet_track: false, stream_protos: vec!["tls", "dns", "http", "quic"], as_str: "SessionList", } diff --git a/filtergen/src/data.rs b/filtergen/src/data.rs index 427aa524..955a7a95 100644 --- a/filtergen/src/data.rs +++ b/filtergen/src/data.rs @@ -10,9 +10,11 @@ use crate::SubscriptionConfig; pub(crate) struct TrackedDataBuilder { update: Vec, + track_packet: Vec, struct_def: Vec, new: Vec, clear: Vec, + pkts_clear: Vec, stream_protocols: HashSet<&'static str>, datatypes: HashSet<&'static str>, } @@ -21,9 +23,11 @@ impl TrackedDataBuilder { pub(crate) fn new(subscribed_data: &SubscriptionConfig) -> Self { let mut ret = Self { update: vec![], + track_packet: vec![], struct_def: vec![], new: vec![], clear: vec![], + pkts_clear: vec![], stream_protocols: HashSet::new(), datatypes: HashSet::new(), }; @@ -62,6 +66,12 @@ impl TrackedDataBuilder { self.update .push(quote! { self.#field_name.update(pdu, reassembled); }); } + + if datatype.needs_packet_track { + self.track_packet + .push(quote! { self.#field_name.track_packet(pdu, reassembled); }); + self.pkts_clear.push(quote! { self.#field_name.clear(); }); + } } } self.print(); @@ -92,8 +102,10 @@ impl TrackedDataBuilder { pub(crate) fn tracked(&mut self) -> proc_macro2::TokenStream { let def = std::mem::take(&mut self.struct_def); let update = std::mem::take(&mut self.update); + let track_packet = std::mem::take(&mut self.track_packet); let new = std::mem::take(&mut self.new); let clear = std::mem::take(&mut self.clear); + let pkts_clear = std::mem::take(&mut self.pkts_clear); let mut conn_parsers: Vec = vec![]; for datatype in &self.stream_protocols { @@ -132,20 +144,32 @@ impl TrackedDataBuilder { &self.core_id } - fn track_packet(&mut self, mbuf: retina_core::Mbuf) { - self.mbufs.push(mbuf); + fn buffer_packet(&mut self, pdu: &retina_core::L4Pdu, actions: &Actions, + reassembled: bool) { + if !reassembled && + actions.data.intersects(ActionData::PacketCache) { + self.mbufs.push(retina_core::Mbuf::new_ref(&pdu.mbuf)); + } + if actions.data.intersects(ActionData::PacketTrack) { + #( #track_packet )* + } } fn packets(&self) -> &Vec { &self.mbufs } - fn drain_packets(&mut self) { + fn drain_cached_packets(&mut self) { self.mbufs = vec![]; } + fn drain_tracked_packets(&mut self) { + #( #pkts_clear )* + } + fn clear(&mut self) { - self.drain_packets(); + self.drain_tracked_packets(); + self.drain_cached_packets(); self.sessions = vec![]; #( #clear )* } From 6764ee8837619d0593136d1b5546c81db3304f15 Mon Sep 17 00:00:00 2001 From: Thea Rossman Date: Sun, 1 Dec 2024 17:32:42 -0800 Subject: [PATCH 5/5] cargo fmt, clippy --- core/src/conntrack/conn/conn_info.rs | 14 ++++++++------ core/src/conntrack/conn/tcp_conn/mod.rs | 10 ++++------ core/src/filter/actions.rs | 5 +++-- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/conntrack/conn/conn_info.rs b/core/src/conntrack/conn/conn_info.rs index 1972515d..70f7b4d2 100644 --- a/core/src/conntrack/conn/conn_info.rs +++ b/core/src/conntrack/conn/conn_info.rs @@ -50,12 +50,15 @@ where } #[inline] - pub(crate) fn update_sdata(&mut self, pdu: &L4Pdu, - subscription: &Subscription, - reassembled: bool) { + pub(crate) fn update_sdata( + &mut self, + pdu: &L4Pdu, + subscription: &Subscription, + reassembled: bool, + ) { // Typically use for calculating connection metrics if self.actions.update_pdu() { - self.sdata.update(&pdu, reassembled); + self.sdata.update(pdu, reassembled); } // Used for non-terminal matches on packet datatypes (`PacketCache` action, // pre-reassembly only) and for datatypes that require tracking packets @@ -67,8 +70,7 @@ where // delivered 1x (before reassembly). if !reassembled && self.actions.packet_deliver() { // Delivering all remaining packets in connection - subscription.deliver_packet(pdu.mbuf_ref(), - &self.cdata, &self.sdata); + subscription.deliver_packet(pdu.mbuf_ref(), &self.cdata, &self.sdata); } } diff --git a/core/src/conntrack/conn/tcp_conn/mod.rs b/core/src/conntrack/conn/tcp_conn/mod.rs index 7e02734c..f0157b05 100644 --- a/core/src/conntrack/conn/tcp_conn/mod.rs +++ b/core/src/conntrack/conn/tcp_conn/mod.rs @@ -45,12 +45,10 @@ impl TcpConn { #[inline] pub(crate) fn is_terminated(&self) -> bool { // Both sides have sent, reassembled, and acknowledged FIN, or RST has been sent - (self.ctos.consumed_flags & self.stoc.consumed_flags & FIN != 0 && - self.ctos.last_ack == self.stoc.next_seq && - self.stoc.last_ack == self.ctos.next_seq) || - (self.ctos.consumed_flags & RST - | self.stoc.consumed_flags & RST) - != 0 + (self.ctos.consumed_flags & self.stoc.consumed_flags & FIN != 0 + && self.ctos.last_ack == self.stoc.next_seq + && self.stoc.last_ack == self.ctos.next_seq) + || (self.ctos.consumed_flags & RST | self.stoc.consumed_flags & RST) != 0 } /// Updates connection termination flags diff --git a/core/src/filter/actions.rs b/core/src/filter/actions.rs index 5dee6edc..4da24b26 100644 --- a/core/src/filter/actions.rs +++ b/core/src/filter/actions.rs @@ -137,9 +137,10 @@ impl Actions { pub(crate) fn buffer_packet(&self, reassembled: bool) -> bool { match reassembled { true => self.data.intersects(ActionData::PacketTrack), - false => self.data.intersects(ActionData::PacketTrack | ActionData::PacketCache) + false => self + .data + .intersects(ActionData::PacketTrack | ActionData::PacketCache), } - } #[inline]