From 999d17bba37f912c465762dd9ac4478ded313000 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 22 Aug 2024 19:50:39 +0000 Subject: [PATCH 01/15] Remove wait_until_shutdown. This has almost exactly the same functionality as wait_for_condition, except for two things: 1. It is templated on the Timeout type. 2. It calls rclcpp::shutdown after the loop completes. However, neither of those is necessary; all callers to it use a std::chrono::duration, and all of the test fixtures already call rclcpp::shutdown. Thus, just remove it and make all callers use wait_for_condition instead. Signed-off-by: Chris Lalancette --- .../include/rosbag2_test_common/wait_for.hpp | 15 -------- .../test/rosbag2_transport/test_record.cpp | 36 +++++++++---------- .../rosbag2_transport/test_record_all.cpp | 20 +++++------ .../test_record_all_ignore_leaf_topics.cpp | 6 ++-- .../test_record_all_no_discovery.cpp | 6 ++-- .../test_record_all_use_sim_time.cpp | 6 ++-- .../rosbag2_transport/test_record_regex.cpp | 30 ++++++++-------- 7 files changed, 52 insertions(+), 67 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp index 9578e72439..bb13ef2f5b 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp @@ -37,21 +37,6 @@ bool spin_and_wait_for(Timeout timeout, const Node & node, Condition condition) return true; } -template -bool wait_until_shutdown(Timeout timeout, Condition condition) -{ - using clock = std::chrono::system_clock; - auto start = clock::now(); - while (!condition()) { - if ((clock::now() - start) > timeout) { - return false; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - rclcpp::shutdown(); - return true; -} - template bool wait_until_condition( Condition condition, diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index dd5288eb54..8b9c81cac8 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -64,11 +64,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time" << "recorded messages = " << recorded_messages.size(); @@ -146,11 +146,11 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) // 4 because we're running recorder->record() and publishers twice constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -197,11 +197,11 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -261,11 +261,11 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); auto recorded_topics = mock_writer.get_topics(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; @@ -302,11 +302,11 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) static_cast(writer.get_implementation_handle()); size_t expected_messages = num_latched_messages; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [&mock_writer, &expected_messages]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); auto recorded_topics = mock_writer.get_topics(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; @@ -439,11 +439,11 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [&mock_writer, &expected_messages]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 597deb40a8..0849493288 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -66,11 +66,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); @@ -108,7 +108,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a ASSERT_TRUE(client_manager_2->wait_for_service_to_be_ready()); // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. + // For one request, service event topic gets 2 messages. ASSERT_TRUE(client_manager_1->send_request()); ASSERT_TRUE(client_manager_2->send_request()); @@ -116,11 +116,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); @@ -160,11 +160,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_EQ(recorded_messages.size(), expected_messages); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp index 955855d138..17080477a6 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_ignore_leaf_topics.cpp @@ -69,11 +69,11 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_two_topics_ignore_l static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 2; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp index 5e30702e65..72013a1e8a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp @@ -54,11 +54,11 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_ static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 0; - rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(2), + rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() > expected_messages; - }); + }, + std::chrono::seconds(2)); // We can't EXPECT anything here, since there may be some messages from rosout auto recorded_topics = mock_writer.get_topics(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index 4b6476ac32..a4c5041bc8 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -113,11 +113,11 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 10; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); EXPECT_TRUE(ret) << "failed to capture expected messages in time. " << "recorded messages = " << recorded_messages.size(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index f6d4d81fc8..f9e23e5de0 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -87,11 +87,11 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -162,11 +162,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -237,11 +237,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) static_cast(writer.get_implementation_handle()); constexpr size_t expected_messages = 3; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect @@ -312,11 +312,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) ASSERT_TRUE(service_manager_b2->send_request()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -385,11 +385,11 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording ASSERT_TRUE(service_manager_b2->send_request()); constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_shutdown( - std::chrono::seconds(5), + auto ret = rosbag2_test_common::wait_until_condition( [ =, &mock_writer]() { return mock_writer.get_messages().size() >= expected_messages; - }); + }, + std::chrono::seconds(5)); EXPECT_TRUE(ret) << "failed to capture expected messages in time"; auto recorded_messages = mock_writer.get_messages(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); From da2f9dd7eb115c0d19f91feb9be56464aa9ee64a Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 27 Aug 2024 21:18:44 +0000 Subject: [PATCH 02/15] Shutdown the async spinner node without rclcpp::shutdown. That is, we really don't actually want to do a full rclcpp shutdown here; we only want to stop spinning. Accomplish that with an executor, and timing out every 100 milliseconds to check if we are done yet. Signed-off-by: Chris Lalancette --- .../record_integration_fixture.hpp | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index cd1089fec6..787eb9da0c 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - +#include +#include #include #include #include @@ -22,16 +22,12 @@ #include "rclcpp/rclcpp.hpp" -#include "rosbag2_transport/record_options.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_test_common/memory_management.hpp" #include "rosbag2_transport_test_fixture.hpp" -using namespace ::testing; // NOLINT -using namespace rosbag2_transport; // NOLINT -using namespace std::chrono_literals; // NOLINT - #ifndef ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ #define ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ @@ -57,12 +53,20 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture void start_async_spin(T node) { future_ = std::async( - std::launch::async, [node]() -> void {rclcpp::spin(node);}); + std::launch::async, + [node, this]() -> void { + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(node); + while (rclcpp::ok() && !done_) { + exec.spin_some(std::chrono::milliseconds(100)); + } + exec.remove_node(node); + }); } void stop_spinning() { - rclcpp::shutdown(); + done_ = true; if (future_.valid()) { future_.wait(); } @@ -85,6 +89,7 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture MemoryManagement memory_; std::future future_; + std::atomic_bool done_{false}; }; #endif // ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ From 76eee02db43113afedef5841130161d1ec1338c7 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 28 Aug 2024 17:21:09 +0000 Subject: [PATCH 03/15] Small fixes to start_async_spin in rosbag2_tests. Make sure it only spins as long as we haven't shutdown, and that it wakes up every so often to check that fact. Signed-off-by: Chris Lalancette --- .../test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index 3ab7aeb98a..97d4456b6e 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -86,9 +86,10 @@ class Rosbag2CPPGetServiceInfoTest [node, this]() -> void { rclcpp::executors::SingleThreadedExecutor exec; exec.add_node(node); - while (!exit_from_node_spinner_) { - exec.spin_some(); + while (rclcpp::ok() && !exit_from_node_spinner_) { + exec.spin_some(std::chrono::milliseconds(100)); } + exec.remove_node(node); }); } From db342cd58c5c2ede0f691a25e2c974b203461436 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 28 Aug 2024 18:26:12 +0000 Subject: [PATCH 04/15] Wait for topics to be discovered during recorder->record(). The main reason for that is that these tests generally want to test certain expectations around how many messages were received. However, if discovery takes longer than we expect, then it could be the case that we "missed" messages at the beginning because discovery hadn't yet completed. Fix this by just waiting around for the recorder to get all the subscriptions it expects before moving on with the test. Signed-off-by: Chris Lalancette --- .../test/rosbag2_transport/test_record.cpp | 65 ++++++++++++++++++- .../rosbag2_transport/test_record_all.cpp | 35 ++++++++++ ..._record_all_include_unpublished_topics.cpp | 50 ++++++++++++++ .../test_record_all_use_sim_time.cpp | 11 ++++ .../rosbag2_transport/test_record_regex.cpp | 54 ++++++++++++++- 5 files changed, 213 insertions(+), 2 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 8b9c81cac8..9bfa64fbb4 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -54,6 +54,17 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /string_topic + // /array_topic + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 2; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); @@ -126,10 +137,21 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) std::move(writer_), storage_options_, record_options); recorder->record(); + start_async_spin(recorder); + + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /string_topic + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 1; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); - start_async_spin(recorder); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); @@ -188,6 +210,16 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /qos_chatter + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 1; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); pub_manager.run_publishers(); @@ -252,6 +284,16 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /sensor_chatter + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 1; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); pub_manager.run_publishers(); @@ -295,6 +337,16 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /latched_chatter + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 1; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); auto & writer = recorder->get_writer_handle(); @@ -436,6 +488,17 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(string_topic, string_message, expected_messages); + + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /string_topic + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 1; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 0849493288..a585a9535a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -57,6 +57,18 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /string_topic + // /events/write_split + // /array_topic + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 3; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); @@ -104,6 +116,17 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /test_service_1/_service_event + // /test_service_2/_service_event + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 2; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); ASSERT_TRUE(client_manager_2->wait_for_service_to_be_ready()); @@ -146,6 +169,18 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /test_service/_service_event + // /string_topic + // /events/write_split + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 3; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp index 018e260f40..8d655950a5 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp @@ -23,6 +23,7 @@ #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" #include "rosbag2_test_common/publication_manager.hpp" +#include "rosbag2_test_common/wait_for.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -41,6 +42,18 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignore recorder->record(); start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /rosout + // /parameter_events + // /events/write_split + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 3; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); } @@ -59,6 +72,19 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_include recorder->record(); start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /string_topic + // /rosout + // /parameter_events + // /events/write_split + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 4; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); } @@ -79,6 +105,18 @@ TEST_F( recorder->record(); start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /rosout + // /parameter_events + // /events/write_split + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 3; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); @@ -90,6 +128,18 @@ TEST_F( // Publish 10 messages at a 30ms interval for a steady 300 milliseconds worth of data pub_manager.setup_publisher( string_topic, string_message, 10, rclcpp::QoS{rclcpp::KeepAll()}, 30ms); + // Wait again until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /string_topic + // /rosout + // /parameter_events + // /events/write_split + discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 4; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index a4c5041bc8..4d00573cad 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -100,6 +100,17 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /clock + // /string_topic + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 2; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index f9e23e5de0..95c079c3ea 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -78,6 +78,16 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /aa + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 1; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); pub_manager.run_publishers(); @@ -137,7 +147,6 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) record_options.regex = regex; record_options.exclude_regex = topics_regex_to_exclude; - // TODO(karsten1987) Refactor this into publication manager rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(v1, test_string_messages[0], 3); @@ -152,6 +161,17 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /still_nice_topic + // /awesome_nice_topic + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 2; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -227,6 +247,17 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /still_nice_topic + // /awesome_nice_topic + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 2; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -296,6 +327,17 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) start_async_spin(recorder); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /still_nice_servce/_service_event + // /awesome_nice_service/_service_event + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 2; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); + ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_e1->wait_for_service_to_be_ready()); @@ -366,6 +408,16 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); + // Wait until recorder discovery is complete, otherwise messages might be missed. + // The currently expected topics: + // /still_nice_servce/_service_event + // /awesome_nice_service/_service_event + auto discovery_ret = rosbag2_test_common::wait_until_condition( + [&recorder]() { + return recorder->subscriptions().size() == 2; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(discovery_ret); start_async_spin(recorder); From c73fa80e8eeef5e93f1e0b59d2cff05b6b024669 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 3 Sep 2024 14:20:41 +0000 Subject: [PATCH 05/15] Feedback from review. Signed-off-by: Chris Lalancette --- .../test/rosbag2_transport/test_record.cpp | 61 ------------------- .../rosbag2_transport/test_record_regex.cpp | 32 ---------- 2 files changed, 93 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 9bfa64fbb4..edabfccdae 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -54,17 +54,6 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /string_topic - // /array_topic - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 2; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); @@ -139,16 +128,6 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /string_topic - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 1; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -210,16 +189,6 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /qos_chatter - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 1; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); pub_manager.run_publishers(); @@ -284,16 +253,6 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /sensor_chatter - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 1; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); pub_manager.run_publishers(); @@ -337,16 +296,6 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /latched_chatter - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 1; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); auto & writer = recorder->get_writer_handle(); @@ -489,16 +438,6 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(string_topic, string_message, expected_messages); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /string_topic - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 1; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index 95c079c3ea..c3ed18953d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -78,16 +78,6 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /aa - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 1; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); pub_manager.run_publishers(); @@ -161,17 +151,6 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /still_nice_topic - // /awesome_nice_topic - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 2; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -247,17 +226,6 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /still_nice_topic - // /awesome_nice_topic - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 2; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); From 11bb8649e4aa99a9f49f5624c71908e6cbd1d52c Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 4 Sep 2024 16:08:13 +0000 Subject: [PATCH 06/15] Switch to using MockRecorder. Signed-off-by: Chris Lalancette --- .../rosbag2_transport/test_record_all.cpp | 62 +++++++------------ ..._record_all_include_unpublished_topics.cpp | 49 --------------- .../test_record_all_use_sim_time.cpp | 14 +---- .../rosbag2_transport/test_record_regex.cpp | 42 ++++++------- 4 files changed, 46 insertions(+), 121 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index a585a9535a..a87f04b969 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -30,6 +30,7 @@ #include "rosbag2_transport/recorder.hpp" +#include "mock_recorder.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -51,27 +52,20 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are rosbag2_transport::RecordOptions record_options = {true, false, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /string_topic - // /events/write_split - // /array_topic - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 3; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + // At this point, we expect that the topics /string_topic, /array_topic, and /events/write_split + // are available to be recorded. However, wait_for_matched() only checks for /string_topic + // and /array_topic, so ask the recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/events/write_split")); + pub_manager.run_publishers(); auto & writer = recorder->get_writer_handle(); @@ -110,26 +104,22 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a rosbag2_transport::RecordOptions record_options = {false, true, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /test_service_1/_service_event - // /test_service_2/_service_event - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 2; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); ASSERT_TRUE(client_manager_2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /test_service_1 and /test_service_2, along with + // the event topics /test_service_1/_service_event and /test_service_2/_service_event are + // available to be recorded. However, wait_for_service_to_be_ready() only checks the services, + // not the event topics, so ask the recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service_1/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service_2/_service_event")); + // By default, only client introspection is enabled. // For one request, service event topic gets 2 messages. ASSERT_TRUE(client_manager_1->send_request()); @@ -163,28 +153,24 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a rosbag2_transport::RecordOptions record_options = {true, true, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /test_service/_service_event - // /string_topic - // /events/write_split - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 3; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); + // At this point, we expect that the service /test_service_1, along with the topic /string_topic, + // along with the event topic /test_service_1, along with the split topic /events/write_split are + // available to be recorded. However, wait_for_matched() and wait_for_service_to_be_ready() only + // check on the service and the topic, not the event or the split topic, so ask the recorder to + // make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/events/write_split")); + pub_manager.run_publishers(); // By default, only client introspection is enabled. diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp index 8d655950a5..9092379a5d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp @@ -42,18 +42,6 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignore recorder->record(); start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /rosout - // /parameter_events - // /events/write_split - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 3; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); } @@ -72,19 +60,6 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_include recorder->record(); start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /string_topic - // /rosout - // /parameter_events - // /events/write_split - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 4; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); } @@ -105,18 +80,6 @@ TEST_F( recorder->record(); start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /rosout - // /parameter_events - // /events/write_split - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 3; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); @@ -128,18 +91,6 @@ TEST_F( // Publish 10 messages at a 30ms interval for a steady 300 milliseconds worth of data pub_manager.setup_publisher( string_topic, string_message, 10, rclcpp::QoS{rclcpp::KeepAll()}, 30ms); - // Wait again until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /string_topic - // /rosout - // /parameter_events - // /events/write_split - discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 4; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); pub_manager.run_publishers(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index 4d00573cad..ce3fac0b65 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -100,23 +100,15 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /clock - // /string_topic - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 2; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); + // We have to ensure that the /clock topic is available as well + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/clock")); + pub_manager.run_publishers(); auto & writer = recorder->get_writer_handle(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index c3ed18953d..a0c1e79023 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -33,6 +33,7 @@ #include "test_msgs/message_fixtures.hpp" #include "test_msgs/srv/basic_types.hpp" +#include "mock_recorder.hpp" #include "record_integration_fixture.hpp" using namespace std::chrono_literals; // NOLINT @@ -289,29 +290,26 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) auto service_manager_b2 = std::make_shared>(b2); - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); start_async_spin(recorder); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /still_nice_servce/_service_event - // /awesome_nice_service/_service_event - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 2; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); - ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_e1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_b1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_b2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /still_nice_service and /awesome_nice_service, + // along with the event topics /still_nice_service/_service_event + // and /awesome_nice_service/_service_event are available to be recorded. However, + // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the + // recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/still_nice_service/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/awesome_nice_service/_service_event")); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -373,19 +371,9 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording auto service_manager_b2 = std::make_shared>(b2); - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); - // Wait until recorder discovery is complete, otherwise messages might be missed. - // The currently expected topics: - // /still_nice_servce/_service_event - // /awesome_nice_service/_service_event - auto discovery_ret = rosbag2_test_common::wait_until_condition( - [&recorder]() { - return recorder->subscriptions().size() == 2; - }, - std::chrono::seconds(5)); - ASSERT_TRUE(discovery_ret); start_async_spin(recorder); @@ -395,6 +383,14 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording ASSERT_TRUE(service_manager_b1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_b2->wait_for_service_to_be_ready()); + // At this point, we expect that the services /still_nice_service and /awesome_nice_service, + // along with the event topics /still_nice_service/_service_event + // and /awesome_nice_service/_service_event are available to be recorded. However, + // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the + // recorder to make sure it has successfully subscribed to all. + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/still_nice_service/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/awesome_nice_service/_service_event")); + auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); From 79911268808cdfa90ba170a9ef3e1bc1536859e8 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 4 Sep 2024 16:25:01 +0000 Subject: [PATCH 07/15] Fixes from review. Signed-off-by: Chris Lalancette --- .../test_rosbag2_cpp_get_service_info.cpp | 26 ++++++++++++------- .../record_integration_fixture.hpp | 25 +++++++++++------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index 97d4456b6e..f438ca46f3 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -14,8 +14,10 @@ #include +#include #include #include +#include #include #include "rosbag2_cpp/info.hpp" @@ -81,16 +83,20 @@ class Rosbag2CPPGetServiceInfoTest template void start_async_spin(T node) { - node_spinner_future_ = std::async( - std::launch::async, - [node, this]() -> void { - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(node); - while (rclcpp::ok() && !exit_from_node_spinner_) { - exec.spin_some(std::chrono::milliseconds(100)); - } - exec.remove_node(node); - }); + if (!exit_from_node_spinner_.exchange(false)) { + node_spinner_future_ = std::async( + std::launch::async, + [node, this]() -> void { + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(node); + while (rclcpp::ok() && !exit_from_node_spinner_) { + exec.spin_some(std::chrono::milliseconds(100)); + } + exec.remove_node(node); + }); + } else { + throw std::runtime_error("Already spinning a node, can't start a new node spin"); + } } void stop_spinning() diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index 787eb9da0c..57c090c637 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -52,16 +53,20 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture template void start_async_spin(T node) { - future_ = std::async( - std::launch::async, - [node, this]() -> void { - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(node); - while (rclcpp::ok() && !done_) { - exec.spin_some(std::chrono::milliseconds(100)); - } - exec.remove_node(node); - }); + if (!done_.exchange(false)) { + future_ = std::async( + std::launch::async, + [node, this]() -> void { + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(node); + while (rclcpp::ok() && !done_) { + exec.spin_some(std::chrono::milliseconds(100)); + } + exec.remove_node(node); + }); + } else { + throw std::runtime_error("Already spinning a node, can't start a new node spin"); + } } void stop_spinning() From 83b0b6b77e96abc1ae7a8e0bb42b96c51e40cea9 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 4 Sep 2024 19:13:43 +0000 Subject: [PATCH 08/15] Feedback from review. Signed-off-by: Chris Lalancette --- .../test/rosbag2_transport/test_record_all.cpp | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index a87f04b969..2a133e9b51 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -52,7 +52,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are rosbag2_transport::RecordOptions record_options = {true, false, false, {}, {}, {}, {"/rosout"}, {}, {}, "rmw_format", 100ms}; - auto recorder = std::make_shared( + auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options); recorder->record(); @@ -61,11 +61,6 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - // At this point, we expect that the topics /string_topic, /array_topic, and /events/write_split - // are available to be recorded. However, wait_for_matched() only checks for /string_topic - // and /array_topic, so ask the recorder to make sure it has successfully subscribed to all. - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/events/write_split")); - pub_manager.run_publishers(); auto & writer = recorder->get_writer_handle(); @@ -169,7 +164,6 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a // check on the service and the topic, not the event or the split topic, so ask the recorder to // make sure it has successfully subscribed to all. ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/test_service/_service_event")); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/events/write_split")); pub_manager.run_publishers(); From 0724e876dfe73b60ca81168cfaf8ad2dec5a2fff Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 4 Sep 2024 15:24:02 -0400 Subject: [PATCH 09/15] Apply suggestions from code review Co-authored-by: Michael Orlov Signed-off-by: Chris Lalancette --- .../test/rosbag2_transport/test_record_regex.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index a0c1e79023..b8ec31cfd7 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -307,8 +307,8 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) // and /awesome_nice_service/_service_event are available to be recorded. However, // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the // recorder to make sure it has successfully subscribed to all. - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/still_nice_service/_service_event")); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/awesome_nice_service/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v1 + "/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v2 + "/_service_event")); auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -388,8 +388,8 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording // and /awesome_nice_service/_service_event are available to be recorded. However, // wait_for_service_to_be_ready() only checks the services, not the event topics, so ask the // recorder to make sure it has successfully subscribed to all. - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/still_nice_service/_service_event")); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/awesome_nice_service/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v1 + "/_service_event")); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(v2 + "/_service_event")); auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); From eb8db9e18ee43f05a66ede68784ae242f51d6bd3 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 5 Sep 2024 12:45:00 +0000 Subject: [PATCH 10/15] Switch to using spin, rather than spin_some. That's because there is currently at least one bug associated with spin_some in rclcpp. However, it turns out that we don't even need to use it, as we can just as easily use spin() along with exec.cancel(). Signed-off-by: Chris Lalancette --- .../test_rosbag2_cpp_get_service_info.cpp | 28 +++++++++---------- .../record_integration_fixture.hpp | 28 +++++++++---------- .../test_record_all_use_sim_time.cpp | 6 ++-- 3 files changed, 28 insertions(+), 34 deletions(-) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index f438ca46f3..ebcff109bb 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -83,16 +83,12 @@ class Rosbag2CPPGetServiceInfoTest template void start_async_spin(T node) { - if (!exit_from_node_spinner_.exchange(false)) { - node_spinner_future_ = std::async( - std::launch::async, - [node, this]() -> void { - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(node); - while (rclcpp::ok() && !exit_from_node_spinner_) { - exec.spin_some(std::chrono::milliseconds(100)); - } - exec.remove_node(node); + if (exec_ == nullptr) { + exec_ = std::make_unique(); + exec_->add_node(node); + spin_thread_ = std::thread( + [this]() { + exec_->spin(); }); } else { throw std::runtime_error("Already spinning a node, can't start a new node spin"); @@ -101,9 +97,11 @@ class Rosbag2CPPGetServiceInfoTest void stop_spinning() { - exit_from_node_spinner_ = true; - if (node_spinner_future_.valid()) { - node_spinner_future_.wait(); + if (exec_ != nullptr) { + exec_->cancel(); + if (spin_thread_.joinable()) { + spin_thread_.join(); + } } } @@ -153,8 +151,8 @@ class Rosbag2CPPGetServiceInfoTest // relative path to the root of the bag file. fs::path root_bag_path_; - std::future node_spinner_future_; - std::atomic_bool exit_from_node_spinner_{false}; + std::unique_ptr exec_{nullptr}; + std::thread spin_thread_; }; TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_only) { diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index 57c090c637..0bd3f87204 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -53,16 +53,12 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture template void start_async_spin(T node) { - if (!done_.exchange(false)) { - future_ = std::async( - std::launch::async, - [node, this]() -> void { - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(node); - while (rclcpp::ok() && !done_) { - exec.spin_some(std::chrono::milliseconds(100)); - } - exec.remove_node(node); + if (exec_ == nullptr) { + exec_ = std::make_unique(); + exec_->add_node(node); + spin_thread_ = std::thread( + [this]() { + exec_->spin(); }); } else { throw std::runtime_error("Already spinning a node, can't start a new node spin"); @@ -71,9 +67,11 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture void stop_spinning() { - done_ = true; - if (future_.valid()) { - future_.wait(); + if (exec_ != nullptr) { + exec_->cancel(); + if (spin_thread_.joinable()) { + spin_thread_.join(); + } } } @@ -92,9 +90,9 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture return filtered_messages; } + std::unique_ptr exec_{nullptr}; MemoryManagement memory_; - std::future future_; - std::atomic_bool done_{false}; + std::thread spin_thread_; }; #endif // ROSBAG2_TRANSPORT__RECORD_INTEGRATION_FIXTURE_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index ce3fac0b65..ccc348a2a8 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -106,9 +106,6 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); - // We have to ensure that the /clock topic is available as well - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered("/clock")); - pub_manager.run_publishers(); auto & writer = recorder->get_writer_handle(); @@ -122,11 +119,12 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) }, std::chrono::seconds(5)); auto recorded_messages = mock_writer.get_messages(); - EXPECT_TRUE(ret) << "failed to capture expected messages in time. " << + ASSERT_TRUE(ret) << "failed to capture expected messages in time. " << "recorded messages = " << recorded_messages.size(); stop_spinning(); auto messages_per_topic = mock_writer.messages_per_topic(); + ASSERT_EQ(messages_per_topic.count(string_topic), 1u); EXPECT_EQ(messages_per_topic[string_topic], 5u); EXPECT_THAT(recorded_messages, SizeIs(Ge(expected_messages))); From a25186b14975ae75b45c64db412dabb313195235 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 5 Sep 2024 15:35:55 +0000 Subject: [PATCH 11/15] Make sure to stop_spinning when we tear down the test. Signed-off-by: Chris Lalancette --- .../rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp | 9 +-------- .../rosbag2_transport/record_integration_fixture.hpp | 1 + .../test/rosbag2_transport/test_keyboard_controls.cpp | 2 -- rosbag2_transport/test/rosbag2_transport/test_record.cpp | 1 - .../rosbag2_transport/test_record_all_use_sim_time.cpp | 1 - 5 files changed, 2 insertions(+), 12 deletions(-) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index ebcff109bb..6723a82a16 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -67,6 +67,7 @@ class Rosbag2CPPGetServiceInfoTest void TearDown() override { + stop_spinning(); fs::remove_all(root_bag_path_); } @@ -200,8 +201,6 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only recorder->record(); start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit( - [&]() {stop_spinning();}); ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); @@ -227,8 +226,6 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; recorder->stop(); - stop_spinning(); - cleanup_process_handle.cancel(); rosbag2_cpp::Info info; std::vector> ret_service_infos; @@ -278,8 +275,6 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se recorder->record(); start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit( - [&]() {stop_spinning();}); ASSERT_TRUE( wait_for_subscriptions( @@ -314,8 +309,6 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; recorder->stop(); - stop_spinning(); - cleanup_process_handle.cancel(); rosbag2_cpp::Info info; std::vector> ret_service_infos; diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index 0bd3f87204..c87efe67d1 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -47,6 +47,7 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture void TearDown() override { + stop_spinning(); rclcpp::shutdown(); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index 3097be472e..fd13d9bc9f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -210,6 +210,4 @@ TEST_F(RecordIntegrationTestFixture, test_keyboard_controls) EXPECT_THAT(recorder->is_paused(), false); keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey); EXPECT_THAT(recorder->is_paused(), true); - - this->stop_spinning(); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index edabfccdae..0dd913f41a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -73,7 +73,6 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are EXPECT_TRUE(ret) << "failed to capture expected messages in time" << "recorded messages = " << recorded_messages.size(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); - stop_spinning(); auto recorded_topics = mock_writer.get_topics(); ASSERT_THAT(recorded_topics, SizeIs(2)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index ccc348a2a8..fe6896adec 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -121,7 +121,6 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) auto recorded_messages = mock_writer.get_messages(); ASSERT_TRUE(ret) << "failed to capture expected messages in time. " << "recorded messages = " << recorded_messages.size(); - stop_spinning(); auto messages_per_topic = mock_writer.messages_per_topic(); ASSERT_EQ(messages_per_topic.count(string_topic), 1u); From 404c78a5a4a97e7d25ac06d8d4fa50148eb32727 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 6 Sep 2024 17:12:01 +0000 Subject: [PATCH 12/15] Use scopes to shutdown spinning. Signed-off-by: Chris Lalancette --- .../rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp | 3 +++ .../test/rosbag2_transport/record_integration_fixture.hpp | 1 + .../test/rosbag2_transport/test_keyboard_controls.cpp | 3 ++- rosbag2_transport/test/rosbag2_transport/test_record.cpp | 6 ++++++ .../test/rosbag2_transport/test_record_all.cpp | 3 +++ .../test_record_all_include_unpublished_topics.cpp | 3 +++ .../test/rosbag2_transport/test_record_all_no_discovery.cpp | 1 + .../test/rosbag2_transport/test_record_all_use_sim_time.cpp | 1 + .../test/rosbag2_transport/test_record_regex.cpp | 5 +++++ 9 files changed, 25 insertions(+), 1 deletion(-) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index 6723a82a16..fb94d0aaa1 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -103,6 +103,7 @@ class Rosbag2CPPGetServiceInfoTest if (spin_thread_.joinable()) { spin_thread_.join(); } + exec_ = nullptr; } } @@ -201,6 +202,7 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); @@ -275,6 +277,7 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE( wait_for_subscriptions( diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index c87efe67d1..14c37d3eef 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -73,6 +73,7 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture if (spin_thread_.joinable()) { spin_thread_.join(); } + exec_ = nullptr; } } diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index fd13d9bc9f..60be757b6a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -203,7 +203,8 @@ TEST_F(RecordIntegrationTestFixture, test_keyboard_controls) recorder->record(); - this->start_async_spin(recorder); + start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); EXPECT_THAT(recorder->is_paused(), true); keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 0dd913f41a..89543c2c16 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -53,6 +53,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); @@ -126,6 +127,7 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); auto & writer = recorder->get_writer_handle(); auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); @@ -187,6 +189,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -251,6 +254,7 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -294,6 +298,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str())); @@ -428,6 +433,7 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); auto & writer = recorder->get_writer_handle(); mock_writer = dynamic_cast(writer.get_implementation_handle()); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 2a133e9b51..2b2b85a40f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -57,6 +57,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); @@ -104,6 +105,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(client_manager_1->wait_for_service_to_be_ready()); ASSERT_TRUE(client_manager_2->wait_for_service_to_be_ready()); @@ -153,6 +155,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp index 9092379a5d..d0a07d33ef 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp @@ -41,6 +41,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignore auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); @@ -59,6 +60,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_include auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); @@ -79,6 +81,7 @@ TEST_F( auto recorder = std::make_shared(writer_, storage_options_, record_options); recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp index 72013a1e8a..ce9181514d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp @@ -44,6 +44,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_ recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); rosbag2_test_common::PublicationManager pub_manager; pub_manager.setup_publisher(topic, string_message, 5); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index fe6896adec..608fd96739 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -99,6 +99,7 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index b8ec31cfd7..6cf26a4bed 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -78,6 +78,7 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); @@ -151,6 +152,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_topic_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -226,6 +228,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_topic_topic_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str())); ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str())); @@ -295,6 +298,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_regex_service_recording) recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); @@ -376,6 +380,7 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_service_recording recorder->record(); start_async_spin(recorder); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); ASSERT_TRUE(service_manager_v1->wait_for_service_to_be_ready()); ASSERT_TRUE(service_manager_v2->wait_for_service_to_be_ready()); From e63f8c7b27f139e011f4452f9a9fbd795be0e0a2 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 10 Sep 2024 13:47:50 +0000 Subject: [PATCH 13/15] Nested contexts just to explicitly cleanup the async spinners. Signed-off-by: Chris Lalancette --- .../test_rosbag2_cpp_get_service_info.cpp | 120 +++++++++--------- .../test/rosbag2_transport/test_record.cpp | 49 ++++--- .../test_record_all_use_sim_time.cpp | 43 ++++--- 3 files changed, 116 insertions(+), 96 deletions(-) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index fb94d0aaa1..6f780b9625 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -201,33 +201,36 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only std::move(writer), storage_options, record_options); recorder->record(); - start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); - - ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); - ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); - constexpr size_t num_service_requests = 3; - for (size_t i = 0; i < num_service_requests; i++) { - ASSERT_TRUE(service_client_manager->send_request()); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - auto & writer_ref = recorder->get_writer_handle(); - auto & recorder_writer = - dynamic_cast(writer_ref.get_implementation_handle()); + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); - // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. - size_t expected_messages = num_service_requests * 2; - auto ret = rosbag2_test_common::wait_until_condition( - [&recorder_writer, &expected_messages]() { - return recorder_writer.get_number_of_written_messages() >= expected_messages; - }, - std::chrono::seconds(5)); - EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + ASSERT_TRUE(service_client_manager->wait_for_service_to_be_ready()); + ASSERT_TRUE(wait_for_subscriptions(*recorder, {"/test_service/_service_event"})); - recorder->stop(); + for (size_t i = 0; i < num_service_requests; i++) { + ASSERT_TRUE(service_client_manager->send_request()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto & writer_ref = recorder->get_writer_handle(); + auto & recorder_writer = + dynamic_cast(writer_ref.get_implementation_handle()); + + // By default, only client introspection is enabled. + // For one request, service event topic get 2 messages. + size_t expected_messages = num_service_requests * 2; + auto ret = rosbag2_test_common::wait_until_condition( + [&recorder_writer, &expected_messages]() { + return recorder_writer.get_number_of_written_messages() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + + recorder->stop(); + } rosbag2_cpp::Info info; std::vector> ret_service_infos; @@ -276,42 +279,45 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se std::move(writer), storage_options, record_options); recorder->record(); - start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); - - ASSERT_TRUE( - wait_for_subscriptions( - *recorder, - {"/test_service1/_service_event", - "/test_service2/_service_event", - "/test_topic1", - "/test_topic2"} - ) - ); - constexpr size_t num_service_requests = 2; - for (size_t i = 0; i < num_service_requests; i++) { - ASSERT_TRUE(service_client_manager1->send_request()); - ASSERT_TRUE(service_client_manager2->send_request()); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE( + wait_for_subscriptions( + *recorder, + {"/test_service1/_service_event", + "/test_service2/_service_event", + "/test_topic1", + "/test_topic2"} + ) + ); + + for (size_t i = 0; i < num_service_requests; i++) { + ASSERT_TRUE(service_client_manager1->send_request()); + ASSERT_TRUE(service_client_manager2->send_request()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + pub_manager.run_publishers(); + + auto & writer_ref = recorder->get_writer_handle(); + auto & recorder_writer = + dynamic_cast(writer_ref.get_implementation_handle()); + + // By default, only client introspection is enabled. + // For one request, service event topic get 2 messages. + size_t expected_messages = num_service_requests * 2 + 2; + auto ret = rosbag2_test_common::wait_until_condition( + [&recorder_writer, &expected_messages]() { + return recorder_writer.get_number_of_written_messages() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; + + recorder->stop(); } - pub_manager.run_publishers(); - - auto & writer_ref = recorder->get_writer_handle(); - auto & recorder_writer = - dynamic_cast(writer_ref.get_implementation_handle()); - - // By default, only client introspection is enabled. - // For one request, service event topic get 2 messages. - size_t expected_messages = num_service_requests * 2 + 2; - auto ret = rosbag2_test_common::wait_until_condition( - [&recorder_writer, &expected_messages]() { - return recorder_writer.get_number_of_written_messages() >= expected_messages; - }, - std::chrono::seconds(5)); - EXPECT_TRUE(ret) << "Failed to capture " << expected_messages << " expected messages in time"; - - recorder->stop(); rosbag2_cpp::Info info; std::vector> ret_service_infos; diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 89543c2c16..2e7fc81069 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -52,30 +52,39 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are std::move(writer_), storage_options_, record_options); recorder->record(); - start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); - - ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - - pub_manager.run_publishers(); + constexpr size_t expected_messages = 4; + std::vector> recorded_messages; + std::unordered_map< + std::string, + std::pair + > recorded_topics; - auto & writer = recorder->get_writer_handle(); - MockSequentialWriter & mock_writer = - static_cast(writer.get_implementation_handle()); + start_async_spin(recorder); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + + ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str())); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + + pub_manager.run_publishers(); + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + auto ret = rosbag2_test_common::wait_until_condition( + [ =, &mock_writer]() { + return mock_writer.get_messages().size() >= expected_messages; + }, + std::chrono::seconds(5)); + EXPECT_TRUE(ret) << "failed to capture expected messages in time" << + "recorded messages = " << recorded_messages.size(); + recorded_messages = mock_writer.get_messages(); + recorded_topics = mock_writer.get_topics(); + } - constexpr size_t expected_messages = 4; - auto ret = rosbag2_test_common::wait_until_condition( - [ =, &mock_writer]() { - return mock_writer.get_messages().size() >= expected_messages; - }, - std::chrono::seconds(5)); - auto recorded_messages = mock_writer.get_messages(); - EXPECT_TRUE(ret) << "failed to capture expected messages in time" << - "recorded messages = " << recorded_messages.size(); EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); - auto recorded_topics = mock_writer.get_topics(); ASSERT_THAT(recorded_topics, SizeIs(2)); EXPECT_THAT(recorded_topics.at(string_topic).first.serialization_format, Eq("rmw_format")); EXPECT_THAT(recorded_topics.at(array_topic).first.serialization_format, Eq("rmw_format")); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index 608fd96739..2e612310b1 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -98,32 +98,37 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) std::move(writer_), storage_options_, record_options); recorder->record(); + constexpr size_t expected_messages = 10; + std::vector> recorded_messages; + std::unordered_map messages_per_topic; + start_async_spin(recorder); - auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); + { + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {stop_spinning();}); - ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); - ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); + ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); - pub_manager.run_publishers(); + pub_manager.run_publishers(); - auto & writer = recorder->get_writer_handle(); - MockSequentialWriter & mock_writer = - static_cast(writer.get_implementation_handle()); + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + auto ret = rosbag2_test_common::wait_until_condition( + [ =, &mock_writer]() { + return mock_writer.get_messages().size() >= expected_messages; + }, + std::chrono::seconds(5)); + ASSERT_TRUE(ret) << "failed to capture expected messages in time. " << + "recorded messages = " << recorded_messages.size(); + recorded_messages = mock_writer.get_messages(); + messages_per_topic = mock_writer.messages_per_topic(); + } - constexpr size_t expected_messages = 10; - auto ret = rosbag2_test_common::wait_until_condition( - [ =, &mock_writer]() { - return mock_writer.get_messages().size() >= expected_messages; - }, - std::chrono::seconds(5)); - auto recorded_messages = mock_writer.get_messages(); - ASSERT_TRUE(ret) << "failed to capture expected messages in time. " << - "recorded messages = " << recorded_messages.size(); - - auto messages_per_topic = mock_writer.messages_per_topic(); ASSERT_EQ(messages_per_topic.count(string_topic), 1u); EXPECT_EQ(messages_per_topic[string_topic], 5u); From 63391ab23050206e4de18bc0b6e4ddea9212979e Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 12 Sep 2024 12:36:42 -0400 Subject: [PATCH 14/15] Update rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp Co-authored-by: Michael Orlov Signed-off-by: Chris Lalancette --- .../rosbag2_transport/record_integration_fixture.hpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index 14c37d3eef..c85f69cb9c 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -61,6 +61,16 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture [this]() { exec_->spin(); }); + // Wait for the executor to start spinning in the newly spawned thread to avoid race condition + // with exec_->cancel() + using clock = std::chrono::steady_clock; + auto start = clock::now(); + while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!exec_->is_spinning()) { + throw std::runtime_error("Failed to start spinning node"); + } } else { throw std::runtime_error("Already spinning a node, can't start a new node spin"); } From 275c10dfe78ebefa6c5bada0c9e451552588a555 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 12 Sep 2024 17:00:00 +0000 Subject: [PATCH 15/15] Apply the same fix to rosbag2_tests. Signed-off-by: Chris Lalancette --- .../test_rosbag2_cpp_get_service_info.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp index 6f780b9625..401deb311e 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_get_service_info.cpp @@ -91,6 +91,16 @@ class Rosbag2CPPGetServiceInfoTest [this]() { exec_->spin(); }); + // Wait for the executor to start spinning in the newly spawned thread to avoid race condition + // with exec_->cancel() + using clock = std::chrono::steady_clock; + auto start = clock::now(); + while (!exec_->is_spinning() && (clock::now() - start) < std::chrono::seconds(5)) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!exec_->is_spinning()) { + throw std::runtime_error("Failed to start spinning node"); + } } else { throw std::runtime_error("Already spinning a node, can't start a new node spin"); }