Skip to content

Commit

Permalink
feat: removing all instances of MessageSize and it's use
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioDonda committed Dec 18, 2024
1 parent 19556a0 commit 35d2323
Show file tree
Hide file tree
Showing 23 changed files with 92 additions and 119 deletions.
11 changes: 5 additions & 6 deletions src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include <config.h>
#include <logger.hpp>
#include <message.hpp>

#include <boost/asio/awaitable.hpp>
#include <boost/asio/steady_timer.hpp>
Expand Down Expand Up @@ -59,8 +58,8 @@ namespace communicator
m_retryInterval = getConfigValue.template operator()<std::time_t>("agent", "retry_interval")
.value_or(config::agent::DEFAULT_RETRY_INTERVAL);

m_batchSize = getConfigValue.template operator()<MessageSize>("events", "batch_size")
.value_or(MessageSize(config::agent::DEFAULT_BATCH_SIZE));
m_batchSize = getConfigValue.template operator()<size_t>("events", "batch_size")
.value_or(config::agent::DEFAULT_BATCH_SIZE);

if (m_batchSize < 1000ULL || m_batchSize > 100000000ULL)
{
Expand All @@ -85,14 +84,14 @@ namespace communicator
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void> StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const MessageSize)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Processes messages in a stateless manner
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void> StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const MessageSize)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Retrieves group configuration from the manager
Expand Down Expand Up @@ -128,7 +127,7 @@ namespace communicator
std::time_t m_retryInterval = config::agent::DEFAULT_RETRY_INTERVAL;

/// @brief Size for batch requests
MessageSize m_batchSize = config::agent::DEFAULT_BATCH_SIZE;
size_t m_batchSize = config::agent::DEFAULT_BATCH_SIZE;

/// @brief The server URL
std::string m_serverUrl;
Expand Down
4 changes: 2 additions & 2 deletions src/agent/communicator/include/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ namespace http_client
boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const MessageSize)> messageGetter,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
MessageSize batchSize,
size_t batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) override;

Expand Down
4 changes: 2 additions & 2 deletions src/agent/communicator/include/ihttp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ namespace http_client
virtual boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const MessageSize)> messageGetter,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
MessageSize batchSize,
size_t batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) = 0;

Expand Down
4 changes: 2 additions & 2 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ namespace communicator
}

boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const MessageSize)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
Expand Down Expand Up @@ -168,7 +168,7 @@ namespace communicator
}

boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const MessageSize)> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
Expand Down
4 changes: 2 additions & 2 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ namespace http_client
boost::asio::awaitable<void> HttpClient::Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams reqParams,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const MessageSize)> messageGetter,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const size_t)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
MessageSize batchSize,
size_t batchSize,
std::function<void(const int, const std::string&)> onSuccess,
std::function<bool()> loopRequestCondition)
{
Expand Down
12 changes: 6 additions & 6 deletions src/agent/communicator/tests/communicator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success)
{
auto mockHttpClient = std::make_unique<MockHttpClient>();

auto getMessages = [](const MessageSize) -> boost::asio::awaitable<intStringTuple>
auto getMessages = [](const size_t) -> boost::asio::awaitable<intStringTuple>
{
co_return intStringTuple {1, std::string("message-content")};
};
Expand All @@ -68,7 +68,7 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success)
GetMessagesFuncType pGetMessages,
std::function<void()>,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] MessageSize batchSize,
[[maybe_unused]] size_t batchSize,
std::function<void(const int, const std::string&)> pOnSuccess,
[[maybe_unused]] std::function<bool()> loopRequestCondition) -> boost::asio::awaitable<void>
{
Expand Down Expand Up @@ -117,7 +117,7 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
[[maybe_unused]] GetMessagesFuncType pGetMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] MessageSize batchSize,
[[maybe_unused]] size_t batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
Expand All @@ -137,7 +137,7 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
{
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
co_await communicatorPtr->StatelessMessageProcessingTask(
[](const MessageSize) -> boost::asio::awaitable<intStringTuple>
[](const size_t) -> boost::asio::awaitable<intStringTuple>
{ co_return intStringTuple(1, std::string {"message"}); },
[]([[maybe_unused]] const int, const std::string&) {});
}(),
Expand Down Expand Up @@ -177,7 +177,7 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
[[maybe_unused]] GetMessagesFuncType pGetMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] MessageSize batchSize,
[[maybe_unused]] size_t batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
Expand All @@ -196,7 +196,7 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
{
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
co_await communicatorPtr->StatelessMessageProcessingTask(
[](const MessageSize) -> boost::asio::awaitable<intStringTuple>
[](const size_t) -> boost::asio::awaitable<intStringTuple>
{ co_return intStringTuple(1, std::string {"message"}); },
[]([[maybe_unused]] const int, const std::string&) {});
}(),
Expand Down
4 changes: 2 additions & 2 deletions src/agent/communicator/tests/mocks/mock_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ class MockHttpClient : public http_client::IHttpClient
Co_PerformHttpRequest,
(std::shared_ptr<std::string> token,
http_client::HttpRequestParams params,
std::function<boost::asio::awaitable<intStringTuple>(const MessageSize)> messageGetter,
std::function<boost::asio::awaitable<intStringTuple>(const size_t)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
MessageSize batchSize,
size_t batchSize,
std::function<void(const int, const std::string&)> onSuccess,
std::function<bool()> loopRequestCondition),
(override));
Expand Down
2 changes: 1 addition & 1 deletion src/agent/configuration_parser/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ find_package(yaml-cpp CONFIG REQUIRED)

