-
Notifications
You must be signed in to change notification settings - Fork 537
/
messageids.go
200 lines (174 loc) · 5.25 KB
/
messageids.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/*
* Copyright (c) 2013 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
* Matt Brittan
*/
package mqtt
import (
"fmt"
"sync"
"time"
)
// MId is 16 bit message id as specified by the MQTT spec.
// In general, these values should not be depended upon by
// the client application.
type MId uint16
type messageIds struct {
mu sync.RWMutex // Named to prevent Mu from being accessible directly via client
index map[uint16]tokenCompletor
lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier)
}
const (
midMin uint16 = 1
midMax uint16 = 65535
)
// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
func (mids *messageIds) cleanUp() {
mids.mu.Lock()
for _, token := range mids.index {
switch token.(type) {
case *PublishToken:
token.setError(fmt.Errorf("connection lost before Publish completed"))
case *SubscribeToken:
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
case nil: // should not be any nil entries
continue
}
token.flowComplete()
}
mids.index = make(map[uint16]tokenCompletor)
mids.mu.Unlock()
DEBUG.Println(MID, "cleaned up")
}
// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
func (mids *messageIds) cleanUpSubscribe() {
mids.mu.Lock()
for mid, token := range mids.index {
switch token.(type) {
case *SubscribeToken:
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
delete(mids.index, mid)
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
delete(mids.index, mid)
}
}
mids.mu.Unlock()
DEBUG.Println(MID, "cleaned up subs")
}
func (mids *messageIds) freeID(id uint16) {
mids.mu.Lock()
delete(mids.index, id)
mids.mu.Unlock()
}
func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
mids.mu.Lock()
defer mids.mu.Unlock()
if _, ok := mids.index[id]; !ok {
mids.index[id] = token
} else {
old := mids.index[id]
old.flowComplete()
mids.index[id] = token
}
if id > mids.lastIssuedID {
mids.lastIssuedID = id
}
}
// getID will return an available id or 0 if none available
// The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler)
func (mids *messageIds) getID(t tokenCompletor) uint16 {
mids.mu.Lock()
defer mids.mu.Unlock()
i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty
looped := false // uint16 will loop from 65535->0
for {
i++
if i == 0 { // skip 0 because its not a valid id (Control Packets MUST contain a non-zero 16-bit Packet Identifier [MQTT-2.3.1-1])
i++
looped = true
}
if _, ok := mids.index[i]; !ok {
mids.index[i] = t
mids.lastIssuedID = i
return i
}
if (looped && i == mids.lastIssuedID) || (mids.lastIssuedID == 0 && i == midMax) { // lastIssuedID will be 0 at startup
return 0 // no free ids
}
}
}
func (mids *messageIds) getToken(id uint16) tokenCompletor {
mids.mu.RLock()
defer mids.mu.RUnlock()
if token, ok := mids.index[id]; ok {
return token
}
return &DummyToken{id: id}
}
type DummyToken struct {
id uint16
}
// Wait implements the Token Wait method.
func (d *DummyToken) Wait() bool {
return true
}
// WaitTimeout implements the Token WaitTimeout method.
func (d *DummyToken) WaitTimeout(t time.Duration) bool {
return true
}
// Done implements the Token Done method.
func (d *DummyToken) Done() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}
func (d *DummyToken) flowComplete() {
ERROR.Printf("A lookup for token %d returned nil\n", d.id)
}
func (d *DummyToken) Error() error {
return nil
}
func (d *DummyToken) setError(e error) {}
// PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved
// it differs from DummyToken in that calling flowComplete does not generate an error (it
// is expected that flowComplete will be called when the token is overwritten with a real token)
type PlaceHolderToken struct {
id uint16
}
// Wait implements the Token Wait method.
func (p *PlaceHolderToken) Wait() bool {
return true
}
// WaitTimeout implements the Token WaitTimeout method.
func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool {
return true
}
// Done implements the Token Done method.
func (p *PlaceHolderToken) Done() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}
func (p *PlaceHolderToken) flowComplete() {
}
func (p *PlaceHolderToken) Error() error {
return nil
}
func (p *PlaceHolderToken) setError(e error) {}