-
Notifications
You must be signed in to change notification settings - Fork 1
/
remote.go
384 lines (325 loc) · 10.1 KB
/
remote.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
// Copyright 2018 Dan Jacques. All rights reserved.
// Use of this source code is governed under the MIT License
// that can be found in the LICENSE file.
package device
import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/danjacques/gopushpixels/protocol"
"github.com/danjacques/gopushpixels/support/logging"
"github.com/danjacques/gopushpixels/support/network"
"github.com/pkg/errors"
)
// remoteDeviceState is the state held by a remoteDevice. This state
// will be updated atomically within the device.
type remoteDeviceState struct {
headers *protocol.DiscoveryHeaders
addr net.Addr
observed time.Time
}
// Remote is a device implementation for a remote PixelPusher device.
//
// Remote is typically backed by a discovered device's headers. In this case,
// Remote is a fully-functional local stub for that device and its most recent
// state.
//
// Remote may also be constructed directly as a stub with MakeRemoteStub. In
// this case, it will not have access to any headers, and is used solely as a
// local stub to the remote device.
type Remote struct {
// Logger, if not nil, is the logger that this device and its supporting
// constructs will use.
Logger logging.L
// We lock around these headers. They can be updated any time by a call
// to "observe".
state atomic.Value
// closed when this device's discovery expires.
doneC chan struct{}
doneOnce sync.Once
// id is the device's self-reported ID, derived from its hardware address.
id string
// monitoring is the device's monitoring state.
monitoring Monitoring
// dispatcherMu controls the creation of our dispatcher singleton.
dispatcherMu sync.Mutex
// dispatcher is a singleton connection owned by this device. A dispatcher is
// created when the first Sender is created and destroyed when the device is
// marked Done.
//
// The Sender returned by the Remote's Sender method will use dispatcher to
// perform its higher-level packet writing functionality. The dispatcher will
// own its own connection to the device, which will be shut down when the
// dispatcher is destroyed.
//
// It is responsible for interfacing between users (who send packets) and the
// remote system, potentially shaping, optimizing, or throttling packets as
// appropriate.
//
// A dispatcher fills the higher-level packet aspect of the Sender interface.
//
// dispatcher must be safe for concurrent use.
dispatcher *packetDispatcher
infoMu sync.Mutex
// info is the latest device information.
info Info
// createTime is the time when this device was created.
createdTime time.Time
}
var remoteDeviceType = &Remote{}
var _ D = (*Remote)(nil)
// MakeRemote initializes a Remote device instance.
//
// The device must not be used until it has been observed via Observe(), at
// which point it will become fully active and valid.
func MakeRemote(id string, dh *protocol.DiscoveryHeaders) *Remote {
d := makeRemoteDevice(id)
d.UpdateHeaders(d.createdTime, dh)
return d
}
// MakeRemoteStub constructs a new Remote device without requiring a full
// set of headers.
//
// MakeRemoteStub can be used to communicate with devices at known addresses.
func MakeRemoteStub(id string, addr *net.UDPAddr) *Remote {
d := makeRemoteDevice(id)
d.setState(&remoteDeviceState{
headers: nil,
addr: addr,
observed: time.Now(),
})
return d
}
func makeRemoteDevice(id string) *Remote {
return &Remote{
doneC: make(chan struct{}),
id: id,
createdTime: time.Now(),
}
}
// UpdateHeaders live-updates this device's headers.
//
// This can be used to update an instance of the device that has been observed
// with a new set of headers (e.g., via discovery).
func (d *Remote) UpdateHeaders(now time.Time, dh *protocol.DiscoveryHeaders) {
d.setState(&remoteDeviceState{
headers: dh,
addr: dh.Addr(),
observed: now,
})
d.monitoring.Update(d)
}
func (d *Remote) setState(rds *remoteDeviceState) { d.state.Store(rds) }
func (d *Remote) getState() *remoteDeviceState {
return d.state.Load().(*remoteDeviceState)
}
func (d *Remote) String() string {
st := d.getState()
if st.headers == nil {
return fmt.Sprintf("%q @%v", d.id, st.addr)
}
return fmt.Sprintf("%q @%v (%v)", d.id, st.addr, st.headers.DeviceType)
}
// ID implements D.
func (d *Remote) ID() string { return d.id }
// Ordinal implements D.
func (d *Remote) Ordinal() Ordinal {
st := d.getState()
var ord Ordinal
if st.headers != nil {
if pp := st.headers.PixelPusher; pp != nil {
ord.Group = int(pp.GroupOrdinal)
ord.Controller = int(pp.ControllerOrdinal)
}
}
return ord
}
// Sender implements D.
func (d *Remote) Sender() (Sender, error) {
// Make sure that we have an active Dispatcher, and retain it.
disp, err := d.ensureAndRetainDispatcher()
if err != nil {
return nil, err
}
// Create a base (raw) connection to our underlying device.
//
// The dispatcher's reference will be Released when the Sender is Closed.
var s Sender
s = &remoteSender{
packetDispatcher: disp,
d: d,
}
s = MonitorSender(d, s)
return s, nil
}
func (d *Remote) ensureAndRetainDispatcher() (*packetDispatcher, error) {
d.dispatcherMu.Lock()
defer d.dispatcherMu.Unlock()
// Check if someone else instantiated the singleton in between our lock
// acquisitions.
if d.dispatcher != nil {
d.dispatcher.Retain()
return d.dispatcher, nil
}
// Create a new datagram sender for this device.
rds := remoteDynamicDatagramSender{d: d}
if err := rds.ensureSenderConnected(); err != nil {
return nil, err
}
// Create a new dispatcher.
d.dispatcher = &packetDispatcher{
d: d,
logger: logging.Must(d.Logger),
onShutdown: d.clearDispatcher,
sender: &rds,
}
if err := d.dispatcher.RetainAndStart(); err != nil {
return nil, err
}
return d.dispatcher, nil
}
// clearDispatcher clears the Remote's dispatcher. It is called by the
// packetDispatcher's onShutdown callback.
//
// A new dispatcher will be created when the next Sender is instantiated.
//
// As added protection, we first ensure that the dispatcher that we are clearing
// matches the current dispatcher. This could be false if users are sloppy
// with their Closes.
//
// Either way, the dispatcher is responsible for shutting itself down.
func (d *Remote) clearDispatcher(disp *packetDispatcher) {
d.dispatcherMu.Lock()
defer d.dispatcherMu.Unlock()
if d.dispatcher == disp {
// The dispatchers match, so clear the current reference.
d.dispatcher = nil
}
}
// DiscoveryHeaders implements D.
func (d *Remote) DiscoveryHeaders() *protocol.DiscoveryHeaders {
if dh := d.getState().headers; dh != nil {
return dh
}
return &protocol.DiscoveryHeaders{}
}
// DoneC implements D.
func (d *Remote) DoneC() <-chan struct{} { return d.doneC }
// Addr implements D.
func (d *Remote) Addr() net.Addr {
return d.getState().addr
}
// Info implements D.
func (d *Remote) Info() (i Info) {
state := d.getState()
d.modInfo(func(di *Info) {
i = Info{
PacketsReceived: di.PacketsReceived,
BytesReceived: di.BytesReceived,
PacketsSent: di.PacketsSent,
BytesSent: di.BytesSent,
Created: d.createdTime,
Observed: state.observed,
}
})
return
}
func (d *Remote) modInfo(fn func(*Info)) {
d.infoMu.Lock()
defer d.infoMu.Unlock()
fn(&d.info)
}
// MarkDone closes this device's done channel, shutting down any observation
// and marking this device "done" to external users.
//
// MarkDone is safe for concurrent use, and may be called multiple times;
// however, calls past the first time will do nothing.
func (d *Remote) MarkDone() {
d.doneOnce.Do(func() { close(d.doneC) })
d.monitoring.Update(d)
}
// remoteDynamicDatagramSender is a network.DatagramSender that sends datagrams
// to a Remote device.
//
// Because a Remote device can receive header updates, it's possible for its
// address and port to change dynamically. remoteDynamicDatagramSender
// accommodates this by transparently opening a new connection if such a change
// is observed.
//
// remoteDynamicDatagramSender is not safe for concurrent use.
type remoteDynamicDatagramSender struct {
d *Remote
base network.DatagramSender
baseAddr *net.UDPAddr
// When we create a new base, we record its datagram size and report it
// here. This prevents us from needing to potentially create a new connection
// when users call the otherwise-lightweight MaxDatagramSize.
lastDatagramSize int
}
func (rds *remoteDynamicDatagramSender) Close() error {
if rds.base == nil {
return nil
}
ds := rds.base
rds.base, rds.baseAddr = nil, nil
return ds.Close()
}
func (rds *remoteDynamicDatagramSender) SendDatagram(d []byte) error {
if err := rds.ensureSenderConnected(); err != nil {
return err
}
if err := rds.base.SendDatagram(d); err != nil {
return err
}
// Update stats.
rds.d.modInfo(func(i *Info) {
i.PacketsSent++
i.BytesSent += int64(len(d))
})
return nil
}
func (rds *remoteDynamicDatagramSender) MaxDatagramSize() int {
return rds.lastDatagramSize
}
// We bother going through this process to get maximum reuse of a bound UDP
// DatagramSender. Since in the common case the base will not change, we can
// avoid the overhead of binding to a new port for each packet most of the time.
func (rds *remoteDynamicDatagramSender) ensureSenderConnected() error {
addr, ok := rds.d.Addr().(*net.UDPAddr)
if !ok {
return errors.New("device address is not a *net.UDPAddr")
}
// Loop repeatedly until the address settles and we can return with a reader
// lock.
addrMatches := func() bool {
return rds.base != nil &&
(addr.IP.Equal(rds.baseAddr.IP) && addr.Port == rds.baseAddr.Port)
}
// (Common case) Do we have a base Sender, and does it match the address?
if addrMatches() {
return nil
}
// Close our current base, if we have one.
if rds.base != nil {
if err := rds.base.Close(); err != nil {
return err
}
}
w, err := net.DialUDP("udp4", nil, addr)
if err != nil {
return err
}
rds.base = network.UDPDatagramSender(w)
rds.baseAddr = addr
rds.lastDatagramSize = rds.base.MaxDatagramSize()
return nil
}
// remoteSender is an individual Sender instance that uses a shared dispatcher
// singleton to write.
type remoteSender struct {
*packetDispatcher
d *Remote
}
func (rs *remoteSender) Close() error { return rs.packetDispatcher.Release() }