Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of UDPMux: Add BatchIO and multiple ports options #608

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/pion/mdns v0.0.8
github.com/pion/randutil v0.1.0
github.com/pion/stun v0.6.1
github.com/pion/transport/v2 v2.2.1
github.com/pion/transport/v2 v2.2.2
github.com/pion/turn/v2 v2.1.3
github.com/stretchr/testify v1.8.4
golang.org/x/net v0.14.0
Expand Down
8 changes: 7 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c=
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.2 h1:yv+EKSU2dpmInuCebQ1rsBFCYL7p+aV90xIlshSBO+A=
github.com/pion/transport/v2 v2.2.2/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc=
github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA=
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -37,6 +38,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand All @@ -48,6 +50,7 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -62,6 +65,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand All @@ -70,13 +74,15 @@ golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
12 changes: 12 additions & 0 deletions udp_mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type UDPMux interface {
GetListenAddresses() []net.Addr
}

// MuxConnCount return count of working connections created by the mux.
type MuxConnCount interface {
ConnCount() int
cnderrauber marked this conversation as resolved.
Show resolved Hide resolved
}

// UDPMuxDefault is an implementation of the interface
type UDPMuxDefault struct {
params UDPMuxParams
Expand Down Expand Up @@ -176,6 +181,13 @@ func (m *UDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketConn, er
return c, nil
}

// ConnCount returns count of working connections created by UDPMuxDefault
func (m *UDPMuxDefault) ConnCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.connsIPv4) + len(m.connsIPv6)
}

// RemoveConnByUfrag stops and removes the muxed packet connection
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
removedConns := make([]*udpMuxedConn, 0, 2)
Expand Down
180 changes: 156 additions & 24 deletions udp_mux_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,130 @@
package ice

import (
"errors"
"fmt"
"net"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v2"
"github.com/pion/transport/v2/stdnet"
tudp "github.com/pion/transport/v2/udp"
)

var errPortBalanceRequireConnCount = errors.New("port balancing requires a UDPMux which implements MuxConnCount interface")

// MultiUDPMuxDefault implements both UDPMux and AllConnsGetter,
// allowing users to pass multiple UDPMux instances to the ICE agent
// configuration.
type MultiUDPMuxDefault struct {
muxes []UDPMux
localAddrToMux map[string]UDPMux

enablePortBalance bool
// Manage port balance for mux that listen on multiple ports for same IP,
// for each IP, only return one addr (one port) for each GetListenAddresses call to
// avoid duplicate ip candidates be gathered for a single ice agent.
multiPortsAddresses []*multiPortsAddress
}

type addrMux struct {
addr net.Addr
mux MuxConnCount
}

// Each multiPortsAddress represents muxes listen on different ports of a same IP
type multiPortsAddress struct {
addresseMuxes []*addrMux
}

func (mpa *multiPortsAddress) next() net.Addr {
leastAddr, leastConns := mpa.addresseMuxes[0].addr, mpa.addresseMuxes[0].mux.ConnCount()
for i := 1; i < len(mpa.addresseMuxes); i++ {
am := mpa.addresseMuxes[i]
if count := am.mux.ConnCount(); count < leastConns {
leastConns = count
leastAddr = am.addr
}
}
return leastAddr
}

// MultiUDPMuxOption provide options for NewMultiUDPMuxDefault
type MultiUDPMuxOption func(*multipleUDPMuxDefaultParams)

// MultiUDPMuxOptionWithPortBalance enables load balancing traffic on multiple ports belonging to the same IP
// When enabled, GetListenAddresses will return the port with the least number of connections for each corresponding IP
func MultiUDPMuxOptionWithPortBalance() MultiUDPMuxOption {
return func(params *multipleUDPMuxDefaultParams) {
params.portBalance = true
}
}

type multipleUDPMuxDefaultParams struct {
portBalance bool
}

// NewMultiUDPMuxDefault creates an instance of MultiUDPMuxDefault that
// uses the provided UDPMux instances.
func NewMultiUDPMuxDefault(muxes ...UDPMux) *MultiUDPMuxDefault {
mux, err := NewMultiUDPMuxDefaultWithOptions(muxes)
// The error should always be nil as no options given to NewMultiUDPMuxDefaultWithOptions that it
// only return error when port balance enabled but mux don't support MuxConnCount.
if err != nil {
//nolint:forbidigo
panic(err)
}
return mux
}

