-
Notifications
You must be signed in to change notification settings - Fork 12
/
rtm.go
168 lines (159 loc) · 4.6 KB
/
rtm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package slack
import (
"encoding/json"
"errors"
"io"
"net/http"
"net/url"
gerr "github.com/go-errors/errors"
"github.com/gorilla/websocket"
)
// WSMessageResponse holds a response to a WS request
type WSMessageResponse struct {
OK bool `json:"ok"`
ReplyTo int `json:"reply_to"`
Error struct {
Code int `json:"code"`
Msg string `json:"msg"`
} `json:"error"`
}
// RTMStartReply holds the reply to the RTM start message with info about everything
type RTMStartReply struct {
slackResponse
URL string `json:"url"`
Self struct {
ID string `json:"id"`
Name string `json:"name"`
Prefs map[string]interface{} `json:"prefs"`
Created int64 `json:"created"`
ManualPresence string `json:"manual_presence"`
} `json:"self"`
Team Team `json:"team"`
LatestEventTS string `json:"latest_event_ts"`
Channels []Channel `json:"channels"`
Groups []Group `json:"groups"`
IMS []IM `json:"ims"`
Users []User `json:"users"`
Bots []Bot `json:"bots"`
}
// RTMStart starts the websocket
func (s *Slack) RTMStart(origin string, in chan *Message, context interface{}) (*RTMStartReply, error) {
r := &RTMStartReply{}
err := s.do("rtm.start", url.Values{}, r)
if err != nil {
return nil, err
}
header := http.Header{"Origin": {origin}}
s.ws, _, err = websocket.DefaultDialer.Dial(r.URL, header)
if err != nil {
return nil, err
}
// Start reading the messages and pumping them to the channel
go func(ws *websocket.Conn, in chan *Message) {
defer func() {
if err := recover(); err != nil {
s.errorlog.Printf("Recovered from error, %v\n", err)
errMsg := gerr.Wrap(err, 2).ErrorStack()
s.errorlog.Println(errMsg)
}
ws.Close()
}()
// Make sure we are receiving pongs
// ws.SetReadDeadline(t)
for {
msg := &Message{}
var unmarshallError bool
// Manually read the next message so that if there is JSON error we can
// dump the error to log
// This is a hack because the events have different fields with different structs
// while initially we defined just a simple Message event so now we need to translate
// the various events to message to keep compatibility
_, p, err := ws.ReadMessage()
if err == nil {
typeMsg := &baseTypeMessage{}
err = json.Unmarshal(p, typeMsg)
// Ignore specific messages like user_change for now
if err == nil {
switch typeMsg.Type {
case "channel_created", "channel_joined", "channel_rename", "im_created", "group_joined", "group_left", "group_rename":
channelEvent := &ChannelEvent{}
err = json.Unmarshal(p, channelEvent)
if err == nil {
msg.Type = channelEvent.Type
msg.Channel = channelEvent.Channel.ID
msg.User = channelEvent.Channel.Creator
msg.Name = channelEvent.Channel.Name
}
case "user_change", "team_join":
userEvent := &UserEvent{}
err = json.Unmarshal(p, userEvent)
if err == nil {
msg.Type = userEvent.Type
msg.User = userEvent.User.ID
msg.Name = userEvent.User.Name
}
default:
err = json.Unmarshal(p, msg)
}
}
if err == io.EOF {
// One value is expected in the message.
err = io.ErrUnexpectedEOF
}
if err != nil {
s.errorf("Error unmarshaling message - %s\n", string(p))
unmarshallError = true
}
}
// err := ws.ReadJSON(msg)
if err != nil {
msg.Type = "error"
msg.Error.Code, msg.Error.Msg = 0, err.Error()
msg.Error.Unmarshall = unmarshallError
}
// Set the custom data for every message
msg.Context = context
in <- msg
if err != nil && !unmarshallError {
break
}
}
}(s.ws, in)
return r, nil
}
// RTMMessage is sent on the channel for simple text
type RTMMessage struct {
ID int `json:"id"`
Type string `json:"type"`
Channel string `json:"channel"`
Text string `json:"text"`
}
// RTMSend a simple text message to a channel/group/dm
func (s *Slack) RTMSend(channel, text string) (int, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.ws == nil {
return 0, errors.New("RTM channel is not open")
}
s.mid++
err := s.ws.WriteJSON(&RTMMessage{
ID: s.mid,
Type: "message",
Channel: channel,
Text: text,
})
return s.mid, err
}
// RTMStop closes the WebSocket which in turn closes the in channel passed in RTMStart
func (s *Slack) RTMStop() error {
if s.ws != nil {
err := s.ws.Close()
s.ws = nil
return err
}
return nil
}
// RTMRunning Checks if the RTM is running
func (s *Slack) RTMRunning() bool {
return s.ws != nil
}