-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
156 lines (133 loc) · 2.79 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"flag"
"log"
"mqtt-go/src/channel"
"mqtt-go/src/codec"
"mqtt-go/src/handler"
"net"
"time"
)
// Process-wide settings populated from command-line flags in main.
var (
// addr is the TCP address the server listens on (the -addr flag).
addr string
// heartbeat is the parsed -heartbeat flag; it is passed to
// channel.NewChannel for every accepted connection.
heartbeat time.Duration
)
// main parses the command-line flags, opens a TCP listener on addr, and hands
// every accepted connection to handleNewConn on its own goroutine.
//
// Flags:
//
//	-addr       listen address and port (default ":1883")
//	-heartbeat  heartbeat period in time.ParseDuration syntax (default "1m")
//
// The process terminates through log.Fatal/log.Fatalf when the heartbeat flag
// cannot be parsed or the listener cannot be created. A failed Accept is
// logged and the loop keeps accepting.
func main() {
	flag.StringVar(&addr, "addr", ":1883", "监听地址及端口")
	heartbeatFlag := flag.String("heartbeat", "1m", "心跳周期")
	flag.Parse()

	parsed, err := time.ParseDuration(*heartbeatFlag)
	if err != nil {
		// Report the raw flag text: the Duration returned alongside an error
		// is the zero value and would always print as "0s".
		log.Fatalf("非法的心跳格式:%s\n", *heartbeatFlag)
	}
	heartbeat = parsed

	l, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("监听: %s", l.Addr().String())

	for {
		conn, err := l.Accept()
		if err != nil {
			log.Printf("连接建立失败: %s", err.Error())
			continue
		}
		go handleNewConn(conn)
	}
}
// handleNewConn serves one accepted connection until its read loop ends.
//
// It wraps conn in a channel.Channel configured with the global heartbeat,
// announces it through handler.ChannelActive, starts the idle watcher and the
// writer on their own goroutines, and then runs the read loop on the calling
// goroutine. When the read loop returns, the channel is closed and
// handler.ChannelInactive is invoked.
func handleNewConn(conn net.Conn) {
	// Registered first so it runs last: it also covers a panic raised by the
	// cleanup defer below.
	defer func() {
		if recovered := recover(); recovered != nil {
			log.Printf("panic info:%v\n", recovered)
		}
	}()

	log.Printf("remote: [%s]", conn.RemoteAddr().String())

	client := channel.NewChannel(conn, heartbeat)
	handler.ChannelActive(client)

	// Release resources and broadcast the disconnect event.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Printf("连接关闭异常:%v", closeErr)
		}
		log.Printf("客户端[%s]连接断开", client.Id)
		handler.ChannelInactive(client)
	}()

	// Heartbeat / idle detection.
	go startHandleIdle(client)

	// Outbound writer goroutine.
	go startWriter(client)

	// Process the inbound byte stream; blocks until the reader stops.
	startReader(client)
}
// startReader reads from channel until a read or decode error occurs.
//
// Every successful non-empty read is appended to an accumulation buffer, the
// channel's Heartbeat value is sent on channel.InputNotify, and codec.Decode
// is applied repeatedly to the accumulated bytes. Each decoded message is
// passed to handler.ChannelRead; bytes that do not yet form a complete
// message are kept for the next read. The function returns after logging the
// first read error or decode error.
func startReader(channel *channel.Channel) {
	// cumulation holds received bytes that have not been decoded yet.
	var cumulation []byte
	for {
		// Borrow a read buffer.
		buf := channel.Get()
		n, err := channel.Read(buf)
		if err != nil {
			// Hand the buffer back before leaving, as every other path does.
			channel.Put(buf)
			log.Printf("连接断开: %s\n", err.Error())
			return
		}
		if n == 0 {
			// Nothing was read; recycle the buffer and try again.
			channel.Put(buf)
			continue
		}

		// Copy the received bytes out so the buffer can be recycled right away.
		cumulation = append(cumulation, buf[:n]...)
		channel.Put(buf)

		// Signal that new input arrived.
		// NOTE(review): this send blocks until a receiver is ready unless
		// InputNotify is buffered — confirm against the Channel definition.
		channel.InputNotify <- channel.Heartbeat

		// Decode as many complete messages as the accumulated bytes contain.
		for {
			mqttMessage, left, err := codec.Decode(cumulation)
			if err != nil {
				log.Printf("解码错误: %s\n", err.Error())
				return
			}
			if mqttMessage != nil {
				handler.ChannelRead(channel, mqttMessage)
				cumulation = left
				continue
			}
			// No complete message available: keep any leftover bytes,
			// otherwise start over with a fresh empty slice.
			if len(left) > 0 {
				cumulation = left
			} else {
				cumulation = make([]byte, 0)
			}
			break
		}
	}
}
// startWriter forwards every buffer received from channel.Out to
// channel.Write0 and returns once a value can be received from channel.Stop.
// A failed write is logged and the loop keeps running.
func startWriter(channel *channel.Channel) {
	for {
		select {
		case <-channel.Stop:
			return
		case payload := <-channel.Out:
			_, writeErr := channel.Write0(payload)
			if writeErr == nil {
				continue
			}
			log.Printf("写入失败: %s\n", writeErr)
		}
	}
}
// startHandleIdle is the heartbeat watchdog for one channel.
//
// The countdown starts at channel.Heartbeat. Every value received from
// channel.InputNotify becomes the new interval and restarts the countdown.
// When the countdown expires the timeout is logged, the channel is closed
// unless channel.Closed is already set, and the function returns.
func startHandleIdle(channel *channel.Channel) {
	interval := channel.Heartbeat

	// A single reusable timer replaces a fresh time.After per iteration, so
	// input notifications do not leave a pile of pending timers behind.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			log.Printf("心跳超时: %v\n", time.Now())
			if channel.Closed {
				return
			}
			if err := channel.Close(); err != nil {
				log.Printf("连接关闭异常:%v", err)
			}
			return
		case interval = <-channel.InputNotify:
			// Stop the timer and discard a tick that may already be queued;
			// the non-blocking receive keeps this safe on every Go version.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(interval)
		}
	}
}