Skip to content

Commit

Permalink
Set default timeout on curl handles (#66)
Browse files Browse the repository at this point in the history
Co-authored-by: David Goffredo <[email protected]>
  • Loading branch information
dmehala and dgoffredo authored Nov 15, 2023
1 parent f9e4217 commit 7038b02
Show file tree
Hide file tree
Showing 20 changed files with 291 additions and 166 deletions.
114 changes: 80 additions & 34 deletions src/datadog/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <unordered_map>
#include <unordered_set>

#include "clock.h"
#include "dict_reader.h"
#include "dict_writer.h"
#include "http_client.h"
Expand Down Expand Up @@ -95,6 +96,10 @@ CURLcode CurlLibrary::easy_setopt_writefunction(CURL *handle,
return curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, on_write);
}

CURLcode CurlLibrary::easy_setopt_timeout_ms(CURL *handle, long timeout_ms) {
return curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, timeout_ms);
}

const char *CurlLibrary::easy_strerror(CURLcode error) {
return curl_easy_strerror(error);
}
Expand Down Expand Up @@ -162,6 +167,7 @@ class CurlImpl {
std::mutex mutex_;
CurlLibrary &curl_;
const std::shared_ptr<Logger> logger_;
Clock clock_;
CURLM *multi_handle_;
std::unordered_set<CURL *> request_handles_;
std::list<CURL *> new_handles_;
Expand All @@ -179,6 +185,7 @@ class CurlImpl {
char error_buffer[CURL_ERROR_SIZE] = "";
std::unordered_map<std::string, std::string> response_headers_lower;
std::string response_body;
std::chrono::steady_clock::time_point deadline;

~Request();
};
Expand Down Expand Up @@ -221,13 +228,14 @@ class CurlImpl {
static StringView trim(StringView);

public:
explicit CurlImpl(const std::shared_ptr<Logger> &, CurlLibrary &,
const Curl::ThreadGenerator &);
explicit CurlImpl(const std::shared_ptr<Logger> &, const Clock &,
CurlLibrary &, const Curl::ThreadGenerator &);
~CurlImpl();

Expected<void> post(const URL &url, HeadersSetter set_headers,
std::string body, ResponseHandler on_response,
ErrorHandler on_error);
ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline);

void drain(std::chrono::steady_clock::time_point deadline);
};
Expand All @@ -242,22 +250,25 @@ void throw_on_error(CURLcode result) {

} // namespace

Curl::Curl(const std::shared_ptr<Logger> &logger) : Curl(logger, libcurl) {}
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock)
: Curl(logger, clock, libcurl) {}

Curl::Curl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl)
: Curl(logger, curl,
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock,
CurlLibrary &curl)
: Curl(logger, clock, curl,
[](auto &&func) { return std::thread(std::move(func)); }) {}

Curl::Curl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl,
const Curl::ThreadGenerator &make_thread)
: impl_(new CurlImpl{logger, curl, make_thread}) {}
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock,
CurlLibrary &curl, const Curl::ThreadGenerator &make_thread)
: impl_(new CurlImpl{logger, clock, curl, make_thread}) {}

Curl::~Curl() { delete impl_; }

Expected<void> Curl::post(const URL &url, HeadersSetter set_headers,
std::string body, ResponseHandler on_response,
ErrorHandler on_error) {
return impl_->post(url, set_headers, body, on_response, on_error);
ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline) {
return impl_->post(url, set_headers, body, on_response, on_error, deadline);
}

void Curl::drain(std::chrono::steady_clock::time_point deadline) {
Expand All @@ -268,10 +279,11 @@ nlohmann::json Curl::config_json() const {
return nlohmann::json::object({{"type", "datadog::tracing::Curl"}});
}

CurlImpl::CurlImpl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl,
const Curl::ThreadGenerator &make_thread)
CurlImpl::CurlImpl(const std::shared_ptr<Logger> &logger, const Clock &clock,
CurlLibrary &curl, const Curl::ThreadGenerator &make_thread)
: curl_(curl),
logger_(logger),
clock_(clock),
shutting_down_(false),
num_active_handles_(0) {
curl_.global_init(CURL_GLOBAL_ALL);
Expand Down Expand Up @@ -311,24 +323,35 @@ CurlImpl::~CurlImpl() {
}
log_on_error(curl_.multi_wakeup(multi_handle_));
event_loop_.join();

log_on_error(curl_.multi_cleanup(multi_handle_));
curl_.global_cleanup();
}