// NewMultiUDPMuxDefaultWithOptions creates an instance of MultiUDPMuxDefault that
// uses the provided UDPMux instances and options.
func NewMultiUDPMuxDefaultWithOptions(muxes []UDPMux, opts ...MultiUDPMuxOption) (*MultiUDPMuxDefault, error) {
var params multipleUDPMuxDefaultParams
for _, opt := range opts {
opt(&params)
}

if params.portBalance {
for _, mux := range muxes {
if _, ok := mux.(MuxConnCount); !ok {
return nil, errPortBalanceRequireConnCount
}
}
}

addrToMux := make(map[string]UDPMux)
ipToAddrs := make(map[string]*multiPortsAddress)
for _, mux := range muxes {
for _, addr := range mux.GetListenAddresses() {
addrToMux[addr.String()] = mux

if params.portBalance {
muxCount, _ := mux.(MuxConnCount)
udpAddr, _ := addr.(*net.UDPAddr)
ip := udpAddr.IP.String()
if mpa, ok := ipToAddrs[ip]; ok {
mpa.addresseMuxes = append(mpa.addresseMuxes, &addrMux{addr, muxCount})
} else {
ipToAddrs[ip] = &multiPortsAddress{
addresseMuxes: []*addrMux{{addr, muxCount}},
}
}
}
}
}
return &MultiUDPMuxDefault{
muxes: muxes,
localAddrToMux: addrToMux,

multiPortsAddresses := make([]*multiPortsAddress, 0, len(ipToAddrs))
for _, mpa := range ipToAddrs {
multiPortsAddresses = append(multiPortsAddresses, mpa)
}
return &MultiUDPMuxDefault{
muxes: muxes,
localAddrToMux: addrToMux,
multiPortsAddresses: multiPortsAddresses,
enablePortBalance: params.portBalance,
}, nil
}

// GetConn returns a PacketConn given the connection's ufrag and network
Expand Down Expand Up @@ -64,8 +159,18 @@ func (m *MultiUDPMuxDefault) Close() error {
return err
}

// GetListenAddresses returns the list of addresses that this mux is listening on
// GetListenAddresses returns the list of addresses that this mux is listening on,
// if port balance enabled and there are multiple muxes listening to different ports of the same IP addr,
// it will return the mux that has the least number of connections.
func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
if m.enablePortBalance {
addrs := make([]net.Addr, 0, len(m.multiPortsAddresses))
for _, mpa := range m.multiPortsAddresses {
addrs = append(addrs, mpa.next())
}
return addrs
}

addrs := make([]net.Addr, 0, len(m.localAddrToMux))
for _, mux := range m.muxes {
addrs = append(addrs, mux.GetListenAddresses()...)
Expand All @@ -76,6 +181,12 @@ func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
// NewMultiUDPMuxFromPort creates an instance of MultiUDPMuxDefault that
// listen all interfaces on the provided port.
func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
return NewMultiUDPMuxFromPorts([]int{port}, opts...)
}

// NewMultiUDPMuxFromPorts creates an instance of MultiUDPMuxDefault that
// listens to all interfaces and balances traffic on the provided ports.
func NewMultiUDPMuxFromPorts(ports []int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
params := multiUDPMuxFromPortParam{
networks: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
}
Expand All @@ -95,20 +206,29 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
return nil, err
}

conns := make([]net.PacketConn, 0, len(ips))
conns := make([]net.PacketConn, 0, len(ports)*len(ips))
for _, ip := range ips {
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
if listenErr != nil {
err = listenErr
break
for _, port := range ports {
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
if listenErr != nil {
err = listenErr
break
}
if params.readBufferSize > 0 {
_ = conn.SetReadBuffer(params.readBufferSize)
}
if params.writeBufferSize > 0 {
_ = conn.SetWriteBuffer(params.writeBufferSize)
}
if params.batchWriteSize > 0 {
conns = append(conns, tudp.NewBatchConn(conn, params.batchWriteSize, params.batchWriteInterval))
stv0g marked this conversation as resolved.
Show resolved Hide resolved
} else {
conns = append(conns, conn)
}
}
if params.readBufferSize > 0 {
_ = conn.SetReadBuffer(params.readBufferSize)
}
if params.writeBufferSize > 0 {
_ = conn.SetWriteBuffer(params.writeBufferSize)
if err != nil {
break
}
conns = append(conns, conn)
}

if err != nil {
Expand All @@ -128,7 +248,7 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
muxes = append(muxes, mux)
}

return NewMultiUDPMuxDefault(muxes...), nil
return NewMultiUDPMuxDefaultWithOptions(muxes, MultiUDPMuxOptionWithPortBalance())
}

// UDPMuxFromPortOption provide options for NewMultiUDPMuxFromPort
Expand All @@ -137,14 +257,16 @@ type UDPMuxFromPortOption interface {
}

type multiUDPMuxFromPortParam struct {
ifFilter func(string) bool
ipFilter func(ip net.IP) bool
networks []NetworkType
readBufferSize int
writeBufferSize int
logger logging.LeveledLogger
includeLoopback bool
net transport.Net
ifFilter func(string) bool
ipFilter func(ip net.IP) bool
networks []NetworkType
readBufferSize int
writeBufferSize int
logger logging.LeveledLogger
includeLoopback bool
net transport.Net
batchWriteSize int
batchWriteInterval time.Duration
}

type udpMuxFromPortOption struct {
Expand Down Expand Up @@ -226,3 +348,13 @@ func UDPMuxFromPortWithNet(n transport.Net) UDPMuxFromPortOption {
},
}
}

// UDPMuxFromPortWithBatchWrite enable batch write for UDPMux
func UDPMuxFromPortWithBatchWrite(batchWriteSize int, batchWriteInterval time.Duration) UDPMuxFromPortOption {
return &udpMuxFromPortOption{
f: func(p *multiUDPMuxFromPortParam) {
p.batchWriteSize = batchWriteSize
p.batchWriteInterval = batchWriteInterval
},
}
}
Loading