Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Do not frequently create and destroy http client threads #3198

Merged
merged 17 commits into from
Dec 18, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Increment the:

## [Unreleased]

* [SDK] Do not frequently create and destroy http client threads
[#3198](https://github.com/open-telemetry/opentelemetry-cpp/pull/3198)

## [1.18 2024-11-25]

* [EXPORTER] Fix crash in ElasticsearchLogRecordExporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,25 +322,16 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

inline CURLM *GetMultiHandle() noexcept { return multi_handle_; }

void MaybeSpawnBackgroundThread();
// return true if create background thread, false is already exist background thread
bool MaybeSpawnBackgroundThread();

void ScheduleAddSession(uint64_t session_id);
void ScheduleAbortSession(uint64_t session_id);
void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource);

void WaitBackgroundThreadExit()
{
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}
void SetBackgroundWaitFor(std::chrono::milliseconds ms);

if (background_thread && background_thread->joinable())
{
background_thread->join();
}
}
void WaitBackgroundThreadExit();

private:
void wakeupBackgroundThread();
Expand All @@ -366,6 +357,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
std::unique_ptr<std::thread> background_thread_;
std::chrono::milliseconds scheduled_delay_milliseconds_;

std::chrono::milliseconds background_thread_wait_for_;
std::atomic<bool> is_shutdown_{false};

nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
};

Expand Down
62 changes: 59 additions & 3 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,13 @@ HttpClient::HttpClient()
next_session_id_{0},
max_sessions_per_connection_{8},
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
background_thread_wait_for_{std::chrono::minutes{1}},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
{}

HttpClient::~HttpClient()
{
is_shutdown_.store(true, std::memory_order_release);
while (true)
{
std::unique_ptr<std::thread> background_thread;
Expand All @@ -211,6 +213,7 @@ HttpClient::~HttpClient()
}
if (background_thread->joinable())
{
wakeupBackgroundThread(); // if delay quit, wake up first
background_thread->join();
}
}
Expand Down Expand Up @@ -335,29 +338,33 @@ void HttpClient::CleanupSession(uint64_t session_id)
}
}

void HttpClient::MaybeSpawnBackgroundThread()
bool HttpClient::MaybeSpawnBackgroundThread()
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
if (background_thread_)
{
return;
return false;
}

background_thread_.reset(new std::thread(
[](HttpClient *self) {
int still_running = 1;
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
bool need_wait_more = false;
while (true)
{
CURLMsg *msg;
int queued;
CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running);

// According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we
// can not curl_multi_perform it again
if (mc != CURLM_OK)
{
self->resetMultiHandle();
}
else if (still_running)
else if (still_running || need_wait_more)
{
// curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util
// timeout to do the rest jobs
Expand Down Expand Up @@ -416,6 +423,32 @@ void HttpClient::MaybeSpawnBackgroundThread()
still_running = 1;
}

std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
if (still_running > 0)
{
last_free_job_timepoint = now;
need_wait_more = false;
continue;
}

std::chrono::milliseconds wait_for = std::chrono::milliseconds::zero();

#if LIBCURL_VERSION_NUM >= 0x074400
// only available with curl_multi_poll+curl_multi_wakeup, because curl_multi_wait would
// cause CPU busy, curl_multi_wait+sleep could not wakeup quickly
wait_for = self->background_thread_wait_for_;
#endif
if (self->is_shutdown_.load(std::memory_order_acquire))
{
wait_for = std::chrono::milliseconds::zero();
}

if (now - last_free_job_timepoint < wait_for)
{
need_wait_more = true;
continue;
}

if (still_running == 0)
{
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
Expand Down Expand Up @@ -454,6 +487,7 @@ void HttpClient::MaybeSpawnBackgroundThread()
}
},
this));
return true;
}

void HttpClient::ScheduleAddSession(uint64_t session_id)
Expand Down Expand Up @@ -502,6 +536,28 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource
wakeupBackgroundThread();
}

void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)
{
background_thread_wait_for_ = ms;
}

void HttpClient::WaitBackgroundThreadExit()
{
is_shutdown_.store(true, std::memory_order_release);
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}

if (background_thread && background_thread->joinable())
{
wakeupBackgroundThread();
background_thread->join();
xiehuc marked this conversation as resolved.
Show resolved Hide resolved
}
is_shutdown_.store(false, std::memory_order_release);
}

void HttpClient::wakeupBackgroundThread()
{
// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs
Expand Down
50 changes: 50 additions & 0 deletions ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include <curl/curlver.h>
#include <gtest/gtest.h>
#include <string.h>
#include <atomic>
Expand All @@ -11,6 +12,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -531,3 +533,51 @@ TEST_F(BasicCurlHttpTests, FinishInAsyncCallback)
}
}
}

TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
{
auto http_client = http_client::HttpClientFactory::Create();
std::static_pointer_cast<curl::HttpClient>(http_client)->MaybeSpawnBackgroundThread();
// start background first, then test it could wakeup
auto session = http_client->CreateSession("http://127.0.0.1:19000/get/");
auto request = session->CreateRequest();
request->SetUri("get/");
auto handler = std::make_shared<GetEventHandler>();
session->SendRequest(handler);
std::this_thread::sleep_for(std::chrono::milliseconds{10}); // let it enter poll state
auto beg = std::chrono::system_clock::now();
http_client->FinishAllSessions();
http_client.reset();
// when background_thread_wait_for_ is used, it should have no side effect on elegant quit
// wait should be less than scheduled_delay_milliseconds_
// Due to load on CI hosts (some take 10ms), we assert it is less than 20ms
auto cost = std::chrono::system_clock::now() - beg;
ASSERT_TRUE(cost < std::chrono::milliseconds{20})
<< "cost ms: " << std::chrono::duration_cast<std::chrono::milliseconds>(cost).count()
<< " libcurl version: 0x" << std::hex << LIBCURL_VERSION_NUM;
ASSERT_TRUE(handler->is_called_);
ASSERT_TRUE(handler->got_response_);
}

TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
{
{
curl::HttpClient http_client;
http_client.MaybeSpawnBackgroundThread();
std::this_thread::sleep_for(std::chrono::milliseconds{10});
#if LIBCURL_VERSION_NUM >= 0x074200
ASSERT_FALSE(http_client.MaybeSpawnBackgroundThread());
#else
// low version curl do not support delay quit, so old background would quit
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
#endif
}
{
curl::HttpClient http_client;
http_client.SetBackgroundWaitFor(std::chrono::milliseconds::zero());
http_client.MaybeSpawnBackgroundThread();
std::this_thread::sleep_for(std::chrono::milliseconds{10});
// we can disable delay quit by set wait for 0
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
}
}
Loading