From 60f1042f088dcba812e9ec18494687b707ed3a66 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 12 Sep 2024 16:33:42 -0400 Subject: [PATCH] Improve the reliability of rosbag2 tests (#1796) * Remove wait_until_shutdown. This has almost exactly the same functionality as wait_for_condition, except for two things: 1. It is templated on the Timeout type. 2. It calls rclcpp::shutdown after the loop completes. However, neither of those is necessary; all callers to it use a std::chrono::duration, and all of the test fixtures already call rclcpp::shutdown. Thus, just remove it and make all callers use wait_for_condition instead. Signed-off-by: Chris Lalancette * Shutdown the async spinner node without rclcpp::shutdown. That is, we really don't actually want to do a full rclcpp shutdown here; we only want to stop spinning. Accomplish that with an executor, and timing out every 100 milliseconds to check if we are done yet. Signed-off-by: Chris Lalancette * Small fixes to start_async_spin in rosbag2_tests. Make sure it only spins as long as we haven't shutdown, and that it wakes up every so often to check that fact. Signed-off-by: Chris Lalancette * Wait for topics to be discovered during recorder->record(). The main reason for that is that these tests generally want to test certain expectations around how many messages were received. However, if discovery takes longer than we expect, then it could be the case that we "missed" messages at the beginning because discovery hadn't yet completed. Fix this by just waiting around for the recorder to get all the subscriptions it expects before moving on with the test. Signed-off-by: Chris Lalancette * Feedback from review. Signed-off-by: Chris Lalancette * Switch to using MockRecorder. Signed-off-by: Chris Lalancette * Fixes from review. Signed-off-by: Chris Lalancette * Feedback from review. Signed-off-by: Chris Lalancette * Apply suggestions from code review Co-authored-by: Michael Orlov Signed-off-by: Chris Lalancette * Switch to using spin, rather than spin_some. That's because there is currently at least one bug associated with spin_some in rclcpp. However, it turns out that we don't even need to use it, as we can just as easily use spin() along with exec.cancel(). Signed-off-by: Chris Lalancette * Make sure to stop_spinning when we tear down the test. Signed-off-by: Chris Lalancette * Use scopes to shutdown spinning. Signed-off-by: Chris Lalancette * Nested contexts just to explicitly cleanup the async spinners. Signed-off-by: Chris Lalancette * Update rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp Co-authored-by: Michael Orlov Signed-off-by: Chris Lalancette * Apply the same fix to rosbag2_tests. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette Co-authored-by: Michael Orlov --- .../include/rosbag2_test_common/wait_for.hpp | 15 -- .../test_rosbag2_cpp_get_service_info.cpp | 171 ++++++++++-------- .../record_integration_fixture.hpp | 46 +++-- .../test_keyboard_controls.cpp | 5 +- .../test/rosbag2_transport/test_record.cpp | 88 +++++---- .../rosbag2_transport/test_record_all.cpp | 42 +++-- .../test_record_all_ignore_leaf_topics.cpp | 6 +- ..._record_all_include_unpublished_topics.cpp | 4 + .../test_record_all_no_discovery.cpp | 7 +- .../test_record_all_use_sim_time.cpp | 44 +++-- .../rosbag2_transport/test_record_regex.cpp | 57 ++++-- 11 files changed, 286 insertions(+), 199 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp index 9578e72439..bb13ef2f5b 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp @@ -37,21 +37,6 @@ bool spin_and_wait_for(Timeout timeout, const Node & node, Condition condition) return true; } -template -bool wait_until_shutdown(Timeout timeout, Condition condition) -{ - using clock = std::chrono::system_clock; - auto start = clock::now(); - while (!condition()) { - if ((clock::now() - start) > timeout) { - return false; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - rclcpp::shutdown(); - return true; -} - template bool wait_until_condition( Condition condition, diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index 3ab7aeb98a..401deb311e 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -14,8 +14,10 @@ #include +#include #include #include +#include #include #include "rosbag2_cpp/info.hpp" @@ -65,6 +67,7 @@ class Rosbag2CPPGetServiceInfoTest void TearDown() override { + stop_spinning(); fs::remove_all(root_bag_path_); } @@ -81,22 +84,36 @@ class Rosbag2CPPGetServiceInfoTest template void start_async_spin(T node) { - node_spinner_future_ = std::async( - std::launch::async, - [node, this]() -> void { - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(node); - while (!exit_from_node_spinner_) { - exec.spin_some(); - } - }); + if (exec_ == nullptr) { + exec_ = std::make_unique(); + exec_->add_node(node); + spin_thread_ = std::thread( + [this]() { + exec_->spin(); + }); + // Wait for the executor to start spinning in the newly spawned thread to avoid race condition + // with exec_->cancel() + using clock = std::chrono::steady_clock; + auto start = clock::now(); + while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!exec_->is_spinning()) { + throw std::runtime_error("Failed to start spinning node"); + } + } else { + throw std::runtime_error("Already spinning a node, can't start a new node spin"); + } } void stop_spinning() { - exit_from_node_spinner_ = true; - if (node_spinner_future_.valid()) { - node_spinner_future_.wait(); + if (exec_ != nullptr) { + exec_->cancel(); + if (spin_thread_.joinable()) { + spin_thread_.join(); + } + exec_ = nullptr; } } @@ -146,8 +163,8 @@ class Rosbag2CPPGetServiceInfoTest // relative path to the root of the bag file. fs::path root_bag_path_; - std::future node_spinner_future_; - std::atomic_bool exit_from_node_spinner_{false}; + std::unique_ptr exec_{nullptr}; + std::thread spin_thread_; }; TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_only) { @@ -194,36 +211,36 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only std::move(writer), storage_options, record_options); recorder->record(); - start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit( - [&]() {stop_spinning();}); - - ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); - ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); - constexpr size_t num_service_requests = 3; - for (size_t i = 0; i < num_service_requests; i++) { - ASSERT_TRUE(service_client_manager->send_request()); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - auto & writer_ref = recorder->get_writer_handle(); - auto & recorder_writer = - dynamic_cast(writer_ref.get_implementation_handle()); + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); + ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); - // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. - size_t expected_messages = num_service_requests * 2; - auto ret = rosbag2_test_common::wait_until_condition( - [&recorder_writer, &expected_messages]() { - return recorder_writer.get_number_of_written_messages() >= expected_messages; - }, - std::chrono::seconds(5)); - EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + for (size_t i = 0; i < num_service_requests; i++) { + ASSERT_TRUE(service_client_manager->send_request()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } - recorder->stop(); - stop_spinning(); - cleanup_process_handle.cancel(); + auto & writer_ref = recorder->get_writer_handle(); + auto & recorder_writer = + dynamic_cast(writer_ref.get_implementation_handle()); + + // By default, only client introspection is enabled. + // For one request, service event topic get 2 messages. + size_t expected_messages = num_service_requests * 2; + auto ret = rosbag2_test_common::wait_until_condition( + [&recorder_writer, &expected_messages]() { + return recorder_writer.get_number_of_written_messages() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + + recorder->stop(); + } rosbag2_cpp::Info info; std::vector> ret_service_infos; @@ -272,45 +289,45 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se std::move(writer), storage_options, record_options); recorder->record(); - start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit( - [&]() {stop_spinning();}); - - ASSERT_TRUE( - wait_for_subscriptions( - *recorder, - {"/test_service1/_service_event", - "/test_service2/_service_event", - "/test_topic1", - "/test_topic2"} - ) - ); - constexpr size_t num_service_requests = 2; - for (size_t i = 0; i < num_service_requests; i++) { - ASSERT_TRUE(service_client_manager1->send_request()); - ASSERT_TRUE(service_client_manager2->send_request()); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE( + wait_for_subscriptions( + *recorder, + {"/test_service1/_service_event", + "/test_service2/_service_event", + "/test_topic1", + "/test_topic2"} + ) + ); + + for (size_t i = 0; i < num_service_requests; i++) { + ASSERT_TRUE(service_client_manager1->send_request()); + ASSERT_TRUE(service_client_manager2->send_request()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + pub_manager.run_publishers(); + + auto & writer_ref = recorder->get_writer_handle(); + auto & recorder_writer = + dynamic_cast(writer_ref.get_implementation_handle()); + + // By default, only client introspection is enabled. + // For one request, service event topic get 2 messages. + size_t expected_messages = num_service_requests * 2 + 2; + auto ret = rosbag2_test_common::wait_until_condition( + [&recorder_writer, &expected_messages]() { + return recorder_writer.get_number_of_written_messages() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + + recorder->stop(); } - pub_manager.run_publishers(); - - auto & writer_ref = recorder->get_writer_handle(); - auto & recorder_writer = - dynamic_cast(writer_ref.get_implementation_handle()); - - // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. - size_t expected_messages = num_service_requests * 2 + 2; - auto ret = rosbag2_test_common::wait_until_condition( - [&recorder_writer, &expected_messages]() { - return recorder_writer.get_number_of_written_messages() >= expected_messages; - }, - std::chrono::seconds(5)); - EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; - - recorder->stop(); - stop_spinning(); - cleanup_process_handle.cancel(); rosbag2_cpp::Info info; std::vector> ret_service_infos; diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index cd1089fec6..c85f69cb9c 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -12,26 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - +#include +#include #include #include +#include #include #include #include #include "rclcpp/rclcpp.hpp" -#include "rosbag2_transport/record_options.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_test_common/memory_management.hpp" #include "rosbag2_transport_test_fixture.hpp" -using namespace ::testing; // NOLINT -using namespace rosbag2_transport; // NOLINT -using namespace std::chrono_literals; // NOLINT - #ifndef ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ #define ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ @@ -50,21 +47,43 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture void TearDown() override { + stop_spinning(); rclcpp::shutdown(); } template void start_async_spin(T node) { - future_ = std::async( - std::launch::async, [node]() -> void {rclcpp::spin(node);}); + if (exec_ == nullptr) { + exec_ = std::make_unique(); + exec_->add_node(node); + spin_thread_ = std::thread( + [this]() { + exec_->spin(); + }); + // Wait for the executor to start spinning in the newly spawned thread to avoid race condition + // with exec_->cancel() + using clock = std::chrono::steady_clock; + auto start = clock::now(); + while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!exec_->is_spinning()) { + throw std::runtime_error("Failed to start spinning node"); + } + } else { + throw std::runtime_error("Already spinning a node, can't start a new node spin"); + } } void stop_spinning() { - rclcpp::shutdown(); - if (future_.valid()) { - future_.wait(); + if (exec_ != nullptr) { + exec_->cancel(); + if (spin_thread_.joinable()) { + spin_thread_.join(); + } + exec_ = nullptr; } } @@ -83,8 +102,9 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture return filtered_messages; } + std::unique_ptr exec_{nullptr}; MemoryManagement memory_; - std::future future_; + std::thread spin_thread_; }; #endif // ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index 3097be472e..60be757b6a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -203,13 +203,12 @@ TEST_F(RecordIntegrationTestFixture, test_keyboard_controls) recorder->record(); - this->start_async_spin(recorder); + start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); EXPECT_THAT(recorder->is_paused(), true); keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey); EXPECT_THAT(recorder->is_paused(), false); keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey); EXPECT_THAT(recorder->is_paused(), true); - - this->stop_spinning(); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index dd5288eb54..2e7fc81069 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -52,30 +52,39 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are std::move(writer_), storage_options_, record_options); recorder->record(); - start_async_spin(recorder); - - ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - - pub_manager.run_publishers(); + constexpr size_t expected_messages = 4; + std::vector> recorded_messages; + std::unordered_map< + std::string, + std::pair + > recorded_topics; - auto & writer = recorder->get_writer_handle(); - MockSequentialWriter & mock_writer = - static_cast(writer.get_implementation_handle()); + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + + pub_manager.run_publishers(); + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + auto ret = rosbag2_test_common::wait_until_condition( + [ =, &mock_writer]() { + return mock_writer.get_messages().size() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "failed to capture expected messages in time" << + "recorded messages = " << recorded_messages.size(); + recorded_messages = mock_writer.get_messages(); + recorded_topics = mock_writer.get_topics(); + } - constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), - [ =, &mock_writer]() { - return mock_writer.get_messages().size() >= expected_messages; - }); - auto recorded_messages = mock_writer.get_messages(); - EXPECT_TRUE(ret) << "failed to capture expected messages in time" << - "recorded messages = " << recorded_messages.size(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); - stop_spinning(); - auto recorded_topics = mock_writer.get_topics(); ASSERT_THAT(recorded_topics, SizeIs(2)); EXPECT_THAT(recorded_topics.at(string_topic).first.serialization_format, Eq("rmw_format")); EXPECT_THAT(recorded_topics.at(array_topic).first.serialization_format, Eq("rmw_format")); @@ -126,10 +135,12 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) std::move(writer_), storage_options_, record_options); recorder->record(); + start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); - start_async_spin(recorder); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); @@ -146,11 +157,11 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) // 4 because we're running recorder->record() and publishers twice constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -187,6 +198,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -197,11 +209,11 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -251,6 +263,7 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -261,11 +274,11 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); auto recorded_topics = mock_writer.get_topics(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; @@ -294,6 +307,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -302,11 +316,11 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) static_cast(writer.get_implementation_handle()); size_t expected_messages = num_latched_messages; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [&mock_writer, &expected_messages]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); auto recorded_topics = mock_writer.get_topics(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; @@ -428,6 +442,7 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); auto & writer = recorder->get_writer_handle(); mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -436,14 +451,15 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(string_topic, string_message, expected_messages); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [&mock_writer, &expected_messages]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 597deb40a8..2b2b85a40f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -30,6 +30,7 @@ #include "rosbag2_transport/recorder.hpp" +#include "mock_recorder.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -56,6 +57,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); @@ -66,11 +68,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); @@ -98,17 +100,25 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a rosbag2_transport::RecordOptions record_options = {false, true, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); ASSERT_TRUE(client_manager_2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /test_service_1 and /test_service_2, along with + // the event topics /test_service_1/_service_event and /test_service_2/_service_event are + // available to be recorded. However, wait_for_service_to_be_ready() only checks the services, + // not the event topics, so ask the recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service_1/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service_2/_service_event")); + // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. + // For one request, service event topic gets 2 messages. ASSERT_TRUE(client_manager_1->send_request()); ASSERT_TRUE(client_manager_2->send_request()); @@ -116,11 +126,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); @@ -140,16 +150,24 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a rosbag2_transport::RecordOptions record_options = {true, true, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); + // At this point, we expect that the service /test_service_1, along with the topic /string_topic, + // along with the event topic /test_service_1, along with the split topic /events/write_split are + // available to be recorded. However, wait_for_matched() and wait_for_service_to_be_ready() only + // check on the service and the topic, not the event or the split topic, so ask the recorder to + // make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service/_service_event")); + pub_manager.run_publishers(); // By default, only client introspection is enabled. @@ -160,11 +178,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp index 955855d138..17080477a6 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp @@ -69,11 +69,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_two_topics_ignore_l static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp index 018e260f40..d0a07d33ef 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp @@ -23,6 +23,7 @@ #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" #include "rosbag2_test_common/publication_manager.hpp" +#include "rosbag2_test_common/wait_for.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -40,6 +41,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignore auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); @@ -58,6 +60,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_include auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); @@ -78,6 +81,7 @@ TEST_F( auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp index 5e30702e65..ce9181514d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp @@ -44,6 +44,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_ recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(topic, string_message, 5); @@ -54,11 +55,11 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_ static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 0; - rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(2), + rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() > expected_messages; - }); + }, + std::chrono::seconds(2)); // We can't EXPECT anything here, since there may be some messages from rosout auto recorded_topics = mock_writer.get_topics(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index 4b6476ac32..2e612310b1 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -98,32 +98,38 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) std::move(writer_), storage_options_, record_options); recorder->record(); + constexpr size_t expected_messages = 10; + std::vector> recorded_messages; + std::unordered_map messages_per_topic; + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); - ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); + ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); - pub_manager.run_publishers(); + pub_manager.run_publishers(); - auto & writer = recorder->get_writer_handle(); - MockSequentialWriter & mock_writer = - static_cast(writer.get_implementation_handle()); + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); - constexpr size_t expected_messages = 10; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), - [ =, &mock_writer]() { - return mock_writer.get_messages().size() >= expected_messages; - }); - auto recorded_messages = mock_writer.get_messages(); - EXPECT_TRUE(ret) << "failed to capture expected messages in time. " << - "recorded messages = " << recorded_messages.size(); - stop_spinning(); - - auto messages_per_topic = mock_writer.messages_per_topic(); + auto ret = rosbag2_test_common::wait_until_condition( + [ =, &mock_writer]() { + return mock_writer.get_messages().size() >= expected_messages; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(ret) << "failed to capture expected messages in time. " << + "recorded messages = " << recorded_messages.size(); + recorded_messages = mock_writer.get_messages(); + messages_per_topic = mock_writer.messages_per_topic(); + } + + ASSERT_EQ(messages_per_topic.count(string_topic), 1u); EXPECT_EQ(messages_per_topic[string_topic], 5u); EXPECT_THAT(recorded_messages, SizeIs(Ge(expected_messages))); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index f6d4d81fc8..6cf26a4bed 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -33,6 +33,7 @@ #include "test_msgs/message_fixtures.hpp" #include "test_msgs/srv/basic_types.hpp" +#include "mock_recorder.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -77,6 +78,7 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); @@ -87,11 +89,11 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -137,7 +139,6 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) record_options.regex = regex; record_options.exclude_regex = topics_regex_to_exclude; - // TODO(karsten1987) Refactor this into publication manager rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(v1, test_string_messages[0], 3); @@ -151,6 +152,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -162,11 +164,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -226,6 +228,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -237,11 +240,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -290,11 +293,12 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) auto service_manager_b2 = std::make_shared>(b2); - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); @@ -302,6 +306,14 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) ASSERT_TRUE(service_manager_b1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_b2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /still_nice_service and /awesome_nice_service, + // along with the event topics /still_nice_service/_service_event + // and /awesome_nice_service/_service_event are available to be recorded. However, + // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the + // recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v1 + "/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v2 + "/_service_event")); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -312,11 +324,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) ASSERT_TRUE(service_manager_b2->send_request()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -363,11 +375,12 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording auto service_manager_b2 = std::make_shared>(b2); - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); @@ -375,6 +388,14 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording ASSERT_TRUE(service_manager_b1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_b2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /still_nice_service and /awesome_nice_service, + // along with the event topics /still_nice_service/_service_event + // and /awesome_nice_service/_service_event are available to be recorded. However, + // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the + // recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v1 + "/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v2 + "/_service_event")); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -385,11 +406,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording ASSERT_TRUE(service_manager_b2->send_request()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages));