-
Notifications
You must be signed in to change notification settings - Fork 5
/
homa_listener.h
170 lines (141 loc) · 6.09 KB
/
homa_listener.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#ifndef HOMA_LISTENER_H
#define HOMA_LISTENER_H
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
#include <grpcpp/grpcpp.h>
#include "src/core/lib/surface/server.h"
#include "homa_socket.h"
#include "homa_stream.h"
#include "wire.h"
// Access label used for members that unit tests need to reach directly:
// it expands to `public` when __UNIT_TEST__ is defined and to `protected`
// in every other build.
#ifdef __UNIT_TEST__
#define PROTECTED public
#else
#define PROTECTED protected
#endif
/**
* Stores all the state needed to serve Homa requests on a particular
* port.
*/
class HomaListener : public grpc_core::Server::ListenerInterface {
public:
static HomaListener *Get(grpc_server* server, int *port, bool ipv6);
static std::shared_ptr<grpc::ServerCredentials> insecureCredentials(void);
PROTECTED:
/**
* This class provides credentials to create Homa listeners.
*/
class InsecureCredentials final : public grpc::ServerCredentials {
public:
int AddPortToServer(const std::string& addr, grpc_server* server);
void SetAuthMetadataProcessor(
const std::shared_ptr<grpc::AuthMetadataProcessor>& processor)
override {
(void)processor;
GPR_ASSERT(0); // Should not be called on insecure credentials.
}
};
/**
* This class manages the Homa socket associated with the listener, and
* contains most of the listener functionality. In the TCP world, the
* listener corresponds to the listen socket and the transport corresponds
* to all of the individual data connections; in the Homa world, a single
* Homa socket serves both purposes, and it is managed here in the
* transport. Thus there isn't much left in the listener.
*/
class Transport {
public:
Transport(grpc_server* server, int *port, bool ipv6);
~Transport();
HomaStream * getStream(StreamId *streamId,
std::optional<grpc_core::MutexLock>& streamLock,
bool create);
void shutdown();
void start(grpc_core::Server* server,
const std::vector<grpc_pollset*>* pollsets);
static void destroy(grpc_transport* gt);
static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_closure* then_schedule_closure);
static grpc_endpoint*
get_endpoint(grpc_transport* gt);
static int init_stream(grpc_transport* gt, grpc_stream* gs,
grpc_stream_refcount* refcount,
const void* init_info, grpc_core::Arena* arena);
static void onRead(void* arg, grpc_error_handle error);
static void perform_op(grpc_transport* gt, grpc_transport_op* op);
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op);
static void set_pollset(grpc_transport* gt, grpc_stream* gs,
grpc_pollset* pollset);
static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
grpc_pollset_set* pollset_set);
PROTECTED:
/**
* This structure is used to pass data down through callbacks to
* init_stream and back up again.
*/
struct StreamInit {
// Identifying information from incoming RPC.
StreamId *streamId;
// Used to return the HomaStream address back through callbacks.
HomaStream *stream;
};
// Points to a virtual function table for use by the rest of gRPC to
// treat this object as a transport. gRPC uses a pointer to this field
// as a generic handle for the object.
grpc_transport vtable;
// Associated gRPC server. Not owned by this object
grpc_core::Server *server;
// Keeps track of all RPCs currently in some stage of processing;
// used to look up the Stream for an RPC based on its id.
std::unordered_map<StreamId, HomaStream*, StreamId::Hasher,
StreamId::Pred> activeRpcs;
typedef std::unordered_map<StreamId, HomaStream*,
StreamId::Hasher, StreamId::Pred>::iterator ActiveIterator;
// Must be held when accessing @activeRpcs. Must not be acquired while
// holding a stream lock.
grpc_core::Mutex mutex;
// Manages the Homa socket, including buffer space.
HomaSocket sock;
// Used to call us back when fd is readable.
grpc_closure read_closure;
grpc_core::ConnectivityStateTracker state_tracker;
// Used to notify gRPC of new incoming requests.
void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
const void* server_data);
void* accept_stream_data;
friend class TestListener;
};
HomaListener(grpc_server* server, int *port, bool ipv6);
~HomaListener();
void Orphan() override ;
void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
void Start(grpc_core::Server* server,
const std::vector<grpc_pollset*>* pollsets) override;
grpc_core::channelz::ListenSocketNode* channelz_listen_socket_node()
const override;
// Transport associated with the listener; its lifetime is managed
// outside this class.
Transport *transport;
// Homa port number associated with this listener.
int port;
grpc_closure* on_destroy_done;
/**
* Information that is shared across all HomaListener/Transport objects.
*/
struct Shared {
// Contains pointers to all open Homa ports: keys are port numbers.
std::unordered_map<int, HomaListener *> ports;
// Synchronizes access to this structure.
grpc_core::Mutex mutex;
// Function table shared across all HomaListeners.
struct grpc_transport_vtable vtable;
Shared() : ports(), mutex() {}
};
// Singleton object with common info.
static std::optional<Shared> shared;
static gpr_once shared_once;
static void InitShared(void);
friend class TestListener;
};
#endif // HOMA_LISTENER_H