Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT - Adding more metrics into the Candidate Pair Stats / Transport Stats #721

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (a *Agent) onSelectedCandidatePairChange(p *CandidatePair) {
if h, ok := a.onSelectedCandidatePairChangeHdlr.Load().(func(Candidate, Candidate)); ok {
h(p.Local, p.Remote)
}
//add to the counter for conn.selectedCandidatePairChanges
}

func (a *Agent) onCandidate(c Candidate) {
Expand Down
8 changes: 4 additions & 4 deletions agent_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func (a *Agent) GetCandidatePairsStats() []CandidatePairStats {
RemoteCandidateID: cp.Remote.ID(),
State: cp.state,
Nominated: cp.nominated,
// PacketsSent uint32
// PacketsReceived uint32
// BytesSent uint64
// BytesReceived uint64
PacketsSent: cp.PacketsSent(),
PacketsReceived: cp.PacketsReceived(),
BytesSent: cp.BytesSent(),
BytesReceived: cp.BytesReceived(),
// LastPacketSentTimestamp time.Time
// LastPacketReceivedTimestamp time.Time
// FirstRequestTimestamp time.Time
Expand Down
12 changes: 12 additions & 0 deletions candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) {
c.addRemoteCandidateCache(remoteCandidate, srcAddr)
}

pair := a.getSelectedPair()
if pair != nil && pair.Local.ID() == c.id {
atomic.AddUint64(&pair.bytesReceived, uint64(len(buf)))
atomic.AddUint32(&pair.packetsReceived, uint32(1))
}

// Note: This will return packetio.ErrFull if the buffer ever manages to fill up.
if _, err := a.buf.Write(buf); err != nil {
a.log.Warnf("Failed to write packet: %s", err)
Expand Down Expand Up @@ -334,6 +340,8 @@ func (c *candidateBase) close() error {
func (c *candidateBase) writeTo(raw []byte, dst Candidate) (int, error) {
n, err := c.conn.WriteTo(raw, dst.addr())
if err != nil {
//packetsDiscardedOnSend
//bytesDiscardedOnSend
// If the connection is closed, we should return the error
if errors.Is(err, io.ErrClosedPipe) {
return n, err
Expand All @@ -342,6 +350,10 @@ func (c *candidateBase) writeTo(raw []byte, dst Candidate) (int, error) {
return n, nil
}
c.seen(true)
pair := c.agent().getSelectedPair()
if pair != nil && pair.Local.ID() == c.id {
atomic.AddUint32(&pair.packetsSent, uint32(1))
}
return n, nil
}

Expand Down
26 changes: 26 additions & 0 deletions candidatepair.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ice

import (
"fmt"
"sync/atomic"

"github.com/pion/stun/v2"
)
Expand All @@ -21,6 +22,10 @@ func newCandidatePair(local, remote Candidate, controlling bool) *CandidatePair
// CandidatePair is a combination of a
// local and remote candidate
type CandidatePair struct {
bytesReceived uint64
bytesSent uint64
packetsReceived uint32
packetsSent uint32
iceRoleControlling bool
Remote Candidate
Local Candidate
Expand Down Expand Up @@ -90,7 +95,28 @@ func (p *CandidatePair) priority() uint64 {
return (1<<32-1)*min(g, d) + 2*max(g, d) + cmp(g, d)
}

// BytesSent returns the number of bytes sent
func (p *CandidatePair) BytesSent() uint64 {
return atomic.LoadUint64(&p.bytesSent)
}

// BytesReceived returns the number of bytes received
func (p *CandidatePair) BytesReceived() uint64 {
return atomic.LoadUint64(&p.bytesReceived)
}

// PacketsSent returns the number of bytes sent
func (p *CandidatePair) PacketsSent() uint32 {
return atomic.LoadUint32(&p.packetsSent)
}

// PacketsReceived returns the number of bytes received
func (p *CandidatePair) PacketsReceived() uint32 {
return atomic.LoadUint32(&p.packetsReceived)
}

func (p *CandidatePair) Write(b []byte) (int, error) {
atomic.AddUint64(&p.bytesSent, uint64(len(b)))
return p.Local.writeTo(b, p.Remote)
}

Expand Down
22 changes: 18 additions & 4 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ func (a *Agent) Accept(ctx context.Context, remoteUfrag, remotePwd string) (*Con
// Conn represents the ICE connection.
// At the moment the lifetime of the Conn is equal to the Agent.
type Conn struct {
bytesReceived uint64
bytesSent uint64
agent *Agent
bytesReceived uint64
bytesSent uint64
packetsReceived uint32
packetsSent uint32
//selectedCandidatePairChanges uint32
agent *Agent
}

// BytesSent returns the number of bytes sent
Expand All @@ -42,6 +45,16 @@ func (c *Conn) BytesReceived() uint64 {
return atomic.LoadUint64(&c.bytesReceived)
}

// PacketsSent returns the number of bytes sent
func (c *Conn) PacketsSent() uint32 {
return atomic.LoadUint32(&c.packetsSent)
}

// PacketsReceived returns the number of bytes received
func (c *Conn) PacketsReceived() uint32 {
return atomic.LoadUint32(&c.packetsReceived)
}

func (a *Agent) connect(ctx context.Context, isControlling bool, remoteUfrag, remotePwd string) (*Conn, error) {
err := a.loop.Err()
if err != nil {
Expand Down Expand Up @@ -74,6 +87,7 @@ func (c *Conn) Read(p []byte) (int, error) {
}

n, err := c.agent.buf.Read(p)
atomic.AddUint32(&c.packetsReceived, uint32(1))
atomic.AddUint64(&c.bytesReceived, uint64(n))
return n, err
}
Expand Down Expand Up @@ -101,7 +115,7 @@ func (c *Conn) Write(p []byte) (int, error) {
return 0, err
}
}

atomic.AddUint32(&c.packetsSent, uint32(1))
atomic.AddUint64(&c.bytesSent, uint64(len(p)))
return pair.Write(p)
}
Expand Down