Skip to content

Commit efe2551

Browse files
committed
Add BatchIO and multiple ports options to UDPMux
Add BatchIO and multiple ports options to NewMultiUDPMuxPort(s)
1 parent 0ec2333 commit efe2551

File tree

6 files changed

+256
-55
lines changed

6 files changed

+256
-55
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/pion/mdns v0.0.7
1111
github.com/pion/randutil v0.1.0
1212
github.com/pion/stun v0.6.1
13-
github.com/pion/transport/v2 v2.2.1
13+
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0
1414
github.com/pion/turn/v2 v2.1.3
1515
github.com/stretchr/testify v1.8.4
1616
golang.org/x/net v0.13.0

go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TB
1919
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
2020
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
2121
github.com/pion/transport/v2 v2.0.0/go.mod h1:HS2MEBJTwD+1ZI2eSXSvHJx/HnzQqRy2/LXxt6eVMHc=
22-
github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c=
2322
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
23+
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0 h1:7z51t0GDPVHvR8KTnVfUGgeE0KqZvc9o5J3UMVMrykY=
24+
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc=
2425
github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA=
2526
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
2627
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

udp_mux.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type UDPMux interface {
2323
GetConn(ufrag string, addr net.Addr) (net.PacketConn, error)
2424
RemoveConnByUfrag(ufrag string)
2525
GetListenAddresses() []net.Addr
26+
ConnCount() int
2627
}
2728

2829
// UDPMuxDefault is an implementation of the interface
@@ -176,6 +177,13 @@ func (m *UDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketConn, er
176177
return c, nil
177178
}
178179

180+
// ConnCount return count of working connections created by UDPMuxDefault
181+
func (m *UDPMuxDefault) ConnCount() int {
182+
m.mu.Lock()
183+
defer m.mu.Unlock()
184+
return len(m.connsIPv4) + len(m.connsIPv6)
185+
}
186+
179187
// RemoveConnByUfrag stops and removes the muxed packet connection
180188
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
181189
removedConns := make([]*udpMuxedConn, 0, 2)

udp_mux_multi.go

Lines changed: 144 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ package ice
66
import (
77
"fmt"
88
"net"
9+
"time"
910

1011
"github.com/pion/logging"
1112
"github.com/pion/transport/v2"
1213
"github.com/pion/transport/v2/stdnet"
14+
tudp "github.com/pion/transport/v2/udp"
1315
)
1416

