forked from porthos-rpc/porthos-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
162 lines (127 loc) · 3.21 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package porthos
import (
"fmt"
"log"
"sync"
"time"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)
// Client is an entry point for making remote calls.
//
// A Client consumes its own response queue in the background and hands each
// delivery to the slot registered under the delivery's correlation ID.
type Client struct {
	// serviceName is the name given to NewClient; it is also used as the
	// prefix of responseQueueName.
	serviceName string
	// defaultTTL is the TTL given to NewClient.
	// NOTE(review): presumably the default timeout applied to calls; confirm
	// against the call implementation.
	defaultTTL time.Duration
	// broker supplies AMQP channels and connection-state notifications.
	broker *Broker
	// responseQueueName is the queue that responses are consumed from.
	responseQueueName string
	// slots maps a correlation ID to the slot waiting for that response.
	slots map[string]*slot
	// slotLock guards slots.
	slotLock sync.Mutex
	// m is held by Close while it sets closed.
	m sync.Mutex
	// closed is set by Close; the loop in start stops once it observes it.
	closed bool
}
// newUniqueQueueName builds a queue name of the form
// "<prefix>@<unix-nanoseconds>-porthos" using the current wall-clock time.
func newUniqueQueueName(prefix string) string {
	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s@%d-porthos", prefix, stamp)
}
// NewClient creates a new instance of Client, responsible for making remote calls.
//
// b is the broker used to open channels and to learn about reconnections,
// serviceName names the remote service (and prefixes the response queue name),
// and defaultTTL is stored on the client as its default TTL.
//
// The response-consuming loop is started in a background goroutine before
// NewClient returns. An error is returned only when b is nil: the background
// loop dereferences the broker immediately, and a panic there could not be
// handled by the caller.
func NewClient(b *Broker, serviceName string, defaultTTL time.Duration) (*Client, error) {
	// Initial capacity of the pending-response map; it only pre-sizes the map
	// and does not limit the number of in-flight calls.
	const initialSlotCapacity = 3000

	if b == nil {
		return nil, errors.New("broker must not be nil")
	}

	c := &Client{
		serviceName:       serviceName,
		defaultTTL:        defaultTTL,
		broker:            b,
		slots:             make(map[string]*slot, initialSlotCapacity),
		responseQueueName: newUniqueQueueName(serviceName),
	}

	go c.start()

	return c, nil
}
// start runs the response-consuming loop until the client is observed closed.
//
// On each iteration it:
//   - stops if Close has been called,
//   - blocks on the broker's reestablish notification while the broker reports
//     that it is not connected,
//   - otherwise calls consume, which blocks until consuming ends.
//
// When consume fails the loop waits five seconds before retrying. The closed
// flag is only re-checked between iterations, so a Close issued while consume
// (or the reestablish wait) is blocking takes effect once that call returns.
func (c *Client) start() {
	rs := c.broker.NotifyReestablish()

	for {
		// Close writes c.closed while holding c.m, so the read must take the
		// same lock to avoid a data race.
		c.m.Lock()
		closed := c.closed
		c.m.Unlock()
		if closed {
			return
		}

		if !c.broker.IsConnected() {
			log.Printf("[PORTHOS] Connection not established. Waiting connection to be reestablished.")
			<-rs
			continue
		}

		if err := c.consume(); err != nil {
			log.Printf("[PORTHOS] Error consuming responses. Error: %s", err)
			time.Sleep(5 * time.Second)
			continue
		}

		log.Print("[PORTHOS] Consuming stopped.")
	}
}
// consume opens a channel on the broker, declares the client's response queue
// and processes deliveries from it until the delivery channel is closed.
//
// It returns a wrapped error when the channel cannot be opened, the queue
// cannot be declared, or the consumer cannot be registered; it returns nil
// once the delivery stream ends. The channel is closed on return.
func (c *Client) consume() error {
	ch, err := c.broker.openChannel()
	if err != nil {
		return errors.Wrap(err, "failed to open channel")
	}
	defer ch.Close()

	// The response queue is non-durable and exclusive.
	if _, err := ch.QueueDeclare(
		c.responseQueueName, // name
		false,               // durable
		false,               // auto-delete
		true,                // exclusive
		false,               // no-wait
		nil,                 // args
	); err != nil {
		return errors.Wrap(err, "failed to declare queue")
	}

	// Auto-ack is off; processResponse acknowledges each delivery itself.
	deliveries, err := ch.Consume(
		c.responseQueueName, // queue
		"",                  // consumer
		false,               // auto-ack
		false,               // exclusive
		false,               // no-local
		false,               // no-wait
		nil,                 // args
	)
	if err != nil {
		return errors.Wrap(err, "failed to consume queue")
	}

	for delivery := range deliveries {
		c.processResponse(delivery)
	}

	return nil
}
// processResponse acknowledges a delivery and routes it to the slot waiting
// on its correlation ID.
//
// The "statusCode" header is expected to be an int32. If it is missing or has
// another type, the problem is logged and the response is still delivered
// with StatusCode 0, so one malformed message cannot panic the consumer
// goroutine. If no slot is registered for the correlation ID, the delivery is
// logged and dropped.
func (c *Client) processResponse(d amqp.Delivery) {
	if err := d.Ack(false); err != nil {
		log.Printf("[PORTHOS] Error acknowledging response %s. Error: %s", d.CorrelationId, err)
	}

	// Checked assertion: the unchecked form panics on a missing or
	// differently typed header.
	statusCode, valid := d.Headers["statusCode"].(int32)
	if !valid {
		log.Printf("[PORTHOS] Response %s has a missing or invalid statusCode header (type %T).",
			d.CorrelationId, d.Headers["statusCode"])
	}

	res, ok := c.popSlot(d.CorrelationId)
	if !ok {
		log.Printf("[PORTHOS] Slot %s not exists.", d.CorrelationId)
		return
	}

	res.sendResponse(ClientResponse{
		Content:     d.Body,
		ContentType: d.ContentType,
		StatusCode:  statusCode,
		Headers:     *NewHeadersFromMap(d.Headers),
	})
}
// Call prepares a remote call to method.
// It returns the call value built by newCall for this client and method.
func (c *Client) Call(method string) *call {
	return newCall(c, method)
}
// Close marks the client as closed.
//
// It only sets the closed flag while holding c.m; it does not close an AMQP
// channel itself. The background loop stops the next time it checks the flag,
// which happens only after the current consume call has returned
// (presumably when the broker is closed).
func (c *Client) Close() {
	c.m.Lock()
	c.closed = true
	c.m.Unlock()
}
// pushSlot registers slot under correlationID, replacing any entry already
// stored for that ID. slotLock is held for the duration of the map write.
func (c *Client) pushSlot(correlationID string, slot *slot) {
	c.slotLock.Lock()
	defer c.slotLock.Unlock()
	c.slots[correlationID] = slot
}
// popSlot removes and returns the slot registered under correlationID.
// The boolean is false, and the slot nil, when no such entry exists.
// slotLock is held for the lookup and the removal.
func (c *Client) popSlot(correlationID string) (*slot, bool) {
	c.slotLock.Lock()
	defer c.slotLock.Unlock()

	pending, found := c.slots[correlationID]
	if !found {
		return pending, false
	}

	delete(c.slots, correlationID)
	return pending, true
}