From e1040204384d3429e5242a8e99e618c78f348e68 Mon Sep 17 00:00:00 2001 From: Dan Jenkins Date: Fri, 9 Aug 2024 16:08:05 +0100 Subject: [PATCH 1/2] Add Bytes Sent and Received to the CP stats --- agent_stats.go | 4 ++-- candidatepair.go | 14 ++++++++++++++ transport.go | 5 +++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/agent_stats.go b/agent_stats.go index 035c652b..7e7b6382 100644 --- a/agent_stats.go +++ b/agent_stats.go @@ -22,8 +22,8 @@ func (a *Agent) GetCandidatePairsStats() []CandidatePairStats { Nominated: cp.nominated, // PacketsSent uint32 // PacketsReceived uint32 - // BytesSent uint64 - // BytesReceived uint64 + BytesSent: cp.BytesSent(), + BytesReceived: cp.BytesReceived(), // LastPacketSentTimestamp time.Time // LastPacketReceivedTimestamp time.Time // FirstRequestTimestamp time.Time diff --git a/candidatepair.go b/candidatepair.go index 93470fec..60e09584 100644 --- a/candidatepair.go +++ b/candidatepair.go @@ -5,6 +5,7 @@ package ice import ( "fmt" + "sync/atomic" "github.com/pion/stun/v2" ) @@ -21,6 +22,8 @@ func newCandidatePair(local, remote Candidate, controlling bool) *CandidatePair // CandidatePair is a combination of a // local and remote candidate type CandidatePair struct { + bytesReceived uint64 + bytesSent uint64 iceRoleControlling bool Remote Candidate Local Candidate @@ -90,7 +93,18 @@ func (p *CandidatePair) priority() uint64 { return (1<<32-1)*min(g, d) + 2*max(g, d) + cmp(g, d) } +// BytesSent returns the number of bytes sent +func (p *CandidatePair) BytesSent() uint64 { + return atomic.LoadUint64(&p.bytesSent) +} + +// BytesReceived returns the number of bytes received +func (p *CandidatePair) BytesReceived() uint64 { + return atomic.LoadUint64(&p.bytesReceived) +} + func (p *CandidatePair) Write(b []byte) (int, error) { + atomic.AddUint64(&p.bytesSent, uint64(len(b))) return p.Local.writeTo(b, p.Remote) } diff --git a/transport.go b/transport.go index f800152d..4fa1d5ea 100644 --- a/transport.go +++ b/transport.go @@ -74,6 +74,11 @@ func (c *Conn) Read(p []byte) (int, error) { } n, err := c.agent.buf.Read(p) + pair := c.agent.getSelectedPair() + if pair != nil { + atomic.AddUint64(&pair.bytesReceived, uint64(n)) + } + atomic.AddUint64(&c.bytesReceived, uint64(n)) return n, err } From 2be4d380458f58b0d0f06ff7baf7787bb92d341d Mon Sep 17 00:00:00 2001 From: Dan Jenkins Date: Mon, 12 Aug 2024 16:32:02 +0100 Subject: [PATCH 2/2] start expanding out packets and bytes stats --- agent_handlers.go | 1 + agent_stats.go | 8 ++++---- candidate_base.go | 12 ++++++++++++ candidatepair.go | 12 ++++++++++++ transport.go | 27 ++++++++++++++++++--------- 5 files changed, 47 insertions(+), 13 deletions(-) diff --git a/agent_handlers.go b/agent_handlers.go index 0c02277b..ed98c02b 100644 --- a/agent_handlers.go +++ b/agent_handlers.go @@ -29,6 +29,7 @@ func (a *Agent) onSelectedCandidatePairChange(p *CandidatePair) { if h, ok := a.onSelectedCandidatePairChangeHdlr.Load().(func(Candidate, Candidate)); ok { h(p.Local, p.Remote) } + //add to the counter for conn.selectedCandidatePairChanges } func (a *Agent) onCandidate(c Candidate) { diff --git a/agent_stats.go b/agent_stats.go index 7e7b6382..752aefda 100644 --- a/agent_stats.go +++ b/agent_stats.go @@ -20,10 +20,10 @@ func (a *Agent) GetCandidatePairsStats() []CandidatePairStats { RemoteCandidateID: cp.Remote.ID(), State: cp.state, Nominated: cp.nominated, - // PacketsSent uint32 - // PacketsReceived uint32 - BytesSent: cp.BytesSent(), - BytesReceived: cp.BytesReceived(), + PacketsSent: cp.PacketsSent(), + PacketsReceived: cp.PacketsReceived(), + BytesSent: cp.BytesSent(), + BytesReceived: cp.BytesReceived(), // LastPacketSentTimestamp time.Time // LastPacketReceivedTimestamp time.Time // FirstRequestTimestamp time.Time diff --git a/candidate_base.go b/candidate_base.go index e1fbfe11..9cdfeeb5 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -287,6 +287,12 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) { c.addRemoteCandidateCache(remoteCandidate, srcAddr) } + pair := a.getSelectedPair() + if pair != nil && pair.Local.ID() == c.id { + atomic.AddUint64(&pair.bytesReceived, uint64(len(buf))) + atomic.AddUint32(&pair.packetsReceived, uint32(1)) + } + // Note: This will return packetio.ErrFull if the buffer ever manages to fill up. if _, err := a.buf.Write(buf); err != nil { a.log.Warnf("Failed to write packet: %s", err) @@ -334,6 +340,8 @@ func (c *candidateBase) close() error { func (c *candidateBase) writeTo(raw []byte, dst Candidate) (int, error) { n, err := c.conn.WriteTo(raw, dst.addr()) if err != nil { + //packetsDiscardedOnSend + //bytesDiscardedOnSend // If the connection is closed, we should return the error if errors.Is(err, io.ErrClosedPipe) { return n, err @@ -342,6 +350,10 @@ func (c *candidateBase) writeTo(raw []byte, dst Candidate) (int, error) { return n, nil } c.seen(true) + pair := c.agent().getSelectedPair() + if pair != nil && pair.Local.ID() == c.id { + atomic.AddUint32(&pair.packetsSent, uint32(1)) + } return n, nil } diff --git a/candidatepair.go b/candidatepair.go index 60e09584..dc9fd319 100644 --- a/candidatepair.go +++ b/candidatepair.go @@ -24,6 +24,8 @@ func newCandidatePair(local, remote Candidate, controlling bool) *CandidatePair type CandidatePair struct { bytesReceived uint64 bytesSent uint64 + packetsReceived uint32 + packetsSent uint32 iceRoleControlling bool Remote Candidate Local Candidate @@ -103,6 +105,16 @@ func (p *CandidatePair) BytesReceived() uint64 { return atomic.LoadUint64(&p.bytesReceived) } +// PacketsSent returns the number of bytes sent +func (p *CandidatePair) PacketsSent() uint32 { + return atomic.LoadUint32(&p.packetsSent) +} + +// PacketsReceived returns the number of bytes received +func (p *CandidatePair) PacketsReceived() uint32 { + return atomic.LoadUint32(&p.packetsReceived) +} + func (p *CandidatePair) Write(b []byte) (int, error) { atomic.AddUint64(&p.bytesSent, uint64(len(b))) return p.Local.writeTo(b, p.Remote) diff --git a/transport.go b/transport.go index 4fa1d5ea..b99a0809 100644 --- a/transport.go +++ b/transport.go @@ -27,9 +27,12 @@ func (a *Agent) Accept(ctx context.Context, remoteUfrag, remotePwd string) (*Con // Conn represents the ICE connection. // At the moment the lifetime of the Conn is equal to the Agent. type Conn struct { - bytesReceived uint64 - bytesSent uint64 - agent *Agent + bytesReceived uint64 + bytesSent uint64 + packetsReceived uint32 + packetsSent uint32 + //selectedCandidatePairChanges uint32 + agent *Agent } // BytesSent returns the number of bytes sent @@ -42,6 +45,16 @@ func (c *Conn) BytesReceived() uint64 { return atomic.LoadUint64(&c.bytesReceived) } +// PacketsSent returns the number of bytes sent +func (c *Conn) PacketsSent() uint32 { + return atomic.LoadUint32(&c.packetsSent) +} + +// PacketsReceived returns the number of bytes received +func (c *Conn) PacketsReceived() uint32 { + return atomic.LoadUint32(&c.packetsReceived) +} + func (a *Agent) connect(ctx context.Context, isControlling bool, remoteUfrag, remotePwd string) (*Conn, error) { err := a.loop.Err() if err != nil { @@ -74,11 +87,7 @@ func (c *Conn) Read(p []byte) (int, error) { } n, err := c.agent.buf.Read(p) - pair := c.agent.getSelectedPair() - if pair != nil { - atomic.AddUint64(&pair.bytesReceived, uint64(n)) - } - + atomic.AddUint32(&c.packetsReceived, uint32(1)) atomic.AddUint64(&c.bytesReceived, uint64(n)) return n, err } @@ -106,7 +115,7 @@ func (c *Conn) Write(p []byte) (int, error) { return 0, err } } - + atomic.AddUint32(&c.packetsSent, uint32(1)) atomic.AddUint64(&c.bytesSent, uint64(len(p))) return pair.Write(p) }