Expected<void> CurlImpl::post(const HTTPClient::URL &url,
HeadersSetter set_headers, std::string body,
ResponseHandler on_response,
ErrorHandler on_error) try {
Expected<void> CurlImpl::post(
const HTTPClient::URL &url, HeadersSetter set_headers, std::string body,
ResponseHandler on_response, ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline) try {
if (multi_handle_ == nullptr) {
return Error{Error::CURL_HTTP_CLIENT_NOT_RUNNING,
"Unable to send request via libcurl because the HTTP client "
"failed to start."};
}

HeaderWriter writer{curl_};
set_headers(writer);
auto cleanup_list = [&](auto list) { curl_.slist_free_all(list); };
std::unique_ptr<curl_slist, decltype(cleanup_list)> headers{
writer.release(), std::move(cleanup_list)};

auto request = std::make_unique<Request>();

request->curl = &curl_;
request->request_headers = headers.get();
request->request_body = std::move(body);
request->on_response = std::move(on_response);
request->on_error = std::move(on_error);
request->deadline = std::move(deadline);

auto cleanup_handle = [&](auto handle) { curl_.easy_cleanup(handle); };
std::unique_ptr<CURL, decltype(cleanup_handle)> handle{
Expand All @@ -339,6 +362,8 @@ Expected<void> CurlImpl::post(const HTTPClient::URL &url,
"unable to initialize a curl handle for request sending"};
}

throw_on_error(
curl_.easy_setopt_httpheader(handle.get(), request->request_headers));
throw_on_error(curl_.easy_setopt_private(handle.get(), request.get()));
throw_on_error(
curl_.easy_setopt_errorbuffer(handle.get(), request->error_buffer));
Expand All @@ -365,25 +390,17 @@ Expected<void> CurlImpl::post(const HTTPClient::URL &url,
handle.get(), (url.scheme + "://" + url.authority + url.path).c_str()));
}

HeaderWriter writer{curl_};
set_headers(writer);
auto cleanup_list = [&](auto list) { curl_.slist_free_all(list); };
std::unique_ptr<curl_slist, decltype(cleanup_list)> headers{
writer.release(), std::move(cleanup_list)};
request->request_headers = headers.get();
throw_on_error(
curl_.easy_setopt_httpheader(handle.get(), request->request_headers));

std::list<CURL *> node;
node.push_back(handle.get());
{
std::lock_guard<std::mutex> lock(mutex_);
new_handles_.splice(new_handles_.end(), node);

headers.release();
handle.release();
request.release();
(void)headers.release();
(void)handle.release();
(void)request.release();
}

log_on_error(curl_.multi_wakeup(multi_handle_));

return nullopt;
Expand Down Expand Up @@ -464,6 +481,7 @@ CURLMcode CurlImpl::log_on_error(CURLMcode result) {
void CurlImpl::run() {
int num_messages_remaining;
CURLMsg *message;
const int max_wait_milliseconds = 10000;
std::unique_lock<std::mutex> lock(mutex_);

for (;;) {
Expand All @@ -478,16 +496,46 @@ void CurlImpl::run() {
&num_messages_remaining))) {
handle_message(*message, lock);
}

const int max_wait_milliseconds = 10 * 1000;
lock.unlock();
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
max_wait_milliseconds, nullptr));
lock.lock();

// New requests might have been added while we were sleeping.
for (; !new_handles_.empty(); new_handles_.pop_front()) {
CURL *const handle = new_handles_.front();
CURL *handle = new_handles_.front();
char *user_data;
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) !=
CURLE_OK) {
curl_.easy_cleanup(handle);
continue;
}

auto *request = reinterpret_cast<Request *>(user_data);
const auto timeout = request->deadline - clock_().tick;
if (timeout <= std::chrono::steady_clock::time_point::duration::zero()) {
std::string message;
message +=
"Request deadline exceeded before request was even added to "
"libcurl "
"event loop. Deadline was ";
message += std::to_string(
-std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
.count());
message += " nanoseconds ago.";
request->on_error(
Error{Error::CURL_DEADLINE_EXCEEDED_BEFORE_REQUEST_START,
std::move(message)});

curl_.easy_cleanup(handle);
delete request;

continue;
}

log_on_error(curl_.easy_setopt_timeout_ms(
handle, std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
.count()));
log_on_error(curl_.multi_add_handle(multi_handle_, handle));
request_handles_.insert(handle);
}
Expand All @@ -510,8 +558,6 @@ void CurlImpl::run() {
}

request_handles_.clear();
log_on_error(curl_.multi_cleanup(multi_handle_));
curl_.global_cleanup();
}

