Skip to content

Commit

Permalink
feat: removing unused multitypequeue methods
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioDonda committed Dec 18, 2024
1 parent 35d2323 commit 7f944d6
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 190 deletions.
2 changes: 1 addition & 1 deletion src/agent/communicator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ target_include_directories(Communicator PUBLIC
${JWT_CPP_INCLUDE_DIRS})

target_compile_definitions(Communicator PRIVATE -DJWT_DISABLE_PICOJSON=ON)
target_link_libraries(Communicator PUBLIC Config Boost::asio Boost::beast Boost::system Boost::url Logger MultiTypeQueue PRIVATE OpenSSL::SSL OpenSSL::Crypto nlohmann_json::nlohmann_json)
target_link_libraries(Communicator PUBLIC Config Boost::asio Boost::beast Boost::system Boost::url Logger PRIVATE OpenSSL::SSL OpenSSL::Crypto nlohmann_json::nlohmann_json)

include(../../cmake/ConfigureTarget.cmake)
configure_target(Communicator)
Expand Down
1 change: 0 additions & 1 deletion src/agent/communicator/include/ihttp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <http_request_params.hpp>
#include <ihttp_socket.hpp>
#include <message.hpp>

#include <boost/asio.hpp>
#include <boost/beast.hpp>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ namespace configuration
{
if constexpr (std::is_convertible_v<decltype(key), std::string_view>)
{
//This is a workaround for parsing size units
if (key == "batch_size")
// This is a workaround for parsing size units
if (std::string_view(key) == "batch_size")
{
should_parse_size = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ TEST_F(ConfigurationParserFileTest, isValidYamlFileValid)

TEST(ConfigurationParser, GetConfigBytes)
{
//Config should contain batch_size string in order to apply parsing
// Config should contain batch_size string in order to apply parsing
std::string strConfig = R"(
batch_size:
size_bytes: 500B
Expand All @@ -460,8 +460,7 @@ TEST(ConfigurationParser, GetConfigBytes)
ASSERT_EQ(retGB, 2000000000);
const auto retG = parserStr->GetConfig<size_t>("batch_size", "size_G").value_or(1234);
ASSERT_EQ(retG, 3000000000);
const auto retDefaultKB =
parserStr->GetConfig<size_t>("batch_size", "size_default_KB").value_or(1234);
const auto retDefaultKB = parserStr->GetConfig<size_t>("batch_size", "size_default_KB").value_or(1234);
ASSERT_EQ(retDefaultKB, 53);
}

Expand Down
27 changes: 0 additions & 27 deletions src/agent/multitype_queue/include/imultitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,6 @@ class IMultiTypeQueue
const size_t messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;
/**
* @brief Retrieves the next message from the queue asynchronously.
*
* @param type The type of the queue to use as the source.
* @param messageQuantity In number of messages.
* @param moduleName The name of the module requesting the message.
* @param moduleType The type of the module requesting the messages.
* @return boost::asio::awaitable<std::vector<Message>> Awaitable object representing the next N messages.
*/
virtual boost::asio::awaitable<std::vector<Message>> getNextNAwaitable(MessageType type,
const int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next N messages from the queue.
Expand All @@ -98,20 +85,6 @@ class IMultiTypeQueue
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next N messages from the queue.
*
* @param type The type of the queue to use as the source.
* @param messageQuantity The quantity of messages to return.
* @param moduleName The name of the module requesting the messages.
* @param moduleType The type of the module requesting the messages.
* @return std::vector<Message> A vector of messages fetched from the queue.
*/
virtual std::vector<Message> getNextN(MessageType type,
const int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Deletes a message from the queue.
*
Expand Down
16 changes: 0 additions & 16 deletions src/agent/multitype_queue/include/multitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,6 @@ class MultiTypeQueue : public IMultiTypeQueue
const size_t messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;
/**
* @copydoc IMultiTypeQueue::getNextNAwaitable(MessageType type, const size_t
* messageQuantity, const std::string moduleName, const std::string moduleType)
*/
boost::asio::awaitable<std::vector<Message>> getNextNAwaitable(MessageType type,
const int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::getNextBytes(MessageType, size_t, const std::string, const std::string)
Expand All @@ -165,14 +157,6 @@ class MultiTypeQueue : public IMultiTypeQueue
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::getNextN(MessageType, int, const std::string, const std::string)
*/
std::vector<Message> getNextN(MessageType type,
const int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::pop(MessageType, const std::string)
*/
Expand Down
55 changes: 2 additions & 53 deletions src/agent/multitype_queue/src/multitype_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,32 +178,6 @@ boost::asio::awaitable<std::vector<Message>> MultiTypeQueue::getNextBytesAwaitab
co_return result;
}

boost::asio::awaitable<std::vector<Message>> MultiTypeQueue::getNextNAwaitable(MessageType type,
const int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor);

std::vector<Message> result;
if (m_mapMessageTypeName.contains(type))
{
// waits for items to be available
while (isEmpty(type))
{
timer.expires_after(std::chrono::milliseconds(m_timeout));
co_await timer.async_wait(boost::asio::use_awaitable);
}

result = getNextN(type, messageQuantity, moduleName, moduleType);
}
else
{
LogError("Error didn't find the queue.");
}
co_return result;
}

std::vector<Message> MultiTypeQueue::getNextBytes(MessageType type,
const size_t messageQuantity,
const std::string moduleName,
Expand All @@ -212,33 +186,8 @@ std::vector<Message> MultiTypeQueue::getNextBytes(MessageType type,
std::vector<Message> result;
if (m_mapMessageTypeName.contains(type))
{
auto arrayData = m_persistenceDest->RetrieveBySize(
messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);

for (auto singleJson : arrayData)
{
result.emplace_back(
type, singleJson["data"], singleJson["moduleName"], singleJson["moduleType"], singleJson["metadata"]);
}
}
else
{
LogError("Error didn't find the queue.");
}
return result;
}

std::vector<Message> MultiTypeQueue::getNextN(MessageType type,
const int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
std::vector<Message> result;
if (m_mapMessageTypeName.contains(type))
{
nlohmann::json arrayData;
arrayData =
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);
auto arrayData =
m_persistenceDest->RetrieveBySize(messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);

for (auto singleJson : arrayData)
{
Expand Down
91 changes: 4 additions & 87 deletions src/agent/multitype_queue/tests/multitype_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,91 +471,6 @@ TEST_F(MultiTypeQueueTest, PushMultipleGetMultiple)
EXPECT_EQ(0, multiTypeQueue.storedItems(MessageType::STATELESS));
}

// Push Multiple, pop multiples
TEST_F(MultiTypeQueueTest, PushMultipleGetMultipleWithModule)
{
MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG);
const MessageType messageType {MessageType::STATELESS};
const std::string moduleName = "testModule";
const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT, moduleName};

EXPECT_EQ(3, multiTypeQueue.push(messageToSend));

// Altough we're asking for 10 messages only the availables are returned.
auto messagesReceived =
multiTypeQueue.getNextN(MessageType::STATELESS, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers)
int i = 0;
for (const auto& singleMessage : messagesReceived)
{
EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.get<std::string>());
}

EXPECT_EQ(0, multiTypeQueue.storedItems(MessageType::STATELESS, "fakemodule"));
EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS));
EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS, moduleName));
}