1517
// MultiUDPMuxDefault implements both UDPMux and AllConnsGetter,
@@ -18,20 +20,93 @@ import (
1820
type MultiUDPMuxDefault struct {
1921
muxes []UDPMux
2022
localAddrToMux map[string]UDPMux
23+
24+
enablePortBalance bool
25+
// Manage port balance for mux that listen on multiple ports for same IP,
26+
// for each IP, only return one addr (one port) for each GetListenAddresses call to
27+
// avoid duplicate ip candidates be gathered for a single ice agent.
28+
multiPortsAddresses []*multiPortsAddress
29+
}
30+
31+
type addrMux struct {
32+
addr net.Addr
33+
mux UDPMux
34+
}
35+
36+
// each multiPortsAddress represents muxes listen on different ports of a same IP
37+
type multiPortsAddress struct {
38+
addresseMuxes []*addrMux
39+
}
40+
41+
func (mpa *multiPortsAddress) next() net.Addr {
42+
leastAddr, leastConns := mpa.addresseMuxes[0].addr, mpa.addresseMuxes[0].mux.ConnCount()
43+
for i := 1; i < len(mpa.addresseMuxes); i++ {
44+
am := mpa.addresseMuxes[i]
45+
if count := am.mux.ConnCount(); count < leastConns {
46+
leastConns = count
47+
leastAddr = am.addr
48+
}
49+
}
50+
return leastAddr
51+
}
52+
53+
// MultiUDPMuxOption provide options for NewMultiUDPMuxDefault
54+
type MultiUDPMuxOption func(*multipleUDPMuxDefaultParams)
55+
56+
// MultiUDPMuxOptionWithPortBalance enable traffic balance on ports that belongs to same IP address,
57+
// that means the MultiUDPMuxDefault will return a single port that has least connection for each IP address
58+
// in GetListenAddresses return.
59+
func MultiUDPMuxOptionWithPortBalance() MultiUDPMuxOption {
60+
return func(params *multipleUDPMuxDefaultParams) {
61+
params.portBalance = true
62+
}
63+
}
64+
65+
type multipleUDPMuxDefaultParams struct {
66+
portBalance bool
2167
}
2268

2369
// NewMultiUDPMuxDefault creates an instance of MultiUDPMuxDefault that
2470
// uses the provided UDPMux instances.
2571
func NewMultiUDPMuxDefault(muxes ...UDPMux) *MultiUDPMuxDefault {
72+
return NewMultiUDPMuxDefaultWithOptions(muxes)
73+
}
74+
75+
// NewMultiUDPMuxDefaultWithOptions creates an instance of MultiUDPMuxDefault that
76+
// uses the provided UDPMux instances and options.
77+
func NewMultiUDPMuxDefaultWithOptions(muxes []UDPMux, opts ...MultiUDPMuxOption) *MultiUDPMuxDefault {
78+
var params multipleUDPMuxDefaultParams
79+
for _, opt := range opts {
80+
opt(&params)
81+
}
82+
2683
addrToMux := make(map[string]UDPMux)
84+
ipToAddrs := make(map[string]*multiPortsAddress)
2785
for _, mux := range muxes {
2886
for _, addr := range mux.GetListenAddresses() {
2987
addrToMux[addr.String()] = mux
88+
89+
udpAddr, _ := addr.(*net.UDPAddr)
90+
ip := udpAddr.IP.String()
91+
if mpa, ok := ipToAddrs[ip]; ok {
92+
mpa.addresseMuxes = append(mpa.addresseMuxes, &addrMux{addr, mux})
93+
} else {
94+
ipToAddrs[ip] = &multiPortsAddress{
95+
addresseMuxes: []*addrMux{{addr, mux}},
96+
}
97+
}
3098
}
3199
}
100+
101+
multiPortsAddresses := make([]*multiPortsAddress, 0, len(ipToAddrs))
102+
for _, mpa := range ipToAddrs {
103+
multiPortsAddresses = append(multiPortsAddresses, mpa)
104+
}
32105
return &MultiUDPMuxDefault{
33-
muxes: muxes,
34-
localAddrToMux: addrToMux,
106+
muxes: muxes,
107+
localAddrToMux: addrToMux,
108+
multiPortsAddresses: multiPortsAddresses,
109+
enablePortBalance: params.portBalance,
35110
}
36111
}
37112

@@ -45,6 +120,15 @@ func (m *MultiUDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketCon
45120
return mux.GetConn(ufrag, addr)
46121
}
47122

123+
// ConnCount return count of working connections created by the mux.
124+
func (m *MultiUDPMuxDefault) ConnCount() int {
125+
var count int
126+
for _, mux := range m.muxes {
127+
count += mux.ConnCount()
128+
}
129+
return count
130+
}
131+
48132
// RemoveConnByUfrag stops and removes the muxed packet connection
49133
// from all underlying UDPMux instances.
50134
func (m *MultiUDPMuxDefault) RemoveConnByUfrag(ufrag string) {
@@ -64,8 +148,18 @@ func (m *MultiUDPMuxDefault) Close() error {
64148
return err
65149
}
66150

67-
// GetListenAddresses returns the list of addresses that this mux is listening on
151+
// GetListenAddresses returns the list of addresses that this mux is listening on,
152+
// if port balance enabled and there are multiple mux listen on different ports of a same IP addr,
153+
// will return the mux who has least connections of that IP addr.
68154
func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
155+
if m.enablePortBalance {
156+
addrs := make([]net.Addr, 0, len(m.multiPortsAddresses))
157+
for _, mpa := range m.multiPortsAddresses {
158+
addrs = append(addrs, mpa.next())
159+
}
160+
return addrs
161+
}
162+
69163
addrs := make([]net.Addr, 0, len(m.localAddrToMux))
70164
for _, mux := range m.muxes {
71165
addrs = append(addrs, mux.GetListenAddresses()...)
@@ -76,6 +170,12 @@ func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
76170
// NewMultiUDPMuxFromPort creates an instance of MultiUDPMuxDefault that
77171
// listen all interfaces on the provided port.
78172
func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
173+
return NewMultiUDPMuxFromPorts([]int{port}, opts...)
174+
}
175+
176+
// NewMultiUDPMuxFromPorts creates an instance of MultiUDPMuxDefault that
177+
// listen all interfaces and balance traffic on the provided ports.
178+
func NewMultiUDPMuxFromPorts(ports []int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
79179
params := multiUDPMuxFromPortParam{
80180
networks: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
81181
}
@@ -95,20 +195,29 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
95195
return nil, err
96196
}
97197

98-
conns := make([]net.PacketConn, 0, len(ips))
198+
conns := make([]net.PacketConn, 0, len(ports)*len(ips))
99199
for _, ip := range ips {
100-
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
101-
if listenErr != nil {
102-
err = listenErr
103-
break
104-
}
105-
if params.readBufferSize > 0 {
106-
_ = conn.SetReadBuffer(params.readBufferSize)
200+
for _, port := range ports {
201+
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
202+
if listenErr != nil {
203+
err = listenErr
204+
break
205+
}
206+
if params.readBufferSize > 0 {
207+
_ = conn.SetReadBuffer(params.readBufferSize)
208+
}
209+
if params.writeBufferSize > 0 {
210+
_ = conn.SetWriteBuffer(params.writeBufferSize)
211+
}
212+
if params.batchWriteSize > 0 {
213+
conns = append(conns, tudp.NewBatchConn(conn, params.batchWriteSize, params.batchWriteInterval))
214+
} else {
215+
conns = append(conns, conn)
216+
}
107217
}
108-
if params.writeBufferSize > 0 {
109-
_ = conn.SetWriteBuffer(params.writeBufferSize)
218+
if err != nil {
219+
break
110220
}
111-
conns = append(conns, conn)
112221
}
113222

114223
if err != nil {
@@ -128,7 +237,7 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
128237
muxes = append(muxes, mux)
129238
}
130239

131-
return NewMultiUDPMuxDefault(muxes...), nil
240+
return NewMultiUDPMuxDefaultWithOptions(muxes, MultiUDPMuxOptionWithPortBalance()), nil
132241
}
133242

134243
// UDPMuxFromPortOption provide options for NewMultiUDPMuxFromPort
@@ -137,14 +246,16 @@ type UDPMuxFromPortOption interface {
137246
}
138247

139248
type multiUDPMuxFromPortParam struct {
140-
ifFilter func(string) bool
141-
ipFilter func(ip net.IP) bool
142-
networks []NetworkType
143-
readBufferSize int
144-
writeBufferSize int
145-
logger logging.LeveledLogger
146-
includeLoopback bool
147-
net transport.Net
249+
ifFilter func(string) bool
250+
ipFilter func(ip net.IP) bool
251+
networks []NetworkType
252+
readBufferSize int
253+
writeBufferSize int
254+
logger logging.LeveledLogger
255+
includeLoopback bool
256+
net transport.Net
257+
batchWriteSize int
258+
batchWriteInterval time.Duration
148259
}
149260

150261
type udpMuxFromPortOption struct {
@@ -226,3 +337,13 @@ func UDPMuxFromPortWithNet(n transport.Net) UDPMuxFromPortOption {
226337
},
227338
}
228339
}
340+
341+
// UDPMuxFromPortWithBatchWrite enable batch write for UDPMux
342+
func UDPMuxFromPortWithBatchWrite(batchWriteSize int, batchWriteInterval time.Duration) UDPMuxFromPortOption {
343+
return &udpMuxFromPortOption{
344+
f: func(p *multiUDPMuxFromPortParam) {
345+
p.batchWriteSize = batchWriteSize
346+
p.batchWriteInterval = batchWriteInterval
347+
},
348+
}
349+
}

0 commit comments

Comments
 (0)