void CurlImpl::handle_message(const CURLMsg &message,
Expand Down
12 changes: 8 additions & 4 deletions src/datadog/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <string>
#include <thread>

#include "clock.h"
#include "http_client.h"
#include "json_fwd.hpp"

Expand Down Expand Up @@ -59,6 +60,7 @@ class CurlLibrary {
virtual CURLcode easy_setopt_url(CURL *handle, const char *url);
virtual CURLcode easy_setopt_writedata(CURL *handle, void *data);
virtual CURLcode easy_setopt_writefunction(CURL *handle, WriteCallback);
virtual CURLcode easy_setopt_timeout_ms(CURL *handle, long timeout_ms);
virtual const char *easy_strerror(CURLcode error);
virtual void global_cleanup();
virtual CURLcode global_init(long flags);
Expand Down Expand Up @@ -86,16 +88,18 @@ class Curl : public HTTPClient {
public:
using ThreadGenerator = std::function<std::thread(std::function<void()> &&)>;

explicit Curl(const std::shared_ptr<Logger> &);
Curl(const std::shared_ptr<Logger> &, CurlLibrary &);
Curl(const std::shared_ptr<Logger> &, CurlLibrary &, const ThreadGenerator &);
explicit Curl(const std::shared_ptr<Logger> &, const Clock &);
Curl(const std::shared_ptr<Logger> &, const Clock &, CurlLibrary &);
Curl(const std::shared_ptr<Logger> &, const Clock &, CurlLibrary &,
const ThreadGenerator &);
~Curl();

Curl(const Curl &) = delete;

Expected<void> post(const URL &url, HeadersSetter set_headers,
std::string body, ResponseHandler on_response,
ErrorHandler on_error) override;
ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline) override;

void drain(std::chrono::steady_clock::time_point deadline) override;

Expand Down
46 changes: 25 additions & 21 deletions src/datadog/datadog_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,19 @@ std::variant<CollectorResponse, std::string> parse_agent_traces_response(
DatadogAgent::DatadogAgent(
const FinalizedDatadogAgentConfig& config,
const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
const Clock& clock, const std::shared_ptr<Logger>& logger)
const std::shared_ptr<Logger>& logger)
: tracer_telemetry_(tracer_telemetry),
clock_(clock),
clock_(config.clock),
logger_(logger),
traces_endpoint_(traces_endpoint(config.url)),
telemetry_endpoint_(telemetry_endpoint(config.url)),
http_client_(config.http_client),
event_scheduler_(config.event_scheduler),
cancel_scheduled_flush_(event_scheduler_->schedule_recurring_event(
config.flush_interval, [this]() { flush(); })),
flush_interval_(config.flush_interval) {
flush_interval_(config.flush_interval),
request_timeout_(config.request_timeout),
shutdown_timeout_(config.shutdown_timeout) {
assert(logger_);
assert(tracer_telemetry_);
if (tracer_telemetry_->enabled()) {
Expand Down Expand Up @@ -185,7 +187,7 @@ DatadogAgent::DatadogAgent(
}

DatadogAgent::~DatadogAgent() {
const auto deadline = clock_().tick + std::chrono::seconds(2);
const auto deadline = clock_().tick + shutdown_timeout_;
cancel_scheduled_flush_();
flush();
if (tracer_telemetry_->enabled()) {
Expand All @@ -208,17 +210,15 @@ Expected<void> DatadogAgent::send(
}

nlohmann::json DatadogAgent::config_json() const {
const auto flush_interval_milliseconds =
std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_)
.count();

// clang-format off
return nlohmann::json::object({
{"type", "datadog::tracing::DatadogAgent"},
{"config", nlohmann::json::object({
{"traces_url", (traces_endpoint_.scheme + "://" + traces_endpoint_.authority + traces_endpoint_.path)},
{"telemetry_url", (telemetry_endpoint_.scheme + "://" + telemetry_endpoint_.authority + telemetry_endpoint_.path)},
{"flush_interval_milliseconds", flush_interval_milliseconds},
{"flush_interval_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_).count() },
{"request_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(request_timeout_).count() },
{"shutdown_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(shutdown_timeout_).count() },
{"http_client", http_client_->config_json()},
{"event_scheduler", event_scheduler_->config_json()},
})},
Expand Down Expand Up @@ -324,9 +324,10 @@ void DatadogAgent::flush() {
};

tracer_telemetry_->metrics().trace_api.requests.inc();
auto post_result = http_client_->post(
traces_endpoint_, std::move(set_request_headers), std::move(body),
std::move(on_response), std::move(on_error));
auto post_result =
http_client_->post(traces_endpoint_, std::move(set_request_headers),
std::move(body), std::move(on_response),
std::move(on_error), clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(
error->with_prefix("Unexpected error submitting traces: "));
Expand All @@ -335,9 +336,10 @@ void DatadogAgent::flush() {

void DatadogAgent::send_app_started() {
auto payload = tracer_telemetry_->app_started();
auto post_result = http_client_->post(
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
telemetry_on_response_, telemetry_on_error_);
auto post_result =
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
std::move(payload), telemetry_on_response_,
telemetry_on_error_, clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(error->with_prefix(
"Unexpected error submitting telemetry app-started event: "));
Expand All @@ -346,9 +348,10 @@ void DatadogAgent::send_app_started() {

void DatadogAgent::send_heartbeat_and_telemetry() {
auto payload = tracer_telemetry_->heartbeat_and_telemetry();
auto post_result = http_client_->post(
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
telemetry_on_response_, telemetry_on_error_);
auto post_result =
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
std::move(payload), telemetry_on_response_,
telemetry_on_error_, clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(error->with_prefix(
"Unexpected error submitting telemetry app-heartbeat event: "));
Expand All @@ -357,9 +360,10 @@ void DatadogAgent::send_heartbeat_and_telemetry() {

void DatadogAgent::send_app_closing() {
auto payload = tracer_telemetry_->app_closing();
auto post_result = http_client_->post(
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
telemetry_on_response_, telemetry_on_error_);
auto post_result =
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
std::move(payload), telemetry_on_response_,
telemetry_on_error_, clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(error->with_prefix(
"Unexpected error submitting telemetry app-closing event: "));
Expand Down
Loading

0 comments on commit 7038b02

Please sign in to comment.