-
Notifications
You must be signed in to change notification settings - Fork 8
/
connection.go
108 lines (84 loc) · 2.52 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package amqprpc
import (
"errors"
"fmt"
"maps"
amqp "github.com/rabbitmq/amqp091-go"
)
// ErrUnexpectedConnClosed is returned by ListenAndServe() if the server
// shuts down without calling Stop() and if AMQP does not give an error
// when said shutdown happens.
var ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")
// OnStartedFunc can be registered at Server.OnStarted(f) and
// Client.OnStarted(f). This is used when you want to do more setup on the
// connections and/or channels from amqp, for example setting Qos,
// NotifyPublish etc.
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
func monitorAndWait(
restartChan,
stopChan chan struct{},
inputConnClose,
outputConnClose,
inputChClose,
outputChClose chan *amqp.Error,
) (bool, error) {
select {
case <-restartChan:
return true, nil
case <-stopChan:
return false, nil
case err, ok := <-inputConnClose:
if !ok {
return false, ErrUnexpectedConnClosed
}
return false, err
case err, ok := <-outputConnClose:
if !ok {
return false, ErrUnexpectedConnClosed
}
return false, err
case err, ok := <-inputChClose:
if !ok {
return false, ErrUnexpectedConnClosed
}
return false, err
case err, ok := <-outputChClose:
if !ok {
return false, ErrUnexpectedConnClosed
}
return false, err
}
}
func createConnections(url, name string, config amqp.Config) (consumerConn, publisherConn *amqp.Connection, err error) {
if config.Properties == nil {
config.Properties = amqp.Table{}
}
consumerConnConfig := config
publisherConnConfig := config
if _, ok := config.Properties["connection_name"]; !ok {
consumerConnConfig.Properties = maps.Clone(config.Properties)
publisherConnConfig.Properties = maps.Clone(config.Properties)
consumerConnConfig.Properties["connection_name"] = fmt.Sprintf("%s-consumer", name)
publisherConnConfig.Properties["connection_name"] = fmt.Sprintf("%s-publisher", name)
}
consumerConn, err = amqp.DialConfig(url, consumerConnConfig)
if err != nil {
return nil, nil, err
}
publisherConn, err = amqp.DialConfig(url, publisherConnConfig)
if err != nil {
return nil, nil, err
}
return consumerConn, publisherConn, nil
}
func createChannels(inputConn, outputConn *amqp.Connection) (inputCh, outputCh *amqp.Channel, err error) {
inputCh, err = inputConn.Channel()
if err != nil {
return nil, nil, err
}
outputCh, err = outputConn.Channel()
if err != nil {
return nil, nil, err
}
return inputCh, outputCh, nil
}