Skip to content

Commit 0ad5b09

Browse files
committed
Add BatchIO and load balance on ports to UDPMux
Improve performance of UDPMux by BatchIO and load balance on ports
1 parent 0ec2333 commit 0ad5b09

File tree

6 files changed

+255
-55
lines changed

6 files changed

+255
-55
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/pion/mdns v0.0.7
1111
github.com/pion/randutil v0.1.0
1212
github.com/pion/stun v0.6.1
13-
github.com/pion/transport/v2 v2.2.1
13+
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0
1414
github.com/pion/turn/v2 v2.1.3
1515
github.com/stretchr/testify v1.8.4
1616
golang.org/x/net v0.13.0

go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TB
1919
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
2020
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
2121
github.com/pion/transport/v2 v2.0.0/go.mod h1:HS2MEBJTwD+1ZI2eSXSvHJx/HnzQqRy2/LXxt6eVMHc=
22-
github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c=
2322
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
23+
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0 h1:7z51t0GDPVHvR8KTnVfUGgeE0KqZvc9o5J3UMVMrykY=
24+
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc=
2425
github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA=
2526
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
2627
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

udp_mux.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type UDPMux interface {
2323
GetConn(ufrag string, addr net.Addr) (net.PacketConn, error)
2424
RemoveConnByUfrag(ufrag string)
2525
GetListenAddresses() []net.Addr
26+
ConnCount() int
2627
}
2728

2829
// UDPMuxDefault is an implementation of the interface
@@ -176,6 +177,13 @@ func (m *UDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketConn, er
176177
return c, nil
177178
}
178179

180+
// ConnCount return count of working connections created by UDPMuxDefault
181+
func (m *UDPMuxDefault) ConnCount() int {
182+
m.mu.Lock()
183+
defer m.mu.Unlock()
184+
return len(m.connsIPv4) + len(m.connsIPv6)
185+
}
186+
179187
// RemoveConnByUfrag stops and removes the muxed packet connection
180188
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
181189
removedConns := make([]*udpMuxedConn, 0, 2)

udp_mux_multi.go

Lines changed: 143 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ package ice
66
import (
77
"fmt"
88
"net"
9+
"time"
910

1011
"github.com/pion/logging"
1112
"github.com/pion/transport/v2"
1213
"github.com/pion/transport/v2/stdnet"
14+
tudp "github.com/pion/transport/v2/udp"
1315
)
1416