add_library(ConfigurationParser src/configuration_parser.cpp src/configuration_parser_utils.cpp)
target_include_directories(ConfigurationParser PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_link_libraries(ConfigurationParser PUBLIC yaml-cpp::yaml-cpp Logger MultiTypeQueue PRIVATE Config)
target_link_libraries(ConfigurationParser PUBLIC yaml-cpp::yaml-cpp Logger PRIVATE Config)

include(../../cmake/ConfigureTarget.cmake)
configure_target(ConfigurationParser)
Expand Down
32 changes: 25 additions & 7 deletions src/agent/configuration_parser/include/configuration_parser.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <logger.hpp>
#include <message.hpp>

#include <yaml-cpp/yaml.h>

Expand Down Expand Up @@ -59,7 +58,7 @@ namespace configuration
/// @details This function parses a string representing a size unit and returns the equivalent size_t
/// value. The size unit can be expressed in Bytes (e.g. "1B"), Mega bytes (e.g. "1M" or "1MB"), kilo bytes
/// (e.g. "1K" or "1KB"). If no unit is specified, the value is assumed to be in Bytes
MessageSize ParseSizeUnit(const std::string& option) const;
size_t ParseSizeUnit(const std::string& option) const;

public:
/// @brief Default constructor. Loads configuration from a default file path.
Expand Down Expand Up @@ -92,12 +91,22 @@ namespace configuration
std::optional<T> GetConfig(Keys... keys) const
{
YAML::Node current = YAML::Clone(m_config);
bool should_parse_size = false;

try
{
(
[&current](const auto& key)
[&current, &should_parse_size](const auto& key)
{
if constexpr (std::is_convertible_v<decltype(key), std::string_view>)
{
//This is a workaround for parsing size units
if (key == "batch_size")
{
should_parse_size = true;
}
}

current = current[key];
if (!current.IsDefined())
{
Expand All @@ -106,13 +115,22 @@ namespace configuration
}(keys),
...);

if constexpr (std::is_same_v<T, std::time_t>)
if (should_parse_size)
{
return ParseTimeUnit(current.as<std::string>());
// For batch_size, always parse as size unit and convert to requested type
auto size = ParseSizeUnit(current.as<std::string>());
if constexpr (std::is_integral_v<T>)
{
return static_cast<T>(size);
}
else
{
throw std::invalid_argument("Invalid type for batch_size");
}
}
else if constexpr (std::is_same_v<T, MessageSize>)
else if constexpr (std::is_same_v<T, std::time_t>)
{
return ParseSizeUnit(current.as<std::string>());
return ParseTimeUnit(current.as<std::string>());
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/agent/configuration_parser/src/configuration_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ namespace configuration
LogInfo("Reload configuration done.");
}

MessageSize ConfigurationParser::ParseSizeUnit(const std::string& option) const
size_t ConfigurationParser::ParseSizeUnit(const std::string& option) const
{
std::string number;
unsigned int multiplier = 1;
Expand Down Expand Up @@ -242,6 +242,6 @@ namespace configuration
throw std::invalid_argument("Invalid size unit: " + option);
}

return static_cast<MessageSize>(std::stoul(number) * multiplier);
return static_cast<size_t>(std::stoul(number) * multiplier);
}
} // namespace configuration
31 changes: 16 additions & 15 deletions src/agent/configuration_parser/tests/configuration_parser_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,9 @@ TEST_F(ConfigurationParserFileTest, isValidYamlFileValid)

TEST(ConfigurationParser, GetConfigBytes)
{
//Config should contain batch_size string in order to apply parsing
std::string strConfig = R"(
agent_array:
batch_size:
size_bytes: 500B
size_KB: 45KB
size_MB: 1MB
Expand All @@ -447,21 +448,21 @@ TEST(ConfigurationParser, GetConfigBytes)
size_default_KB: 53
)";
const auto parserStr = std::make_unique<configuration::ConfigurationParser>(strConfig);
const auto ret = parserStr->GetConfig<MessageSize>("agent_array", "size_bytes").value_or(MessageSize(1234));
ASSERT_EQ(ret.size, 500);
const auto retKB = parserStr->GetConfig<MessageSize>("agent_array", "size_KB").value_or(MessageSize(1234));
ASSERT_EQ(retKB.size, 45000);
const auto retMB = parserStr->GetConfig<MessageSize>("agent_array", "size_MB").value_or(MessageSize(1234));
ASSERT_EQ(retMB.size, 1000000);
const auto retM = parserStr->GetConfig<MessageSize>("agent_array", "size_M").value_or(MessageSize(1234));
ASSERT_EQ(retM.size, 4000000);
const auto retGB = parserStr->GetConfig<MessageSize>("agent_array", "size_GB").value_or(MessageSize(1234));
ASSERT_EQ(retGB.size, 2000000000);
const auto retG = parserStr->GetConfig<MessageSize>("agent_array", "size_G").value_or(MessageSize(1234));
ASSERT_EQ(retG.size, 3000000000);
const auto ret = parserStr->GetConfig<size_t>("batch_size", "size_bytes").value_or(1234);
ASSERT_EQ(ret, 500);
const auto retKB = parserStr->GetConfig<size_t>("batch_size", "size_KB").value_or(1234);
ASSERT_EQ(retKB, 45000);
const auto retMB = parserStr->GetConfig<size_t>("batch_size", "size_MB").value_or(1234);
ASSERT_EQ(retMB, 1000000);
const auto retM = parserStr->GetConfig<size_t>("batch_size", "size_M").value_or(1234);
ASSERT_EQ(retM, 4000000);
const auto retGB = parserStr->GetConfig<size_t>("batch_size", "size_GB").value_or(1234);
ASSERT_EQ(retGB, 2000000000);
const auto retG = parserStr->GetConfig<size_t>("batch_size", "size_G").value_or(1234);
ASSERT_EQ(retG, 3000000000);
const auto retDefaultKB =
parserStr->GetConfig<MessageSize>("agent_array", "size_default_KB").value_or(MessageSize(1234));
ASSERT_EQ(retDefaultKB.size, 53);
parserStr->GetConfig<size_t>("batch_size", "size_default_KB").value_or(1234);
ASSERT_EQ(retDefaultKB, 53);
}

int main(int argc, char** argv)
Expand Down
4 changes: 2 additions & 2 deletions src/agent/multitype_queue/include/imultitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class IMultiTypeQueue
* @return boost::asio::awaitable<std::vector<Message>> Awaitable object representing the next N messages.
*/
virtual boost::asio::awaitable<std::vector<Message>> getNextBytesAwaitable(MessageType type,
const MessageSize messageQuantity,
const size_t messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;
/**
Expand All @@ -94,7 +94,7 @@ class IMultiTypeQueue
* @return std::vector<Message> A vector of messages fetched from the queue.
*/
virtual std::vector<Message> getNextBytes(MessageType type,
const MessageSize messageQuantity,
const size_t messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

Expand Down
37 changes: 0 additions & 37 deletions src/agent/multitype_queue/include/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,40 +45,3 @@ class Message
moduleType == other.moduleType && metaData == other.metaData;
}
};

struct MessageSize
{
size_t size;

MessageSize()
: size(0)
{
}

MessageSize(size_t value)
: size(value)
{
}

operator size_t() const
{
return size;
}

bool operator==(const MessageSize& other) const
{
return size == other.size;
}

auto operator<=>(const MessageSize&) const = default;

auto operator<=>(const unsigned long long& other) const
{
return size <=> other;
}

bool operator==(const unsigned long long& other) const
{
return size == other;
}
};
Loading

0 comments on commit 35d2323

Please sign in to comment.