-
Notifications
You must be signed in to change notification settings - Fork 1
/
local.go
233 lines (186 loc) · 5.97 KB
/
local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// Copyright 2018 Dan Jacques. All rights reserved.
// Use of this source code is governed under the MIT License
// that can be found in the LICENSE file.
package device
import (
"fmt"
"net"
"sync"
"github.com/danjacques/gopushpixels/protocol"
"github.com/danjacques/gopushpixels/support/bufferpool"
"github.com/danjacques/gopushpixels/support/fmtutil"
"github.com/danjacques/gopushpixels/support/logging"
"github.com/danjacques/gopushpixels/support/network"
"github.com/pkg/errors"
)
// Local is a local "virtual' device. A Local allows your local system to
// instantiate its own devices. Local can be useful for testing and the
// simluation of devices.
//
// Local's exported fields must not be changed after Start is called.
type Local struct {
// The local device's ID.
DeviceID string
// OnPacketData is wthe callback that is called when new packet data is
// received.
//
// OnPacketData must not be nil.
//
// The packet data is owned by a bufferpool.Pool. Recipients of the buffer may
// Retain it and Release it to prevent it from reentering the pool. The buffer
// that is handed to the callback is automatically Released when the callback
// returns; the callback SHOULD NOT release the buffer.
OnPacketData func(buf *bufferpool.Buffer)
// UDPPacketPool, if not nil, is the packet pool to use for UDP packet data.
//
// If nil, a local packet pool will be generated and used.
UDPPacketPool *bufferpool.Pool
// Logger, if not nil, is the logger to use to log events.
//
// Changes to Logger after Start is called will have no effect.
Logger logging.L
// logger is the resolved logger on Start.
logger logging.L
// addr is the address of the remote device.
addr *net.UDPAddr
conn *net.UDPConn
// doneC is used to implement DoneC().
doneC chan struct{}
// monitoring is the device's monitoring state.
monitoring Monitoring
// udpPacketPool is a pool of buffers to use and reuse for UDP packet data.
packetPool *bufferpool.Pool
// listenDoneC is used to signal that our listen goroutine has finished.
listenDoneC chan struct{}
// mu protects the following data.
mu sync.RWMutex
// dh is the set of retained DiscoveryHeaders.
dh *protocol.DiscoveryHeaders
}
var _ D = (*Local)(nil)
// Start finishes device setup and begins listening for packets.
//
// The returned device presumes ownership over conn, and will close it when
// closed.
func (d *Local) Start(conn *net.UDPConn) {
switch {
case d.conn != nil:
panic("already started")
case d.OnPacketData == nil:
panic("no OnPacketData callback defined")
}
// Resolve our logger.
d.logger = logging.Must(d.Logger)
d.conn = conn
d.addr = conn.LocalAddr().(*net.UDPAddr)
d.doneC = make(chan struct{})
// Configure our packet buffer pool.
d.packetPool = d.UDPPacketPool
if d.packetPool == nil {
d.packetPool = &bufferpool.Pool{
Size: network.MaxUDPSize,
}
}
// Listen for packets to this Local in a separate goroutine.
d.listenDoneC = make(chan struct{})
go d.listenForPackets()
// Update monitoring information.
d.monitoring.Update(d)
}
// Close closes the Local, freeing its remote connection resource and marking it
// Done.
//
// After Close has returned, no more packet callbacks will be sent.
func (d *Local) Close() error {
// Clouse our done channel. This notifies our callers that we have finished,
// and our internal listener goroutine to stop.
close(d.doneC)
// Close our connection. This should cause our listener goroutine to break
// out of any blocking read calls.
err := d.conn.Close()
// Wait for our listener goroutine to complete.
<-d.listenDoneC
// Update monitoring information.
d.monitoring.Update(d)
return err
}
// UpdateHeaders sets the base discovery headers to use for this device.
//
// These headers will be updated internally to include the local device address
// information.
//
// UpdateHeaders must be called at least once before DiscoveryHeaders is
// invoked, ideally at setup.
func (d *Local) UpdateHeaders(dh *protocol.DiscoveryHeaders) {
d.mu.Lock()
defer d.mu.Unlock()
// Clone the provided headers.
d.dh = dh.Clone()
// Fill in our address and port.
d.dh.SetIP4Address(d.addr.IP)
d.dh.PixelPusher.MyPort = uint16(d.addr.Port)
// Update monitoring information.
d.monitoring.Update(d)
}
// String implements D.
func (d *Local) String() string { return fmt.Sprintf("Local{%s}", d.addr.String()) }
// ID implements D.
func (d *Local) ID() string { return d.DeviceID }
// Ordinal implements D.
//
// The Local is not part of any ordinal group.
func (d *Local) Ordinal() Ordinal { return InvalidOrdinal() }
// Sender implements D.
func (d *Local) Sender() (Sender, error) {
return nil, errors.New("local device Sender is not supported")
}
// DiscoveryHeaders implements D.
func (d *Local) DiscoveryHeaders() *protocol.DiscoveryHeaders {
d.mu.RLock()
defer d.mu.RUnlock()
if d.dh == nil {
return &protocol.DiscoveryHeaders{}
}
return d.dh
}
// DoneC implements D.
func (d *Local) DoneC() <-chan struct{} { return d.doneC }
// Addr implements D.
//
// Addr will always be a *net.UDPAddr.
func (d *Local) Addr() net.Addr { return d.addr }
// Info implements D.
func (d *Local) Info() Info { return Info{} }
// ListenForPackets listens on D's address for packets, sending all received
// packets to d's callback.
func (d *Local) listenForPackets() {
defer close(d.listenDoneC)
for {
// If we've been closed, then we're done.
select {
case <-d.doneC:
return
default:
}
// Listen for an incoming packet.
buf := d.packetPool.Get()
size, _, _, addr, err := d.conn.ReadMsgUDP(buf.Bytes(), nil)
if err != nil {
// TODO: Log error?
continue
}
buf.Truncate(size)
d.logger.Debugf("Received packet from %s (%d byte(s)) on %s:\n%s",
addr, size, d.DeviceID, fmtutil.Hex(buf.Bytes()))
d.dispatchPacketToCallback(buf)
}
}
func (d *Local) dispatchPacketToCallback(buf *bufferpool.Buffer) {
defer buf.Release()
defer func() {
if err := recover(); err != nil {
d.logger.Warnf("Dropping panic in callback: %s", err)
}
}()
d.OnPacketData(buf)
}