Skip to content

Commit

Permalink
Merge pull request #388 from wazuh/enhancement/385-change-batch-size-…
Browse files Browse the repository at this point in the history
…calculation-from-message-count-to-message-size

Change batch size calculation from message count to message size
  • Loading branch information
TomasTurina authored Dec 18, 2024
2 parents 49dc23d + 40885d3 commit 1eb2738
Show file tree
Hide file tree
Showing 33 changed files with 710 additions and 463 deletions.
2 changes: 1 addition & 1 deletion etc/config/wazuh-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ agent:
retry_interval: 30s
events:
batch_interval: 10s
batch_size: 1000
batch_size: 1MB
inventory:
enabled: true
interval: 1h
Expand Down
26 changes: 7 additions & 19 deletions src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,12 @@ namespace communicator
m_retryInterval = getConfigValue.template operator()<std::time_t>("agent", "retry_interval")
.value_or(config::agent::DEFAULT_RETRY_INTERVAL);

m_batchInterval = getConfigValue.template operator()<std::time_t>("events", "batch_interval")
.value_or(config::agent::DEFAULT_BATCH_INTERVAL);

if (m_batchInterval < 1'000 || m_batchInterval > (1'000 * 60 * 60))
{
LogWarn("batch_interval must be between 1s and 1h. Using default value.");
m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL;
}

m_batchSize = getConfigValue.template operator()<int>("events", "batch_size")
m_batchSize = getConfigValue.template operator()<size_t>("events", "batch_size")
.value_or(config::agent::DEFAULT_BATCH_SIZE);

if (m_batchSize < 1 || m_batchSize > 1'000'000)
if (m_batchSize < 1000ULL || m_batchSize > 100000000ULL)
{
LogWarn("batch_size must be between 1 and 1000000. Using default value.");
LogWarn("batch_size must be between 1KB and 100MB. Using default value.");
m_batchSize = config::agent::DEFAULT_BATCH_SIZE;
}
}
Expand All @@ -93,14 +84,14 @@ namespace communicator
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void> StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Processes messages in a stateless manner
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void> StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Retrieves group configuration from the manager
Expand Down Expand Up @@ -135,11 +126,8 @@ namespace communicator
/// @brief Time in milliseconds between authentication attemps in case of failure
std::time_t m_retryInterval = config::agent::DEFAULT_RETRY_INTERVAL;

/// @brief Time between batch requests
std::time_t m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL;

/// @brief Maximum number of messages to batch
int m_batchSize = config::agent::DEFAULT_BATCH_SIZE;
/// @brief Size for batch requests
size_t m_batchSize = config::agent::DEFAULT_BATCH_SIZE;

/// @brief The server URL
std::string m_serverUrl;
Expand Down
8 changes: 3 additions & 5 deletions src/agent/communicator/include/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,17 @@ namespace http_client
/// @param messageGetter Function to get the message body asynchronously
/// @param onUnauthorized Callback for unauthorized access
/// @param connectionRetry Time in milliseconds to wait before retrying the connection
/// @param batchInterval Time to wait between requests
/// @param batchSize The maximum number of messages to batch
/// @param batchSize The minimum number of bytes of messages to batch
/// @param onSuccess Callback for successful request completion
/// @param loopRequestCondition Condition to continue looping requests
/// @return Awaitable task for the HTTP request
boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
size_t batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) override;

Expand Down
8 changes: 3 additions & 5 deletions src/agent/communicator/include/ihttp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,17 @@ namespace http_client
/// @param messageGetter Function to retrieve messages
/// @param onUnauthorized Action to take on unauthorized access
/// @param connectionRetry Time to wait before retrying the connection
/// @param batchInterval Time to wait between requests
/// @param batchSize The maximum number of messages to batch
/// @param batchSize The minimum number of bytes of messages to batch
/// @param onSuccess Action to take on successful request
/// @param loopRequestCondition Condition to continue the request loop
/// @return Awaitable task for the HTTP request
virtual boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
size_t batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) = 0;

Expand Down
17 changes: 4 additions & 13 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,8 @@ namespace communicator

const auto reqParams = http_client::HttpRequestParams(
boost::beast::http::verb::get, m_serverUrl, "/api/v1/commands", m_getHeaderInfo ? m_getHeaderInfo() : "");
co_await m_httpClient->Co_PerformHttpRequest(m_token,
reqParams,
{},
onAuthenticationFailed,
m_retryInterval,
m_batchInterval,
m_batchSize,
onSuccess,
loopCondition);
co_await m_httpClient->Co_PerformHttpRequest(
m_token, reqParams, {}, onAuthenticationFailed, m_retryInterval, m_batchSize, onSuccess, loopCondition);
}

boost::asio::awaitable<void> Communicator::WaitForTokenExpirationAndAuthenticate()
Expand Down Expand Up @@ -147,7 +140,7 @@ namespace communicator
}

boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
Expand All @@ -169,14 +162,13 @@ namespace communicator
getMessages,
onAuthenticationFailed,
m_retryInterval,
m_batchInterval,
m_batchSize,
onSuccess,
loopCondition);
}

boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
Expand All @@ -198,7 +190,6 @@ namespace communicator
getMessages,
onAuthenticationFailed,
m_retryInterval,
m_batchInterval,
m_batchSize,
onSuccess,
loopCondition);
Expand Down
15 changes: 3 additions & 12 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,10 @@ namespace http_client
boost::asio::awaitable<void> HttpClient::Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams reqParams,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
size_t batchSize,
std::function<void(const int, const std::string&)> onSuccess,
std::function<bool()> loopRequestCondition)
{
Expand Down Expand Up @@ -156,25 +155,17 @@ namespace http_client

if (messageGetter != nullptr)
{
boost::asio::steady_timer refreshTimer(co_await boost::asio::this_coro::executor);
boost::asio::steady_timer batchTimeoutTimer(co_await boost::asio::this_coro::executor);
batchTimeoutTimer.expires_after(std::chrono::milliseconds(batchInterval));

while (loopRequestCondition != nullptr && loopRequestCondition())
{
const auto messages = co_await messageGetter(batchSize);
messagesCount = std::get<0>(messages);

if (messagesCount >= batchSize || batchTimeoutTimer.expiry() <= std::chrono::steady_clock::now())
if (messagesCount)
{
LogTrace("Messages count: {}", messagesCount);
reqParams.Body = std::get<1>(messages);
break;
}

constexpr int refreshInterval = 100;
refreshTimer.expires_after(std::chrono::milliseconds(refreshInterval));
co_await refreshTimer.async_wait(boost::asio::use_awaitable);
}
}
else
Expand Down
23 changes: 10 additions & 13 deletions src/agent/communicator/tests/communicator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines)

using namespace testing;
using GetMessagesFuncType = std::function<boost::asio::awaitable<intStringTuple>(const int)>;
using GetMessagesFuncType = std::function<boost::asio::awaitable<intStringTuple>(const size_t)>;

namespace
{
Expand Down Expand Up @@ -52,7 +52,7 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success)
{
auto mockHttpClient = std::make_unique<MockHttpClient>();

auto getMessages = [](const int) -> boost::asio::awaitable<intStringTuple>
auto getMessages = [](const size_t) -> boost::asio::awaitable<intStringTuple>
{
co_return intStringTuple {1, std::string("message-content")};
};
Expand All @@ -68,8 +68,7 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success)
GetMessagesFuncType pGetMessages,
std::function<void()>,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
[[maybe_unused]] size_t batchSize,
std::function<void(const int, const std::string&)> pOnSuccess,
[[maybe_unused]] std::function<bool()> loopRequestCondition) -> boost::asio::awaitable<void>
{
Expand All @@ -78,7 +77,7 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success)
co_return;
};

EXPECT_CALL(*mockHttpClient, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
EXPECT_CALL(*mockHttpClient, Co_PerformHttpRequest(_, _, _, _, _, _, _, _))
.WillOnce(Invoke(MockCo_PerformHttpRequest));

communicator::Communicator communicator(std::move(mockHttpClient), "uuid", "key", nullptr, FUNC);
Expand Down Expand Up @@ -118,8 +117,7 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
[[maybe_unused]] GetMessagesFuncType pGetMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
[[maybe_unused]] size_t batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
Expand All @@ -128,7 +126,7 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
};

// A following call to Co_PerformHttpRequest should not have a token
EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _))
.WillOnce(Invoke(MockCo_PerformHttpRequest));

boost::asio::io_context ioContext;
Expand All @@ -139,7 +137,7 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
{
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
co_await communicatorPtr->StatelessMessageProcessingTask(
[](const int) -> boost::asio::awaitable<intStringTuple>
[](const size_t) -> boost::asio::awaitable<intStringTuple>
{ co_return intStringTuple(1, std::string {"message"}); },
[]([[maybe_unused]] const int, const std::string&) {});
}(),
Expand Down Expand Up @@ -179,16 +177,15 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
[[maybe_unused]] GetMessagesFuncType pGetMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
[[maybe_unused]] size_t batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
capturedToken = *token;
co_return;
};

EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _))
.WillOnce(Invoke(MockCo_PerformHttpRequest));

boost::asio::io_context ioContext;
Expand All @@ -199,7 +196,7 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
{
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
co_await communicatorPtr->StatelessMessageProcessingTask(
[](const int) -> boost::asio::awaitable<intStringTuple>
[](const size_t) -> boost::asio::awaitable<intStringTuple>
{ co_return intStringTuple(1, std::string {"message"}); },
[]([[maybe_unused]] const int, const std::string&) {});
}(),
Expand Down
Loading

0 comments on commit 1eb2738

Please sign in to comment.