-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket.h
171 lines (139 loc) · 4.17 KB
/
socket.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
* COPYRIGHT 2014 (C) Jason Volk
* COPYRIGHT 2014 (C) Svetlana Tkachenko
*
* DISTRIBUTED UNDER THE GNU GENERAL PUBLIC LICENSE (GPL) (see: LICENSE)
*/
// TCP client socket with a per-instance staging buffer. Text streamed in with
// operator<< accumulates in the buffer; streaming Socket::flush either hands
// the buffer to the shared sendq:: transmit queue or, while corked, only
// appends a line terminator.
class Socket
{
	const Opts &opts;                              // options the endpoint is resolved from
	boost::asio::io_service &ios;                  // service used by the resolver and the socket
	boost::asio::ip::tcp::endpoint ep;             // remote endpoint, resolved in the constructor
	boost::asio::ip::tcp::socket sd;               // opened by connect(), closed by disconnect()
	std::ostringstream sendq;                      // staging buffer written by operator<<
	milliseconds delay;                            // when non-zero, flush schedules at now + delay; reset by clear()
	Throttle throttle;                             // supplies the transmit time when delay is zero
	int cork;                                      // while > 0, flush only appends "\r\n" to the buffer

public:
	using flush_t = Stream::flush_t;
	static constexpr flush_t flush {};             // tag value selecting operator<<(flush_t)

	// Read-only observers
	const auto &get_ep() const                     { return ep;                        }
	const auto &get_sd() const                     { return sd;                        }
	const auto &get_delay() const                  { return delay;                     }
	const auto &get_throttle() const               { return throttle;                  }
	bool has_cork() const                          { return cork > 0;                  }
	bool has_pending() const                       { return !sendq.str().empty();      }   // staging buffer non-empty
	bool is_connected() const;

	// Mutable access
	auto &get_ep()                                 { return ep;                        }
	auto &get_sd()                                 { return sd;                        }

	// Forwards the callback to sendq::set_ecb() keyed by this socket's address.
	// NOTE(review): presumably an error callback -- confirm against sendq.
	void set_ecb(const sendq::ECb &cb)             { sendq::set_ecb(&get_sd(),cb);     }
	void set_throttle(const milliseconds &inc)     { throttle.set_inc(inc);            }
	void set_delay(const milliseconds &delay)      { this->delay = delay;              }

	// Cork is a counter: each set_cork() needs a matching unset_cork().
	void set_cork()                                { ++cork;                           }
	void unset_cork()                              { --cork;                           }

	// Forwards this socket's address to sendq::purge().
	// NOTE(review): presumably drops queued output for this socket -- confirm.
	void purge()                                   { sendq::purge(&get_sd());          }
	void clear();                                  // Clears the instance sendq buffer

	Socket &operator<<(const flush_t);
	template<class T> Socket &operator<<(const T &t);

	bool disconnect(const bool &fin = true);
	void connect();                                // Blocking/Synchronous

	Socket(const Opts &opts, boost::asio::io_service &ios);
	~Socket() noexcept;
};
// Resolves the remote IPv4 endpoint up front and leaves the socket unopened
// until connect(). The target comes from the "proxy" option (split on ":")
// when it is present, otherwise from the "host" and "port" options.
// Throws Internal with the resolver's error code and message on failure.
inline
Socket::Socket(const Opts &opts,
               boost::asio::io_service &ios):
opts(opts),
ios(ios),
ep([&]() -> decltype(ep)
{
	namespace ip = boost::asio::ip;

	const bool proxied(opts.has("proxy"));
	const auto &host(proxied? split(opts["proxy"],":").first : opts["host"]);
	const auto &port(proxied? split(opts["proxy"],":").second : opts["port"]);

	// numeric_service: the port string is taken as a number, not a service name.
	ip::tcp::resolver resolver(ios);
	const ip::tcp::resolver::query lookup(ip::tcp::v4(),host,port,ip::tcp::resolver::query::numeric_service);

	boost::system::error_code err;
	const auto first(resolver.resolve(lookup,err));
	if(err)
		throw Internal(err.value(),err.message());

	// Only the first resolved entry is used.
	return *first;
}()),
sd(ios),
delay(0ms),
cork(0)
{
}
// Passes this socket's address to sendq::purge() before the members are torn down.
// NOTE(review): presumably discards queued output still pointing at `sd` -- confirm in sendq.
inline
Socket::~Socket()
noexcept
{
	sendq::purge(&sd);
}
inline
void Socket::connect()
try
{
sd.open(boost::asio::ip::tcp::v4());
sd.connect(ep);
}
catch(const boost::system::system_error &e)
{
throw Internal(e.what());
}
inline
bool Socket::disconnect(const bool &fin)
try
{
if(!sd.is_open())
return false;
if(fin)
{
boost::system::error_code ec;
sd.shutdown(boost::asio::ip::tcp::socket::shutdown_type::shutdown_both,ec);
}
sd.close();
return true;
}
catch(const boost::system::system_error &e)
{
std::cerr << "Socket::disconnect(): " << e.what() << std::endl;
return false;
}
// Formats `t` into the instance staging buffer with the stream insertion
// operator. Returns *this so insertions can be chained.
template<class T>
Socket &Socket::operator<<(const T &t)
{
	this->sendq << t;
	return *this;
}
// Flush manipulator.
//
// Corked:   appends "\r\n" to the staging buffer and keeps accumulating.
// Uncorked: pushes {transmit time, socket, buffered text} onto sendq::queue
//           under sendq::mutex, signals sendq::cond, then resets the buffer
//           and delay through clear().
//
// The transmit time is now + delay when a delay is set, otherwise whatever
// the throttle's next_abs() yields.
inline
Socket &Socket::operator<<(const flush_t)
{
	if(!has_cork())
	{
		// Declared first so it runs last: clear() fires on scope exit, after
		// the lock below has been released, including when something throws.
		const scope reset(std::bind(&Socket::clear,this));
		const auto when(delay == 0ms? throttle.next_abs() : steady_clock::now() + delay);

		const std::lock_guard<decltype(sendq::mutex)> guard(sendq::mutex);
		sendq::queue.push_back({when,&sd,sendq.str()});
		sendq::cond.notify_one();
	}
	else sendq << "\r\n";

	return *this;
}
// Resets the per-instance send state: empties the staging buffer, clears the
// stream's error flags and zeroes the delay.
inline
void Socket::clear()
{
	sendq.str(std::string{});     // drop buffered text
	sendq.clear();                // reset stream state flags
	delay = 0ms;
}
inline
bool Socket::is_connected()
const try
{
return sd.is_open();
}
catch(const boost::system::system_error &e)
{
return false;
}