1517
// MultiUDPMuxDefault implements both UDPMux and AllConnsGetter,
@@ -18,20 +20,92 @@ import (
1820
type MultiUDPMuxDefault struct {
1921
muxes []UDPMux
2022
localAddrToMux map[string]UDPMux
23+
24+
enablePortBalance bool
25+
// Manage port balance for mux that listen on multiple ports for same IP,
26+
// for each IP, only return one addr (one port) for each GetListenAddresses call to
27+
// avoid duplicate ip candidates be gathered for a single ice agent.
28+
multiPortsAddresses []*multiPortsAddress
29+
}
30+
31+
type addrMux struct {
32+
addr net.Addr
33+
mux UDPMux
34+
}
35+
36+
// each multiPortsAddress represents muxes listen on different ports of a same IP
37+
type multiPortsAddress struct {
38+
addresseMuxes []*addrMux
39+
}
40+
41+
func (mpa *multiPortsAddress) next() net.Addr {
42+
leastAddr, leastConns := mpa.addresseMuxes[0].addr, mpa.addresseMuxes[0].mux.ConnCount()
43+
for i := 1; i < len(mpa.addresseMuxes); i++ {
44+
am := mpa.addresseMuxes[i]
45+
if count := am.mux.ConnCount(); count < leastConns {
46+
leastConns = count
47+
leastAddr = am.addr
48+
}
49+
}
50+
return leastAddr
51+
}
52+
53+
// MultiUDPMuxOption provide options for NewMultiUDPMuxDefault
54+
type MultiUDPMuxOption func(*multipleUDPMuxDefaultParams)
55+
56+
// MultiUDPMuxOptionWithPortBalance enables load balancing traffic on multiple ports belonging to the same IP
57+
// When enabled, GetListenAddresses will return the port with the least number of connections for each corresponding IP
58+
func MultiUDPMuxOptionWithPortBalance() MultiUDPMuxOption {
59+
return func(params *multipleUDPMuxDefaultParams) {
60+
params.portBalance = true
61+
}
62+
}
63+
64+
type multipleUDPMuxDefaultParams struct {
65+
portBalance bool
2166
}
2267

2368
// NewMultiUDPMuxDefault creates an instance of MultiUDPMuxDefault that
2469
// uses the provided UDPMux instances.
2570
func NewMultiUDPMuxDefault(muxes ...UDPMux) *MultiUDPMuxDefault {
71+
return NewMultiUDPMuxDefaultWithOptions(muxes)
72+
}
73+
74+
// NewMultiUDPMuxDefaultWithOptions creates an instance of MultiUDPMuxDefault that
75+
// uses the provided UDPMux instances and options.
76+
func NewMultiUDPMuxDefaultWithOptions(muxes []UDPMux, opts ...MultiUDPMuxOption) *MultiUDPMuxDefault {
77+
var params multipleUDPMuxDefaultParams
78+
for _, opt := range opts {
79+
opt(&params)
80+
}
81+
2682
addrToMux := make(map[string]UDPMux)
83+
ipToAddrs := make(map[string]*multiPortsAddress)
2784
for _, mux := range muxes {
2885
for _, addr := range mux.GetListenAddresses() {
2986
addrToMux[addr.String()] = mux
87+
88+
udpAddr, _ := addr.(*net.UDPAddr)
89+
ip := udpAddr.IP.String()
90+
if mpa, ok := ipToAddrs[ip]; ok {
91+
mpa.addresseMuxes = append(mpa.addresseMuxes, &addrMux{addr, mux})
92+
} else {
93+
ipToAddrs[ip] = &multiPortsAddress{
94+
addresseMuxes: []*addrMux{{addr, mux}},
95+
}
96+
}
3097
}
3198
}
99+
100+
multiPortsAddresses := make([]*multiPortsAddress, 0, len(ipToAddrs))
101+
for _, mpa := range ipToAddrs {
102+
multiPortsAddresses = append(multiPortsAddresses, mpa)
103+
}
32104
return &MultiUDPMuxDefault{
33-
muxes: muxes,
34-
localAddrToMux: addrToMux,
105+
muxes: muxes,
106+
localAddrToMux: addrToMux,
107+
multiPortsAddresses: multiPortsAddresses,
108+
enablePortBalance: params.portBalance,
35109
}
36110
}
37111

@@ -45,6 +119,15 @@ func (m *MultiUDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketCon
45119
return mux.GetConn(ufrag, addr)
46120
}
47121

122+
// ConnCount return count of working connections created by the mux.
123+
func (m *MultiUDPMuxDefault) ConnCount() int {
124+
var count int
125+
for _, mux := range m.muxes {
126+
count += mux.ConnCount()
127+
}
128+
return count
129+
}
130+
48131
// RemoveConnByUfrag stops and removes the muxed packet connection
49132
// from all underlying UDPMux instances.
50133
func (m *MultiUDPMuxDefault) RemoveConnByUfrag(ufrag string) {
@@ -64,8 +147,18 @@ func (m *MultiUDPMuxDefault) Close() error {
64147
return err
65148
}
66149

67-
// GetListenAddresses returns the list of addresses that this mux is listening on
150+
// GetListenAddresses returns the list of addresses that this mux is listening on,
151+
// if port balance enabled and there are multiple muxes listening to different ports of the same IP addr,
152+
// it will return the mux that has the least number of connections.
68153
func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
154+
if m.enablePortBalance {
155+
addrs := make([]net.Addr, 0, len(m.multiPortsAddresses))
156+
for _, mpa := range m.multiPortsAddresses {
157+
addrs = append(addrs, mpa.next())
158+
}
159+
return addrs
160+
}
161+
69162
addrs := make([]net.Addr, 0, len(m.localAddrToMux))
70163
for _, mux := range m.muxes {
71164
addrs = append(addrs, mux.GetListenAddresses()...)
@@ -76,6 +169,12 @@ func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
76169
// NewMultiUDPMuxFromPort creates an instance of MultiUDPMuxDefault that
77170
// listen all interfaces on the provided port.
78171
func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
172+
return NewMultiUDPMuxFromPorts([]int{port}, opts...)
173+
}
174+
175+
// NewMultiUDPMuxFromPorts creates an instance of MultiUDPMuxDefault that
176+
// listens to all interfaces and balances traffic on the provided ports.
177+
func NewMultiUDPMuxFromPorts(ports []int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
79178
params := multiUDPMuxFromPortParam{
80179
networks: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
81180
}
@@ -95,20 +194,29 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
95194
return nil, err
96195
}
97196

98-
conns := make([]net.PacketConn, 0, len(ips))
197+
conns := make([]net.PacketConn, 0, len(ports)*len(ips))
99198
for _, ip := range ips {
100-
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
101-
if listenErr != nil {
102-
err = listenErr
103-
break
104-
}
105-
if params.readBufferSize > 0 {
106-
_ = conn.SetReadBuffer(params.readBufferSize)
199+
for _, port := range ports {
200+
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
201+
if listenErr != nil {
202+
err = listenErr
203+
break
204+
}
205+
if params.readBufferSize > 0 {
206+
_ = conn.SetReadBuffer(params.readBufferSize)
207+
}
208+
if params.writeBufferSize > 0 {
209+
_ = conn.SetWriteBuffer(params.writeBufferSize)
210+
}
211+
if params.batchWriteSize > 0 {
212+
conns = append(conns, tudp.NewBatchConn(conn, params.batchWriteSize, params.batchWriteInterval))
213+
} else {
214+
conns = append(conns, conn)
215+
}
107216
}
108-
if params.writeBufferSize > 0 {
109-
_ = conn.SetWriteBuffer(params.writeBufferSize)
217+
if err != nil {
218+
break
110219
}
111-
conns = append(conns, conn)
112220
}
113221

114222
if err != nil {
@@ -128,7 +236,7 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
128236
muxes = append(muxes, mux)
129237
}
130238

131-
return NewMultiUDPMuxDefault(muxes...), nil
239+
return NewMultiUDPMuxDefaultWithOptions(muxes, MultiUDPMuxOptionWithPortBalance()), nil
132240
}
133241

134242
// UDPMuxFromPortOption provide options for NewMultiUDPMuxFromPort
@@ -137,14 +245,16 @@ type UDPMuxFromPortOption interface {
137245
}
138246

139247
type multiUDPMuxFromPortParam struct {
140-
ifFilter func(string) bool
141-
ipFilter func(ip net.IP) bool
142-
networks []NetworkType
143-
readBufferSize int
144-
writeBufferSize int
145-
logger logging.LeveledLogger
146-
includeLoopback bool
147-
net transport.Net
248+
ifFilter func(string) bool
249+
ipFilter func(ip net.IP) bool
250+
networks []NetworkType
251+
readBufferSize int
252+
writeBufferSize int
253+
logger logging.LeveledLogger
254+
includeLoopback bool
255+
net transport.Net
256+
batchWriteSize int
257+
batchWriteInterval time.Duration
148258
}
149259

150260
type udpMuxFromPortOption struct {
@@ -226,3 +336,13 @@ func UDPMuxFromPortWithNet(n transport.Net) UDPMuxFromPortOption {
226336
},
227337
}
228338
}
339+
340+
// UDPMuxFromPortWithBatchWrite enable batch write for UDPMux
341+
func UDPMuxFromPortWithBatchWrite(batchWriteSize int, batchWriteInterval time.Duration) UDPMuxFromPortOption {
342+
return &udpMuxFromPortOption{
343+
f: func(p *multiUDPMuxFromPortParam) {
344+
p.batchWriteSize = batchWriteSize
345+
p.batchWriteInterval = batchWriteInterval
346+
},
347+
}
348+
}

0 commit comments

Comments
 (0)