diff --git a/etc/config/wazuh-agent.yml b/etc/config/wazuh-agent.yml index 2d9087b9a2..eb079df682 100644 --- a/etc/config/wazuh-agent.yml +++ b/etc/config/wazuh-agent.yml @@ -4,7 +4,7 @@ agent: retry_interval: 30s events: batch_interval: 10s - batch_size: 10000000 + batch_size: 1MB inventory: enabled: true interval: 1h diff --git a/src/agent/communicator/include/communicator.hpp b/src/agent/communicator/include/communicator.hpp index 453037cb45..7999f24084 100644 --- a/src/agent/communicator/include/communicator.hpp +++ b/src/agent/communicator/include/communicator.hpp @@ -63,7 +63,7 @@ namespace communicator if (m_batchSize < 1'000 || m_batchSize > 100'000'000) { - LogWarn("batch_size must be between 1000B and 100MB. Using default value."); + LogWarn("batch_size must be between 1KB and 100MB. Using default value."); m_batchSize = config::agent::DEFAULT_BATCH_SIZE; } } diff --git a/src/agent/communicator/src/communicator.cpp b/src/agent/communicator/src/communicator.cpp index 13632c5c06..78683d97a8 100644 --- a/src/agent/communicator/src/communicator.cpp +++ b/src/agent/communicator/src/communicator.cpp @@ -80,14 +80,8 @@ namespace communicator const auto reqParams = http_client::HttpRequestParams( boost::beast::http::verb::get, m_serverUrl, "/api/v1/commands", m_getHeaderInfo ? m_getHeaderInfo() : ""); - co_await m_httpClient->Co_PerformHttpRequest(m_token, - reqParams, - {}, - onAuthenticationFailed, - m_retryInterval, - m_batchSize, - onSuccess, - loopCondition); + co_await m_httpClient->Co_PerformHttpRequest( + m_token, reqParams, {}, onAuthenticationFailed, m_retryInterval, m_batchSize, onSuccess, loopCondition); } boost::asio::awaitable Communicator::WaitForTokenExpirationAndAuthenticate() diff --git a/src/agent/communicator/src/http_client.cpp b/src/agent/communicator/src/http_client.cpp index a2adee13db..3cbe7b1e37 100644 --- a/src/agent/communicator/src/http_client.cpp +++ b/src/agent/communicator/src/http_client.cpp @@ -162,7 +162,7 @@ namespace http_client if (messagesCount) { - LogInfo("Messages count: {}", messagesCount); + LogTrace("Messages count: {}", messagesCount); reqParams.Body = std::get<1>(messages); break; } diff --git a/src/agent/communicator/tests/communicator_test.cpp b/src/agent/communicator/tests/communicator_test.cpp index 27714c4411..fdbbd1e665 100644 --- a/src/agent/communicator/tests/communicator_test.cpp +++ b/src/agent/communicator/tests/communicator_test.cpp @@ -20,7 +20,7 @@ // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines) using namespace testing; -using GetMessagesFuncType = std::function(const int)>; +using GetMessagesFuncType = std::function(const size_t)>; namespace { @@ -52,7 +52,7 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success) { auto mockHttpClient = std::make_unique(); - auto getMessages = [](const int) -> boost::asio::awaitable + auto getMessages = [](const size_t) -> boost::asio::awaitable { co_return intStringTuple {1, std::string("message-content")}; }; @@ -137,7 +137,7 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio { co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate(); co_await communicatorPtr->StatelessMessageProcessingTask( - [](const int) -> boost::asio::awaitable + [](const size_t) -> boost::asio::awaitable { co_return intStringTuple(1, std::string {"message"}); }, []([[maybe_unused]] const int, const std::string&) {}); }(), @@ -196,7 +196,7 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken) { co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate(); co_await communicatorPtr->StatelessMessageProcessingTask( - [](const int) -> boost::asio::awaitable + [](const size_t) -> boost::asio::awaitable { co_return intStringTuple(1, std::string {"message"}); }, []([[maybe_unused]] const int, const std::string&) {}); }(), diff --git a/src/agent/communicator/tests/http_client_test.cpp b/src/agent/communicator/tests/http_client_test.cpp index e941af95bb..0c9eabaf2c 100644 --- a/src/agent/communicator/tests/http_client_test.cpp +++ b/src/agent/communicator/tests/http_client_test.cpp @@ -216,7 +216,7 @@ TEST_P(HttpClientTest, Co_PerformHttpRequest_Success) SetupMockSocketReadExpectations(GetParam()); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const size_t) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -276,7 +276,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_CallbacksNotCalledIfCannotConnect) SetupMockSocketConnectExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const size_t) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -323,7 +323,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncWriteFails SetupMockSocketWriteExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const size_t) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -378,7 +378,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncReadFails) boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const size_t) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -432,7 +432,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_UnauthorizedCalledWhenAuthorization SetupMockSocketReadExpectations(boost::beast::http::status::unauthorized); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const size_t) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); diff --git a/src/agent/multitype_queue/include/multitype_queue.hpp b/src/agent/multitype_queue/include/multitype_queue.hpp index ad86482c48..cc4b3e9278 100644 --- a/src/agent/multitype_queue/include/multitype_queue.hpp +++ b/src/agent/multitype_queue/include/multitype_queue.hpp @@ -33,24 +33,19 @@ class MultiTypeQueue : public IMultiTypeQueue {MessageType::COMMAND, "COMMAND"}, }; - // TODO: doc - /// @brief + /// @brief maximun quantity of message to stored on the queue size_t m_maxItems; - // TODO: doc - /// @brief + /// @brief timeout in milliseconds for refreshing the queue status const std::chrono::milliseconds m_timeout; - // TODO: doc - /// @brief + /// @brief class for persistence implementation std::unique_ptr m_persistenceDest; - // TODO: doc - /// @brief + /// @brief mutex for protecting the queue access std::mutex m_mtx; - // TODO: doc - /// @brief + /// @brief condition variable related to the mutex std::condition_variable m_cv; /// @brief Time between batch requests @@ -78,8 +73,8 @@ class MultiTypeQueue : public IMultiTypeQueue m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL; } - m_maxItems = - getConfigValue.template operator()("agent", "queue_size").value_or(config::agent::QUEUE_DEFAULT_SIZE); + m_maxItems = getConfigValue.template operator()("agent", "queue_size") + .value_or(config::agent::QUEUE_DEFAULT_SIZE); const auto dbFilePath = dbFolderPath + "/" + config::agent::QUEUE_DB_NAME; diff --git a/src/agent/multitype_queue/src/multitype_queue.cpp b/src/agent/multitype_queue/src/multitype_queue.cpp index 113d837a0c..108cd69105 100644 --- a/src/agent/multitype_queue/src/multitype_queue.cpp +++ b/src/agent/multitype_queue/src/multitype_queue.cpp @@ -214,7 +214,6 @@ std::vector MultiTypeQueue::getNextN(MessageType type, } else if (std::holds_alternative(messageQuantity)) { - LogInfo("Requesting {}B ", std::get(messageQuantity)); arrayData = m_persistenceDest->RetrieveBySize( std::get(messageQuantity), m_mapMessageTypeName.at(type), moduleName, moduleType); } diff --git a/src/agent/multitype_queue/tests/multitype_queue_test.cpp b/src/agent/multitype_queue/tests/multitype_queue_test.cpp index e3eac085fa..acbab5b46e 100644 --- a/src/agent/multitype_queue/tests/multitype_queue_test.cpp +++ b/src/agent/multitype_queue/tests/multitype_queue_test.cpp @@ -78,7 +78,8 @@ namespace return std::nullopt; }; - const auto MOCK_GET_CONFIG_SMALL_SIZE = [](const std::string& table, const std::string& key) -> std::optional + const auto MOCK_GET_CONFIG_SMALL_SIZE = [](const std::string& table, + const std::string& key) -> std::optional { if (table == "agent" && key == "path.data") { diff --git a/src/agent/src/agent.cpp b/src/agent/src/agent.cpp index ac8fee47de..df74a832b2 100644 --- a/src/agent/src/agent.cpp +++ b/src/agent/src/agent.cpp @@ -98,7 +98,7 @@ void Agent::Run() "FetchCommands"); m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask( - [this](const int numMessages) + [this](const size_t numMessages) { return GetMessagesFromQueue(m_messageQueue, MessageType::STATEFUL, @@ -111,7 +111,7 @@ void Agent::Run() "Stateful"); m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask( - [this](const int numMessages) + [this](const size_t numMessages) { return GetMessagesFromQueue(m_messageQueue, MessageType::STATELESS, diff --git a/src/agent/tests/message_queue_utils_test.cpp b/src/agent/tests/message_queue_utils_test.cpp index 4f911b9c8b..0feb8a5999 100644 --- a/src/agent/tests/message_queue_utils_test.cpp +++ b/src/agent/tests/message_queue_utils_test.cpp @@ -133,7 +133,7 @@ TEST_F(MessageQueueUtilsTest, GetEmptyMessagesFromQueueTest) metadata["agent"] = "test"; // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines) - EXPECT_CALL(*mockQueue, getNextNAwaitable(MessageType::STATEFUL, MAX_MESSAGES, "", "")) + EXPECT_CALL(*mockQueue, getNextNAwaitable(MessageType::STATEFUL, MinMessagesSize, "", "")) .WillOnce([&testMessages]() -> boost::asio::awaitable> { co_return testMessages; }); // NOLINTEND(cppcoreguidelines-avoid-capturing-lambda-coroutines) @@ -141,7 +141,8 @@ TEST_F(MessageQueueUtilsTest, GetEmptyMessagesFromQueueTest) auto awaitableResult = boost::asio::co_spawn( io_context, - GetMessagesFromQueue(mockQueue, MessageType::STATEFUL, MAX_MESSAGES, [&metadata]() { return metadata.dump(); }), + GetMessagesFromQueue( + mockQueue, MessageType::STATEFUL, MIN_SIZE_OF_MESSAGES, [&metadata]() { return metadata.dump(); }), boost::asio::use_future); const auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(1);