diff --git a/src/agent/multitype_queue/include/multitype_queue.hpp b/src/agent/multitype_queue/include/multitype_queue.hpp index dc034b56ce..ad86482c48 100644 --- a/src/agent/multitype_queue/include/multitype_queue.hpp +++ b/src/agent/multitype_queue/include/multitype_queue.hpp @@ -35,11 +35,11 @@ class MultiTypeQueue : public IMultiTypeQueue // TODO: doc /// @brief - const size_t m_maxItems; + size_t m_maxItems; // TODO: doc /// @brief - const std::chrono::seconds m_timeout; + const std::chrono::milliseconds m_timeout; // TODO: doc /// @brief @@ -64,8 +64,7 @@ class MultiTypeQueue : public IMultiTypeQueue */ template MultiTypeQueue(const ConfigGetter& getConfigValue) - : m_maxItems(config::agent::QUEUE_DEFAULT_SIZE) - , m_timeout(config::agent::QUEUE_STATUS_REFRESH_TIMER) + : m_timeout(config::agent::QUEUE_STATUS_REFRESH_TIMER) { auto dbFolderPath = getConfigValue.template operator()("agent", "path.data").value_or(config::DEFAULT_DATA_PATH); @@ -79,6 +78,9 @@ class MultiTypeQueue : public IMultiTypeQueue m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL; } + m_maxItems = + getConfigValue.template operator()("agent", "queue_size").value_or(config::agent::QUEUE_DEFAULT_SIZE); + const auto dbFilePath = dbFolderPath + "/" + config::agent::QUEUE_DB_NAME; try diff --git a/src/agent/multitype_queue/tests/multitype_queue_test.cpp b/src/agent/multitype_queue/tests/multitype_queue_test.cpp index b087aa7075..e3eac085fa 100644 --- a/src/agent/multitype_queue/tests/multitype_queue_test.cpp +++ b/src/agent/multitype_queue/tests/multitype_queue_test.cpp @@ -15,9 +15,7 @@ #include "multitype_queue.hpp" #include "multitype_queue_test.hpp" -// constexpr int BIG_QUEUE_CAPACITY = 10; -// constexpr int SMALL_QUEUE_CAPACITY = 2; - +constexpr size_t SMALL_QUEUE_CAPACITY = 2; const nlohmann::json BASE_DATA_CONTENT = R"({{"data": "for STATELESS_0"}})"; const nlohmann::json MULTIPLE_DATA_CONTENT = {"content 1", "content 2", "content 3"}; @@ -67,6 +65,38 @@ namespace } } } + + const auto MOCK_GET_CONFIG = [](const std::string& table, const std::string& key) -> std::optional + { + if (table == "agent" && key == "path.data") + { + if constexpr (std::is_same_v) + { + return std::optional("."); + } + } + return std::nullopt; + }; + + const auto MOCK_GET_CONFIG_SMALL_SIZE = [](const std::string& table, const std::string& key) -> std::optional + { + if (table == "agent" && key == "path.data") + { + if constexpr (std::is_same_v) + { + return std::optional("."); + } + } + else if (table == "agent" && key == "queue_size") + { + if constexpr (std::is_same_v) + { + return std::optional(SMALL_QUEUE_CAPACITY); + } + } + return std::nullopt; + }; + } // namespace /// Test Methods @@ -83,7 +113,6 @@ void MultiTypeQueueTest::TearDown() {}; // JSON Basic methods. Move or delete if JSON Wrapper is done TEST_F(JsonTest, JSONConversionComparisson) { - GTEST_SKIP(); const nlohmann::json uj1 = {{"version", 1}, {"type", "integer"}}; // From string. If not unescape then it throws errors @@ -112,7 +141,6 @@ TEST_F(JsonTest, JSONConversionComparisson) TEST_F(JsonTest, JSONArrays) { - GTEST_SKIP(); // create JSON values const nlohmann::json j_object = {{"one", 1}, {"two", 2}, {"three", 3}}; const nlohmann::json j_array = {1, 2, 4, 8, 16}; @@ -131,525 +159,519 @@ TEST_F(JsonTest, JSONArrays) } } -// // Push, get and check the queue is not empty -// TEST_F(MultiTypeQueueTest, SinglePushGetNotEmpty) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const Message messageToSend {messageType, BASE_DATA_CONTENT}; - -// EXPECT_EQ(multiTypeQueue.push(messageToSend), 1); -// auto messageResponse = multiTypeQueue.getNext(MessageType::STATELESS); - -// auto typeSend = messageToSend.type; -// auto typeReceived = messageResponse.type; -// EXPECT_TRUE(typeSend == typeReceived); - -// auto dataResponse = messageResponse.data; -// EXPECT_EQ(dataResponse, BASE_DATA_CONTENT); - -// EXPECT_FALSE(multiTypeQueue.isEmpty(MessageType::STATELESS)); -// } - -// // push and pop on a non-full queue -// TEST_F(MultiTypeQueueTest, SinglePushPopEmpty) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const Message messageToSend {messageType, BASE_DATA_CONTENT}; - -// EXPECT_EQ(multiTypeQueue.push(messageToSend), 1); -// auto messageResponse = multiTypeQueue.getNext(MessageType::STATELESS); -// auto dataResponse = messageResponse.data; -// EXPECT_EQ(dataResponse, BASE_DATA_CONTENT); -// EXPECT_EQ(messageType, messageResponse.type); - -// auto messageResponseStateFul = multiTypeQueue.getNext(MessageType::STATEFUL); -// // TODO: this behavior can be change to return an empty message (type and module empty) -// EXPECT_EQ(messageResponseStateFul.type, MessageType::STATEFUL); -// EXPECT_EQ(messageResponseStateFul.data, "{}"_json); - -// multiTypeQueue.pop(MessageType::STATELESS); -// EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); - -// multiTypeQueue.pop(MessageType::STATELESS); -// EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); -// } - -// TEST_F(MultiTypeQueueTest, SinglePushGetWithModule) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const std::string moduleFakeName = "fake-module"; -// const std::string moduleName = "module"; -// const Message messageToSend {messageType, BASE_DATA_CONTENT, moduleName}; - -// EXPECT_EQ(multiTypeQueue.push(messageToSend), 1); -// auto messageResponseWrongModule = multiTypeQueue.getNext(MessageType::STATELESS, moduleFakeName); - -// auto typeSend = messageToSend.type; -// auto typeReceived = messageResponseWrongModule.type; -// EXPECT_TRUE(typeSend == typeReceived); - -// EXPECT_EQ(messageResponseWrongModule.moduleName, moduleFakeName); -// EXPECT_EQ(messageResponseWrongModule.data, "{}"_json); - -// auto messageResponseCorrectModule = multiTypeQueue.getNext(MessageType::STATELESS, moduleName); - -// auto dataResponse = messageResponseCorrectModule.data; -// EXPECT_EQ(dataResponse, BASE_DATA_CONTENT); - -// EXPECT_EQ(moduleName, messageResponseCorrectModule.moduleName); -// } - -// // Push, get and check while the queue is full -// TEST_F(MultiTypeQueueTest, SinglePushPopFullWithTimeout) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", SMALL_QUEUE_CAPACITY); - -// // complete the queue with messages -// const MessageType messageType {MessageType::COMMAND}; -// for (int i : {1, 2}) -// { -// const nlohmann::json dataContent = R"({"Data" : "for COMMAND)" + std::to_string(i) + R"("})"; -// EXPECT_EQ(multiTypeQueue.push({messageType, dataContent}), 1); -// } - -// const nlohmann::json dataContent = R"({"Data" : "for COMMAND3"})"; -// Message exampleMessage {messageType, dataContent}; -// EXPECT_EQ(multiTypeQueue.push({messageType, dataContent}, true), 0); - -// auto items = multiTypeQueue.storedItems(MessageType::COMMAND); -// EXPECT_EQ(items, SMALL_QUEUE_CAPACITY); -// EXPECT_TRUE(multiTypeQueue.isFull(MessageType::COMMAND)); -// EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); - -// multiTypeQueue.pop(MessageType::COMMAND); -// items = multiTypeQueue.storedItems(MessageType::COMMAND); -// EXPECT_NE(items, SMALL_QUEUE_CAPACITY); -// } - -// // Accesing different types of queues from several threads -// TEST_F(MultiTypeQueueTest, MultithreadDifferentType) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); - -// auto consumerStateLess = [&](const int& count) -// { -// for (int i = 0; i < count; ++i) -// { -// multiTypeQueue.pop(MessageType::STATELESS); -// } -// }; - -// auto consumerStateFull = [&](const int& count) -// { -// for (int i = 0; i < count; ++i) -// { -// multiTypeQueue.pop(MessageType::STATEFUL); -// } -// }; - -// auto messageProducer = [&](const int& count) -// { -// for (int i = 0; i < count; ++i) -// { -// const nlohmann::json dataContent = R"({{"Data", "Number )" + std::to_string(i) + R"("}})"; -// EXPECT_EQ(multiTypeQueue.push(Message(MessageType::STATELESS, dataContent)), 1); -// EXPECT_EQ(multiTypeQueue.push(Message(MessageType::STATEFUL, dataContent)), 1); -// } -// }; - -// const int itemsToInsert = 10; -// const int itemsToConsume = 5; - -// messageProducer(itemsToInsert); - -// std::thread consumerThread1(consumerStateLess, std::ref(itemsToConsume)); -// std::thread consumerThread2(consumerStateFull, std::ref(itemsToConsume)); - -// if (consumerThread1.joinable()) -// { -// consumerThread1.join(); -// } - -// if (consumerThread2.joinable()) -// { -// consumerThread2.join(); -// } - -// EXPECT_EQ(5, multiTypeQueue.storedItems(MessageType::STATELESS)); -// EXPECT_EQ(5, multiTypeQueue.storedItems(MessageType::STATEFUL)); - -// // Consume the rest of the messages -// std::thread consumerThread12(consumerStateLess, std::ref(itemsToConsume)); -// std::thread consumerThread22(consumerStateFull, std::ref(itemsToConsume)); - -// if (consumerThread12.joinable()) -// { -// consumerThread12.join(); -// } - -// if (consumerThread22.joinable()) -// { -// consumerThread22.join(); -// } - -// EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); -// EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATEFUL)); -// } - -// // Accesing same queue from 2 different threads -// TEST_F(MultiTypeQueueTest, MultithreadSameType) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// auto messageType = MessageType::COMMAND; - -// auto consumerCommand1 = [&](const int& count) -// { -// for (int i = 0; i < count; ++i) -// { -// multiTypeQueue.pop(messageType); -// } -// }; - -// auto consumerCommand2 = [&](const int& count) -// { -// for (int i = 0; i < count; ++i) -// { -// multiTypeQueue.pop(messageType); -// } -// }; - -// auto messageProducer = [&](const int& count) -// { -// for (int i = 0; i < count; ++i) -// { -// const nlohmann::json dataContent = R"({{"Data": "for COMMAND)" + std::to_string(i) + R"("}})"; -// EXPECT_EQ(multiTypeQueue.push(Message(messageType, dataContent)), 1); -// } -// }; - -// const int itemsToInsert = 10; -// const int itemsToConsume = 5; - -// messageProducer(itemsToInsert); - -// EXPECT_EQ(itemsToInsert, multiTypeQueue.storedItems(messageType)); - -// std::thread consumerThread1(consumerCommand1, std::ref(itemsToConsume)); -// std::thread messageProducerThread1(consumerCommand2, std::ref(itemsToConsume)); - -// if (messageProducerThread1.joinable()) -// { -// messageProducerThread1.join(); -// } - -// if (consumerThread1.joinable()) -// { -// consumerThread1.join(); -// } - -// EXPECT_TRUE(multiTypeQueue.isEmpty(messageType)); -// } - -// // Push Multiple with single message and data array, -// // several gets, checks and pops -// TEST_F(MultiTypeQueueTest, PushMultipleSeveralSingleGets) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT}; - -// EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); - -// for (size_t i : {0u, 1u, 2u}) -// { -// auto messageResponse = multiTypeQueue.getNext(MessageType::STATELESS); -// auto responseData = messageResponse.data; -// auto sentData = messageToSend.data[i].template get(); -// EXPECT_EQ(responseData, sentData); -// multiTypeQueue.pop(MessageType::STATELESS); -// } - -// EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATELESS), 0); -// } - -// TEST_F(MultiTypeQueueTest, PushMultipleWithMessageVector) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); - -// std::vector messages; -// const MessageType messageType {MessageType::STATELESS}; -// for (std::string i : {"0", "1", "2"}) -// { -// const nlohmann::json multipleDataContent = {"content " + i}; -// messages.emplace_back(messageType, multipleDataContent); -// } -// EXPECT_EQ(messages.size(), 3); -// EXPECT_EQ(multiTypeQueue.push(messages), 3); -// EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATELESS), 3); -// } - -// // push message vector with a mutiple data element -// TEST_F(MultiTypeQueueTest, PushVectorWithAMultipleInside) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); - -// std::vector messages; - -// // triple data content message -// const MessageType messageType {MessageType::STATELESS}; -// const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT}; -// messages.push_back(messageToSend); - -// // triple message vector -// for (std::string i : {"0", "1", "2"}) -// { -// const nlohmann::json dataContent = {"content " + i}; -// messages.emplace_back(messageType, dataContent); -// } - -// EXPECT_EQ(6, multiTypeQueue.push(messages)); -// } - -// // Push Multiple, pop multiples -// TEST_F(MultiTypeQueueTest, PushMultipleGetMultiple) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT}; - -// EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); -// EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATELESS), 3); -// EXPECT_EQ(multiTypeQueue.popN(MessageType::STATELESS, 1), 1); -// EXPECT_EQ(multiTypeQueue.popN(MessageType::STATELESS, 3), 2); -// EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); -// EXPECT_EQ(0, multiTypeQueue.storedItems(MessageType::STATELESS)); -// } - -// // Push Multiple, pop multiples -// TEST_F(MultiTypeQueueTest, PushMultipleGetMultipleWithModule) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const std::string moduleName = "testModule"; -// const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT, moduleName}; - -// EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); - -// // Altough we're asking for 10 messages only the availables are returned. -// auto messagesReceived = -// multiTypeQueue.getNextN(MessageType::STATELESS, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers) -// int i = 0; -// for (const auto& singleMessage : messagesReceived) -// { -// EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.get()); -// } - -// EXPECT_EQ(0, multiTypeQueue.storedItems(MessageType::STATELESS, "fakemodule")); -// EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS)); -// EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS, moduleName)); -// } - -// TEST_F(MultiTypeQueueTest, PushSinglesleGetMultipleWithModule) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); - -// for (std::string i : {"1", "2", "3", "4", "5"}) -// { -// const MessageType messageType {MessageType::STATELESS}; -// const nlohmann::json multipleDataContent = {"content-" + i}; -// const std::string moduleName = "module-" + i; -// const Message messageToSend {messageType, multipleDataContent, moduleName, "", ""}; -// EXPECT_EQ(1, multiTypeQueue.push(messageToSend)); -// } - -// auto messagesReceived = -// multiTypeQueue.getNextN(MessageType::STATELESS, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers) -// EXPECT_EQ(5, messagesReceived.size()); -// int i = 0; -// for (const auto& singleMessage : messagesReceived) -// { -// auto val = ++i; -// EXPECT_EQ("content-" + std::to_string(val), singleMessage.data.get()); -// EXPECT_EQ("module-" + std::to_string(val), singleMessage.moduleName); -// } - -// auto messageReceivedContent1 = multiTypeQueue.getNextN( -// MessageType::STATELESS, 10, "module-1"); // NOLINT(cppcoreguidelines-avoid-magic-numbers) -// EXPECT_EQ(1, messageReceivedContent1.size()); -// } - -// TEST_F(MultiTypeQueueTest, GetNextAwaitableBase) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// boost::asio::io_context io_context; - -// // Coroutine that waits till there's a message of the needed type on the queue -// boost::asio::co_spawn( -// io_context, -// // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines) -// [&multiTypeQueue]() -> boost::asio::awaitable -// { -// auto messageReceived = co_await multiTypeQueue.getNextNAwaitable(MessageType::STATELESS, 2); -// EXPECT_EQ(messageReceived[0].data.at("data"), "content-1"); -// EXPECT_EQ(messageReceived[1].data.at("data"), "content-2"); -// }, -// boost::asio::detached); - -// // Simulate the addition of needed message to the queue after some time -// std::thread producer( -// [&multiTypeQueue]() -// { -// std::this_thread::sleep_for(std::chrono::seconds(2)); -// const MessageType messageType {MessageType::STATELESS}; -// const nlohmann::json multipleDataContent = {"content-1", "content-2", "content-3"}; -// const Message messageToSend {messageType, multipleDataContent}; -// EXPECT_EQ(multiTypeQueue.push(messageToSend), 3); -// }); - -// io_context.run(); -// producer.join(); -// } - -// TEST_F(MultiTypeQueueTest, PushAwaitable) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", SMALL_QUEUE_CAPACITY); -// boost::asio::io_context io_context; - -// for (int i : {1, 2}) -// { -// const nlohmann::json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"; -// EXPECT_EQ(multiTypeQueue.push({MessageType::STATEFUL, dataContent}), 1); -// } - -// EXPECT_TRUE(multiTypeQueue.isFull(MessageType::STATEFUL)); -// EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATEFUL), 2); - -// // Coroutine that waits till there's space to push a new message -// boost::asio::co_spawn( -// io_context, -// // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines) -// [&multiTypeQueue]() -> boost::asio::awaitable -// { -// const nlohmann::json dataContent = {"content-1"}; -// const Message messageToSend {MessageType::STATEFUL, dataContent}; -// EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATEFUL), 2); -// auto messagesPushed = co_await multiTypeQueue.pushAwaitable(messageToSend); -// EXPECT_EQ(messagesPushed, 1); -// EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATEFUL), 2); -// }, -// boost::asio::detached); - -// // Simulate poping one message so there's space to push a new one -// std::thread consumer( -// [&multiTypeQueue]() -// { -// std::this_thread::sleep_for(std::chrono::seconds(2)); -// EXPECT_EQ(multiTypeQueue.popN(MessageType::STATEFUL, 1), 1); -// // TODO: double check this behavior, is it mandatory to stop the context here? -// }); - -// io_context.run(); -// consumer.join(); - -// EXPECT_TRUE(multiTypeQueue.isFull(MessageType::STATEFUL)); -// } - -// TEST_F(MultiTypeQueueTest, FifoOrderCheck) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); - -// // complete the queue with messages -// const MessageType messageType {MessageType::STATEFUL}; -// for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) -// { -// const nlohmann::json dataContent = {{"Data", "for STATEFUL" + std::to_string(i)}}; -// EXPECT_EQ(multiTypeQueue.push({messageType, dataContent}), 1); -// } - -// auto messageReceivedVector = -// multiTypeQueue.getNextN(messageType, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers) -// EXPECT_EQ(messageReceivedVector.size(), 10); - -// std::for_each(messageReceivedVector.begin(), -// messageReceivedVector.end(), -// [i = 0](const auto& singleMessage) mutable { -// EXPECT_EQ(singleMessage.data, (nlohmann::json {{"Data", "for STATEFUL" + -// std::to_string(++i)}})); -// }); - -// // Keep the order of the message: FIFO -// for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) -// { -// auto messageReceived = multiTypeQueue.getNextN(messageType, 1); -// EXPECT_EQ(messageReceived[0].data, (nlohmann::json {{"Data", "for STATEFUL" + std::to_string(i)}})); -// EXPECT_TRUE(multiTypeQueue.pop(messageType)); -// } -// } - -// TEST_F(MultiTypeQueueTest, GetBySizeAboveMax) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const std::string moduleName = "testModule"; -// const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT, moduleName}; - -// EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); - -// // Size request should contemplate data and module name string size -// size_t sizeAsked = 0; -// for (const auto& message : MULTIPLE_DATA_CONTENT) -// { -// sizeAsked += message.dump().size(); -// sizeAsked += moduleName.size(); -// } -// // Duplicate to surpass the maximun -// sizeAsked *= 2; - -// auto messagesReceived = multiTypeQueue.getNextN(MessageType::STATELESS, sizeAsked); -// int i = 0; -// for (const auto& singleMessage : messagesReceived) -// { -// EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.get()); -// } - -// EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS, moduleName)); -// } - -// TEST_F(MultiTypeQueueTest, GetByBelowMax) -// { -// GTEST_SKIP(); -// MultiTypeQueue multiTypeQueue(".", BIG_QUEUE_CAPACITY); -// const MessageType messageType {MessageType::STATELESS}; -// const std::string moduleName = "testModule"; -// const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT, moduleName}; - -// EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); - -// size_t sizeAsked = 0; -// sizeAsked += MULTIPLE_DATA_CONTENT.at(0).dump().size(); -// sizeAsked += moduleName.size(); -// // Fetching less than a single message size -// sizeAsked -= 1; - -// auto messagesReceived = multiTypeQueue.getNextN(MessageType::STATELESS, sizeAsked); -// EXPECT_EQ(1, messagesReceived.size()); -// } +TEST_F(MultiTypeQueueTest, Constructor) +{ + + const auto FUNC = []([[maybe_unused]] const std::string&, + [[maybe_unused]] const std::string&) -> std::optional + { + return T {}; + }; + + EXPECT_NO_THROW(MultiTypeQueue multiTypeQueue(FUNC)); +} + +// Push, get and check the queue is not empty +TEST_F(MultiTypeQueueTest, SinglePushGetNotEmpty) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, BASE_DATA_CONTENT}; + + EXPECT_EQ(multiTypeQueue.push(messageToSend), 1); + auto messageResponse = multiTypeQueue.getNext(MessageType::STATELESS); + + auto typeSend = messageToSend.type; + auto typeReceived = messageResponse.type; + EXPECT_TRUE(typeSend == typeReceived); + + auto dataResponse = messageResponse.data; + EXPECT_EQ(dataResponse, BASE_DATA_CONTENT); + + EXPECT_FALSE(multiTypeQueue.isEmpty(MessageType::STATELESS)); +} + +// push and pop on a non-full queue +TEST_F(MultiTypeQueueTest, SinglePushPopEmpty) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, BASE_DATA_CONTENT}; + + EXPECT_EQ(multiTypeQueue.push(messageToSend), 1); + auto messageResponse = multiTypeQueue.getNext(MessageType::STATELESS); + auto dataResponse = messageResponse.data; + EXPECT_EQ(dataResponse, BASE_DATA_CONTENT); + EXPECT_EQ(messageType, messageResponse.type); + + auto messageResponseStateFul = multiTypeQueue.getNext(MessageType::STATEFUL); + // TODO: this behavior can be change to return an empty message (type and module empty) + EXPECT_EQ(messageResponseStateFul.type, MessageType::STATEFUL); + EXPECT_EQ(messageResponseStateFul.data, "{}"_json); + + multiTypeQueue.pop(MessageType::STATELESS); + EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); + + multiTypeQueue.pop(MessageType::STATELESS); + EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); +} + +TEST_F(MultiTypeQueueTest, SinglePushGetWithModule) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const std::string moduleFakeName = "fake-module"; + const std::string moduleName = "module"; + const Message messageToSend {messageType, BASE_DATA_CONTENT, moduleName}; + + EXPECT_EQ(multiTypeQueue.push(messageToSend), 1); + auto messageResponseWrongModule = multiTypeQueue.getNext(MessageType::STATELESS, moduleFakeName); + + auto typeSend = messageToSend.type; + auto typeReceived = messageResponseWrongModule.type; + EXPECT_TRUE(typeSend == typeReceived); + + EXPECT_EQ(messageResponseWrongModule.moduleName, moduleFakeName); + EXPECT_EQ(messageResponseWrongModule.data, "{}"_json); + + auto messageResponseCorrectModule = multiTypeQueue.getNext(MessageType::STATELESS, moduleName); + + auto dataResponse = messageResponseCorrectModule.data; + EXPECT_EQ(dataResponse, BASE_DATA_CONTENT); + + EXPECT_EQ(moduleName, messageResponseCorrectModule.moduleName); +} + +// Push, get and check while the queue is full +TEST_F(MultiTypeQueueTest, SinglePushPopFullWithTimeout) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG_SMALL_SIZE); + + // complete the queue with messages + const MessageType messageType {MessageType::COMMAND}; + for (int i : {1, 2}) + { + const nlohmann::json dataContent = R"({"Data" : "for COMMAND)" + std::to_string(i) + R"("})"; + EXPECT_EQ(multiTypeQueue.push({messageType, dataContent}), 1); + } + + const nlohmann::json dataContent = R"({"Data" : "for COMMAND3"})"; + Message exampleMessage {messageType, dataContent}; + EXPECT_EQ(multiTypeQueue.push({messageType, dataContent}, true), 0); + + auto items = multiTypeQueue.storedItems(MessageType::COMMAND); + EXPECT_EQ(items, SMALL_QUEUE_CAPACITY); + EXPECT_TRUE(multiTypeQueue.isFull(MessageType::COMMAND)); + EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); + + multiTypeQueue.pop(MessageType::COMMAND); + items = multiTypeQueue.storedItems(MessageType::COMMAND); + EXPECT_NE(items, SMALL_QUEUE_CAPACITY); +} + +// Accesing different types of queues from several threads +TEST_F(MultiTypeQueueTest, MultithreadDifferentType) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + + auto consumerStateLess = [&](const int& count) + { + for (int i = 0; i < count; ++i) + { + multiTypeQueue.pop(MessageType::STATELESS); + } + }; + + auto consumerStateFull = [&](const int& count) + { + for (int i = 0; i < count; ++i) + { + multiTypeQueue.pop(MessageType::STATEFUL); + } + }; + + auto messageProducer = [&](const int& count) + { + for (int i = 0; i < count; ++i) + { + const nlohmann::json dataContent = R"({{"Data", "Number )" + std::to_string(i) + R"("}})"; + EXPECT_EQ(multiTypeQueue.push(Message(MessageType::STATELESS, dataContent)), 1); + EXPECT_EQ(multiTypeQueue.push(Message(MessageType::STATEFUL, dataContent)), 1); + } + }; + + const int itemsToInsert = 10; + const int itemsToConsume = 5; + + messageProducer(itemsToInsert); + + std::thread consumerThread1(consumerStateLess, std::ref(itemsToConsume)); + std::thread consumerThread2(consumerStateFull, std::ref(itemsToConsume)); + + if (consumerThread1.joinable()) + { + consumerThread1.join(); + } + + if (consumerThread2.joinable()) + { + consumerThread2.join(); + } + + EXPECT_EQ(5, multiTypeQueue.storedItems(MessageType::STATELESS)); + EXPECT_EQ(5, multiTypeQueue.storedItems(MessageType::STATEFUL)); + + // Consume the rest of the messages + std::thread consumerThread12(consumerStateLess, std::ref(itemsToConsume)); + std::thread consumerThread22(consumerStateFull, std::ref(itemsToConsume)); + + if (consumerThread12.joinable()) + { + consumerThread12.join(); + } + + if (consumerThread22.joinable()) + { + consumerThread22.join(); + } + + EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); + EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATEFUL)); +} + +// Accesing same queue from 2 different threads +TEST_F(MultiTypeQueueTest, MultithreadSameType) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + auto messageType = MessageType::COMMAND; + + auto consumerCommand1 = [&](const int& count) + { + for (int i = 0; i < count; ++i) + { + multiTypeQueue.pop(messageType); + } + }; + + auto consumerCommand2 = [&](const int& count) + { + for (int i = 0; i < count; ++i) + { + multiTypeQueue.pop(messageType); + } + }; + + auto messageProducer = [&](const int& count) + { + for (int i = 0; i < count; ++i) + { + const nlohmann::json dataContent = R"({{"Data": "for COMMAND)" + std::to_string(i) + R"("}})"; + EXPECT_EQ(multiTypeQueue.push(Message(messageType, dataContent)), 1); + } + }; + + const int itemsToInsert = 10; + const int itemsToConsume = 5; + + messageProducer(itemsToInsert); + + EXPECT_EQ(itemsToInsert, multiTypeQueue.storedItems(messageType)); + + std::thread consumerThread1(consumerCommand1, std::ref(itemsToConsume)); + std::thread messageProducerThread1(consumerCommand2, std::ref(itemsToConsume)); + + if (messageProducerThread1.joinable()) + { + messageProducerThread1.join(); + } + + if (consumerThread1.joinable()) + { + consumerThread1.join(); + } + + EXPECT_TRUE(multiTypeQueue.isEmpty(messageType)); +} + +// Push Multiple with single message and data array, +// several gets, checks and pops +TEST_F(MultiTypeQueueTest, PushMultipleSeveralSingleGets) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT}; + + EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); + + for (size_t i : {0u, 1u, 2u}) + { + auto messageResponse = multiTypeQueue.getNext(MessageType::STATELESS); + auto responseData = messageResponse.data; + auto sentData = messageToSend.data[i].template get(); + EXPECT_EQ(responseData, sentData); + multiTypeQueue.pop(MessageType::STATELESS); + } + + EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATELESS), 0); +} + +TEST_F(MultiTypeQueueTest, PushMultipleWithMessageVector) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + + std::vector messages; + const MessageType messageType {MessageType::STATELESS}; + for (std::string i : {"0", "1", "2"}) + { + const nlohmann::json multipleDataContent = {"content " + i}; + messages.emplace_back(messageType, multipleDataContent); + } + EXPECT_EQ(messages.size(), 3); + EXPECT_EQ(multiTypeQueue.push(messages), 3); + EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATELESS), 3); +} + +// push message vector with a mutiple data element +TEST_F(MultiTypeQueueTest, PushVectorWithAMultipleInside) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + + std::vector messages; + + // triple data content message + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT}; + messages.push_back(messageToSend); + + // triple message vector + for (std::string i : {"0", "1", "2"}) + { + const nlohmann::json dataContent = {"content " + i}; + messages.emplace_back(messageType, dataContent); + } + + EXPECT_EQ(6, multiTypeQueue.push(messages)); +} + +// Push Multiple, pop multiples +TEST_F(MultiTypeQueueTest, PushMultipleGetMultiple) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT}; + + EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); + EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATELESS), 3); + EXPECT_EQ(multiTypeQueue.popN(MessageType::STATELESS, 1), 1); + EXPECT_EQ(multiTypeQueue.popN(MessageType::STATELESS, 3), 2); + EXPECT_TRUE(multiTypeQueue.isEmpty(MessageType::STATELESS)); + EXPECT_EQ(0, multiTypeQueue.storedItems(MessageType::STATELESS)); +} + +// Push Multiple, pop multiples +TEST_F(MultiTypeQueueTest, PushMultipleGetMultipleWithModule) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const std::string moduleName = "testModule"; + const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT, moduleName}; + + EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); + + // Altough we're asking for 10 messages only the availables are returned. + auto messagesReceived = + multiTypeQueue.getNextN(MessageType::STATELESS, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers) + int i = 0; + for (const auto& singleMessage : messagesReceived) + { + EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.get()); + } + + EXPECT_EQ(0, multiTypeQueue.storedItems(MessageType::STATELESS, "fakemodule")); + EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS)); + EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS, moduleName)); +} + +TEST_F(MultiTypeQueueTest, PushSinglesleGetMultipleWithModule) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + + for (std::string i : {"1", "2", "3", "4", "5"}) + { + const MessageType messageType {MessageType::STATELESS}; + const nlohmann::json multipleDataContent = {"content-" + i}; + const std::string moduleName = "module-" + i; + const Message messageToSend {messageType, multipleDataContent, moduleName, "", ""}; + EXPECT_EQ(1, multiTypeQueue.push(messageToSend)); + } + + auto messagesReceived = + multiTypeQueue.getNextN(MessageType::STATELESS, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers) + EXPECT_EQ(5, messagesReceived.size()); + int i = 0; + for (const auto& singleMessage : messagesReceived) + { + auto val = ++i; + EXPECT_EQ("content-" + std::to_string(val), singleMessage.data.get()); + EXPECT_EQ("module-" + std::to_string(val), singleMessage.moduleName); + } + + auto messageReceivedContent1 = multiTypeQueue.getNextN( + MessageType::STATELESS, 10, "module-1"); // NOLINT(cppcoreguidelines-avoid-magic-numbers) + EXPECT_EQ(1, messageReceivedContent1.size()); +} + +TEST_F(MultiTypeQueueTest, GetNextAwaitableBase) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + boost::asio::io_context io_context; + + // Coroutine that waits till there's a message of the needed type on the queue + boost::asio::co_spawn( + io_context, + // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines) + [&multiTypeQueue]() -> boost::asio::awaitable + { + auto messageReceived = co_await multiTypeQueue.getNextNAwaitable(MessageType::STATELESS, 2); + EXPECT_EQ(messageReceived[0].data.at("data"), "content-1"); + EXPECT_EQ(messageReceived[1].data.at("data"), "content-2"); + }, + boost::asio::detached); + + // Simulate the addition of needed message to the queue after some time + std::thread producer( + [&multiTypeQueue]() + { + std::this_thread::sleep_for(std::chrono::seconds(2)); + const MessageType messageType {MessageType::STATELESS}; + const nlohmann::json multipleDataContent = {"content-1", "content-2", "content-3"}; + const Message messageToSend {messageType, multipleDataContent}; + EXPECT_EQ(multiTypeQueue.push(messageToSend), 3); + }); + + io_context.run(); + producer.join(); +} + +TEST_F(MultiTypeQueueTest, PushAwaitable) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG_SMALL_SIZE); + boost::asio::io_context io_context; + + for (int i : {1, 2}) + { + const nlohmann::json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"; + EXPECT_EQ(multiTypeQueue.push({MessageType::STATEFUL, dataContent}), 1); + } + + EXPECT_TRUE(multiTypeQueue.isFull(MessageType::STATEFUL)); + EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATEFUL), 2); + + // Coroutine that waits till there's space to push a new message + boost::asio::co_spawn( + io_context, + // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines) + [&multiTypeQueue]() -> boost::asio::awaitable + { + const nlohmann::json dataContent = {"content-1"}; + const Message messageToSend {MessageType::STATEFUL, dataContent}; + EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATEFUL), 2); + auto messagesPushed = co_await multiTypeQueue.pushAwaitable(messageToSend); + EXPECT_EQ(messagesPushed, 1); + EXPECT_EQ(multiTypeQueue.storedItems(MessageType::STATEFUL), 2); + }, + boost::asio::detached); + + // Simulate poping one message so there's space to push a new one + std::thread consumer( + [&multiTypeQueue]() + { + std::this_thread::sleep_for(std::chrono::seconds(2)); + EXPECT_EQ(multiTypeQueue.popN(MessageType::STATEFUL, 1), 1); + // TODO: double check this behavior, is it mandatory to stop the context here? + }); + + io_context.run(); + consumer.join(); + + EXPECT_TRUE(multiTypeQueue.isFull(MessageType::STATEFUL)); +} + +TEST_F(MultiTypeQueueTest, FifoOrderCheck) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + + // complete the queue with messages + const MessageType messageType {MessageType::STATEFUL}; + for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + { + const nlohmann::json dataContent = {{"Data", "for STATEFUL" + std::to_string(i)}}; + EXPECT_EQ(multiTypeQueue.push({messageType, dataContent}), 1); + } + + auto messageReceivedVector = + multiTypeQueue.getNextN(messageType, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers) + EXPECT_EQ(messageReceivedVector.size(), 10); + + std::for_each(messageReceivedVector.begin(), + messageReceivedVector.end(), + [i = 0](const auto& singleMessage) mutable { + EXPECT_EQ(singleMessage.data, (nlohmann::json {{"Data", "for STATEFUL" + std::to_string(++i)}})); + }); + + // Keep the order of the message: FIFO + for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + { + auto messageReceived = multiTypeQueue.getNextN(messageType, 1); + EXPECT_EQ(messageReceived[0].data, (nlohmann::json {{"Data", "for STATEFUL" + std::to_string(i)}})); + EXPECT_TRUE(multiTypeQueue.pop(messageType)); + } +} + +TEST_F(MultiTypeQueueTest, GetBySizeAboveMax) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const std::string moduleName = "testModule"; + const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT, moduleName}; + + EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); + + // Size request should contemplate data and module name string size + size_t sizeAsked = 0; + for (const auto& message : MULTIPLE_DATA_CONTENT) + { + sizeAsked += message.dump().size(); + sizeAsked += moduleName.size(); + } + // Duplicate to surpass the maximun + sizeAsked *= 2; + + auto messagesReceived = multiTypeQueue.getNextN(MessageType::STATELESS, sizeAsked); + int i = 0; + for (const auto& singleMessage : messagesReceived) + { + EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.get()); + } + + EXPECT_EQ(3, multiTypeQueue.storedItems(MessageType::STATELESS, moduleName)); +} + +TEST_F(MultiTypeQueueTest, GetByBelowMax) +{ + MultiTypeQueue multiTypeQueue(MOCK_GET_CONFIG); + const MessageType messageType {MessageType::STATELESS}; + const std::string moduleName = "testModule"; + const Message messageToSend {messageType, MULTIPLE_DATA_CONTENT, moduleName}; + + EXPECT_EQ(3, multiTypeQueue.push(messageToSend)); + + size_t sizeAsked = 0; + sizeAsked += MULTIPLE_DATA_CONTENT.at(0).dump().size(); + sizeAsked += moduleName.size(); + // Fetching less than a single message size + sizeAsked -= 1; + + auto messagesReceived = multiTypeQueue.getNextN(MessageType::STATELESS, sizeAsked); + EXPECT_EQ(1, messagesReceived.size()); +}