TEST_F(MultiTypeQueueTest, PushSinglesleGetMultipleWithModule)
{
MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG);

for (std::string i : {"1", "2", "3", "4", "5"})
{
const MessageType messageType {MessageType::STATELESS};
const nlohmann::json multipleDataContent = {"content-" + i};
const std::string moduleName = "module-" + i;
const Message messageToSend {messageType, multipleDataContent, moduleName, "", ""};
EXPECT_EQ(1, multiTypeQueue.push(messageToSend));
}

auto messagesReceived =
multiTypeQueue.getNextN(MessageType::STATELESS, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers)
EXPECT_EQ(5, messagesReceived.size());
int i = 0;
for (const auto& singleMessage : messagesReceived)
{
auto val = ++i;
EXPECT_EQ("content-" + std::to_string(val), singleMessage.data.get<std::string>());
EXPECT_EQ("module-" + std::to_string(val), singleMessage.moduleName);
}

auto messageReceivedContent1 = multiTypeQueue.getNextN(
MessageType::STATELESS, 10, "module-1"); // NOLINT(cppcoreguidelines-avoid-magic-numbers)
EXPECT_EQ(1, messageReceivedContent1.size());
}

TEST_F(MultiTypeQueueTest, GetNextAwaitableBase)
{
MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG);
boost::asio::io_context io_context;

// Coroutine that waits till there's a message of the needed type on the queue
boost::asio::co_spawn(
io_context,
// NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
[&multiTypeQueue]() -> boost::asio::awaitable<void>
{
auto messageReceived = co_await multiTypeQueue.getNextNAwaitable(MessageType::STATELESS, 2);
EXPECT_EQ(messageReceived[0].data.at("data"), "content-1");
EXPECT_EQ(messageReceived[1].data.at("data"), "content-2");
},
boost::asio::detached);

// Simulate the addition of needed message to the queue after some time
std::thread producer(
[&multiTypeQueue]()
{
std::this_thread::sleep_for(std::chrono::seconds(2));
const MessageType messageType {MessageType::STATELESS};
const nlohmann::json multipleDataContent = {"content-1", "content-2", "content-3"};
const Message messageToSend {messageType, multipleDataContent};
EXPECT_EQ(multiTypeQueue.push(messageToSend), 3);
});

io_context.run();
producer.join();
}

TEST_F(MultiTypeQueueTest, PushAwaitable)
{
MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG_SMALL_SIZE);
Expand Down Expand Up @@ -607,14 +522,16 @@ TEST_F(MultiTypeQueueTest, FifoOrderCheck)

// complete the queue with messages
const MessageType messageType {MessageType::STATEFUL};
size_t contentSize = 0;
for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
{
const nlohmann::json dataContent = {{"Data", "for STATEFUL" + std::to_string(i)}};
EXPECT_EQ(multiTypeQueue.push({messageType, dataContent}), 1);
contentSize += dataContent.dump().size();
}

auto messageReceivedVector =
multiTypeQueue.getNextN(messageType, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers)
multiTypeQueue.getNextBytes(messageType, contentSize); // NOLINT(cppcoreguidelines-avoid-magic-numbers)
EXPECT_EQ(messageReceivedVector.size(), 10);

std::for_each(messageReceivedVector.begin(),
Expand All @@ -626,7 +543,7 @@ TEST_F(MultiTypeQueueTest, FifoOrderCheck)
// Keep the order of the message: FIFO
for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
{
auto messageReceived = multiTypeQueue.getNextN(messageType, 1);
auto messageReceived = multiTypeQueue.getNextBytes(messageType, 1);
EXPECT_EQ(messageReceived[0].data, (nlohmann::json {{"Data", "for STATEFUL" + std::to_string(i)}}));
EXPECT_TRUE(multiTypeQueue.pop(messageType));
}
Expand Down

0 comments on commit 7f944d6

Please sign in to comment.