From 1877b53847bda4d1f2668187b79fa27a796c3438 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 26 Jun 2024 15:18:03 -0700 Subject: [PATCH] Bugfix for bag_split event callbacks called to early with file compression (#1643) * Bugfix for bag_split event callbacks not called with file compression Signed-off-by: Michael Orlov * Delete redundant "should_split_bagfile" in compression_writer - It is a non-virtual method and doesn't call from the base class. Signed-off-by: Michael Orlov * Adjust "split_event_calls_callback" for testing multiple splits Signed-off-by: Michael Orlov * Use temp folder for "SequentialWriterTest" fixture instead of "uri" Signed-off-by: Michael Orlov * Add tests for split event callbacks when using file and msg compression - Added "split_event_calls_callback_with_msg_compression" and "split_event_calls_callback_with_file_compression" uit tests Signed-off-by: Michael Orlov * Add debug info to the flaky "can_record_again_after_stop" test Signed-off-by: Michael Orlov * Use `uint64_t` type for `fake_storage_size_` in tests Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov --- .../sequential_compression_writer.hpp | 4 - .../sequential_compression_writer.cpp | 58 ++++--- .../test_sequential_compression_writer.cpp | 161 ++++++++++++++++-- .../rosbag2_cpp/writers/sequential_writer.hpp | 5 + .../rosbag2_cpp/writers/sequential_writer.cpp | 29 +++- .../rosbag2_cpp/test_sequential_writer.cpp | 64 ++++--- .../test/rosbag2_transport/test_record.cpp | 19 ++- 7 files changed, 269 insertions(+), 71 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index d1265848e..7a42426e0 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -206,10 +206,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter // Closes the current backed storage and opens the next bagfile. void split_bagfile() override; - // Checks if the current recording bagfile needs to be split and rolled over to a new file. - bool should_split_bagfile( - const std::chrono::time_point & current_time); - // Prepares the metadata by setting initial values. void init_metadata() override; }; diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index a5da3e789..8f5a89c61 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -116,7 +116,7 @@ void SequentialCompressionWriter::compression_thread_fn() while (true) { std::shared_ptr message; - std::string file; + std::string closed_file_relative_to_bag; { std::unique_lock lock(compressor_queue_mutex_); // *INDENT-OFF* @@ -134,7 +134,7 @@ void SequentialCompressionWriter::compression_thread_fn() compressor_message_queue_.pop(); compressor_condition_.notify_all(); } else if (!compressor_file_queue_.empty()) { - file = compressor_file_queue_.front(); + closed_file_relative_to_bag = compressor_file_queue_.front(); compressor_file_queue_.pop(); } else if (!compression_is_running_) { // I woke up, all work queues are empty, and the main thread has stopped execution. Exit. @@ -151,8 +151,35 @@ void SequentialCompressionWriter::compression_thread_fn() std::lock_guard storage_lock(storage_mutex_); SequentialWriter::write(compressed_message); } - } else if (!file.empty()) { - compress_file(*compressor, file); + } else if (!closed_file_relative_to_bag.empty()) { + compress_file(*compressor, closed_file_relative_to_bag); + + // Execute callbacks from the base class + static const std::string compressor_ext = "." + compressor->get_compression_identifier(); + auto closed_file = + (fs::path(base_folder_) / (closed_file_relative_to_bag + compressor_ext)).generic_string(); + std::string new_file; + // To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(), + // because other compressor threads may have already pushed a new item above. + { + std::lock_guard lock(storage_mutex_); + auto iter = std::find( + metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(), + closed_file_relative_to_bag + compressor_ext); + if (iter != metadata_.relative_file_paths.end()) { + ++iter; + if (iter != metadata_.relative_file_paths.end()) { + new_file = (fs::path(base_folder_) / *iter).generic_string(); + } + } + } + if (!new_file.empty()) { + // The new_file is empty when we compressed the last file after calling close(). + // Note: We shall not call 'execute_bag_split_callbacks(closed_file, new_file)' for the + // last compressed file because it will be called inside base class + // SequentialWriter::close(). + SequentialWriter::execute_bag_split_callbacks(closed_file, new_file); + } } } } @@ -349,14 +376,17 @@ void SequentialCompressionWriter::split_bagfile() std::lock_guard compressor_lock(compressor_queue_mutex_); // Grab last file before calling common splitting logic, which pushes the new filename - const auto last_file = metadata_.relative_file_paths.back(); - SequentialWriter::split_bagfile(); + const auto last_file_relative_to_bag = metadata_.relative_file_paths.back(); + const auto new_file = SequentialWriter::split_bagfile_local(false); // If we're in FILE compression mode, push this file's name on to the queue so another // thread will handle compressing it. If not, we can just carry on. if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) { - compressor_file_queue_.push(last_file); + compressor_file_queue_.push(last_file_relative_to_bag); compressor_condition_.notify_one(); + } else { + auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).generic_string(); + SequentialWriter::execute_bag_split_callbacks(last_file, new_file); } if (!storage_) { @@ -387,6 +417,7 @@ void SequentialCompressionWriter::write( // If the compression mode is MESSAGE, push the message into a queue that will be handled // by the compression threads. if (compression_options_.compression_mode == CompressionMode::FILE) { + std::lock_guard lock(storage_mutex_); SequentialWriter::write(message); } else { std::unique_lock lock(compressor_queue_mutex_); @@ -416,17 +447,4 @@ void SequentialCompressionWriter::write( } } -bool SequentialCompressionWriter::should_split_bagfile( - const std::chrono::time_point & current_time) -{ - if (storage_options_.max_bagfile_size == - rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT) - { - return false; - } else { - std::lock_guard lock(storage_mutex_); - return SequentialWriter::should_split_bagfile(current_time); - } -} - } // namespace rosbag2_compression diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index ba383e608..25f913756 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -63,7 +63,7 @@ class SequentialCompressionWriterTest : public TestWithParam tmp_dir_storage_options_{}, serialization_format_{"rmw_format"} { - tmp_dir_storage_options_.uri = tmp_dir_.string(); + tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string(); fs::remove_all(tmp_dir_); ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_)); EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0)); @@ -92,7 +92,7 @@ class SequentialCompressionWriterTest : public TestWithParam DoAll( Invoke( [this](const rosbag2_storage::StorageOptions & storage_options) { - fake_storage_size_ = 0; + fake_storage_size_.store(0); fake_storage_uri_ = storage_options.uri; // Touch the file std::ofstream output(storage_options.uri); @@ -106,11 +106,11 @@ class SequentialCompressionWriterTest : public TestWithParam *storage_, write(An>())).WillByDefault( [this](std::shared_ptr) { - fake_storage_size_ += 1; + fake_storage_size_.fetch_add(1); }); ON_CALL(*storage_, get_bagfile_size).WillByDefault( [this]() { - return fake_storage_size_; + return fake_storage_size_.load(); }); ON_CALL(*storage_, get_relative_file_path).WillByDefault( [this]() { @@ -135,7 +135,6 @@ class SequentialCompressionWriterTest : public TestWithParam writer_ = std::make_unique(std::move(sequential_writer)); } - const std::string bag_name_ = "SequentialCompressionWriterTest"; std::unique_ptr> storage_factory_; std::shared_ptr> storage_; std::shared_ptr> converter_factory_; @@ -148,8 +147,10 @@ class SequentialCompressionWriterTest : public TestWithParam std::unique_ptr writer_; std::string serialization_format_; - uint64_t fake_storage_size_{}; + std::atomic fake_storage_size_{0}; // Need to be atomic for cache update since it + // uses in callback from cache_consumer thread std::string fake_storage_uri_; + const std::string bag_base_dir_ = "test_bag"; const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; @@ -299,11 +300,10 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), 3u); - const auto base_path = tmp_dir_storage_options_.uri; int counter = 0; for (const auto & path : intercepted_write_metadata_.relative_file_paths) { std::stringstream ss; - ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor; + ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor; counter++; EXPECT_EQ(ss.str(), path); } @@ -426,7 +426,7 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz } writer_.reset(); // reset will call writer destructor - EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); + EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite); } TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) @@ -480,7 +480,148 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) writer_.reset(); // reset will call writer destructor EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority); - EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); + EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite); +} + +TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression) +{ + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + 0, + 1, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + auto message = std::make_shared(); + message->topic_name = "test_topic"; + + tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size; + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (auto i = 0; i < message_count; ++i) { + writer_->write(message); + } + writer_->close(); + + EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1); + + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) { + std::cout << "metadata file path [" << i << "] = '" << + intercepted_write_metadata_.relative_file_paths[i] << "'\n"; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i)); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i; + EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i; + } +} + +TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression) +{ + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::FILE, + 0, + 1, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + auto message = std::make_shared(); + message->topic_name = "test_topic"; + + tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size; + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (auto i = 0; i < message_count; ++i) { + writer_->write(message); + } + writer_->close(); + + EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1); + + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) { + std::cout << "metadata file path [" << i << "] = '" << + intercepted_write_metadata_.relative_file_paths[i] << "'\n"; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) + + "." + DefaultTestCompressor); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i; + EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i; + } } INSTANTIATE_TEST_SUITE_P( diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index d9c541d10..fd9aae481 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -153,6 +153,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter std::shared_ptr message_cache_; std::unique_ptr cache_consumer_; + std::string split_bagfile_local(bool execute_callbacks = true); + + void execute_bag_split_callbacks( + const std::string & closed_file, const std::string & opened_file); + void switch_to_next_storage(); std::string format_storage_uri( diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index a058a2073..bd56eb8ad 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -194,12 +194,11 @@ void SequentialWriter::close() // bag file was closed before callback call. } if (!metadata_.relative_file_paths.empty()) { - auto info = std::make_shared(); // Take the latest file name from metadata in case if it was updated after compression in // derived class - info->closed_file = + auto closed_file = (fs::path(base_folder_) / metadata_.relative_file_paths.back()).generic_string(); - callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); + execute_bag_split_callbacks(closed_file, ""); } topics_names_to_info_.clear(); @@ -346,12 +345,11 @@ void SequentialWriter::switch_to_next_storage() } } -void SequentialWriter::split_bagfile() +std::string SequentialWriter::split_bagfile_local(bool execute_callbacks) { - auto info = std::make_shared(); - info->closed_file = storage_->get_relative_file_path(); + auto closed_file = storage_->get_relative_file_path(); switch_to_next_storage(); - info->opened_file = storage_->get_relative_file_path(); + auto opened_file = storage_->get_relative_file_path(); metadata_.relative_file_paths.push_back(strip_parent_path(storage_->get_relative_file_path())); @@ -361,9 +359,26 @@ void SequentialWriter::split_bagfile() file_info.path = strip_parent_path(storage_->get_relative_file_path()); metadata_.files.push_back(file_info); + if (execute_callbacks) { + execute_bag_split_callbacks(closed_file, opened_file); + } + return opened_file; +} + +void SequentialWriter::execute_bag_split_callbacks( + const std::string & closed_file, const std::string & opened_file) +{ + auto info = std::make_shared(); + info->closed_file = closed_file; + info->opened_file = opened_file; callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } +void SequentialWriter::split_bagfile() +{ + (void)split_bagfile_local(); +} + void SequentialWriter::write(std::shared_ptr message) { if (!is_open_) { diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 8433d4738..cb0229f52 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -51,11 +51,11 @@ class SequentialWriterTest : public Test storage_ = std::make_shared>(); converter_factory_ = std::make_shared>(); metadata_io_ = std::make_unique>(); + tmp_dir_ = fs::temp_directory_path() / "SequentialWriterTest"; storage_options_ = rosbag2_storage::StorageOptions{}; - storage_options_.uri = "uri"; + storage_options_.uri = (tmp_dir_ / bag_base_dir_).string(); - fs::path dir(storage_options_.uri); - fs::remove_all(dir); + fs::remove_all(tmp_dir_); ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault( DoAll( @@ -77,8 +77,7 @@ class SequentialWriterTest : public Test ~SequentialWriterTest() override { - fs::path dir(storage_options_.uri); - fs::remove_all(dir); + fs::remove_all(tmp_dir_); } std::unique_ptr> storage_factory_; @@ -86,6 +85,7 @@ class SequentialWriterTest : public Test std::shared_ptr> converter_factory_; std::unique_ptr metadata_io_; + fs::path tmp_dir_; rosbag2_storage::StorageOptions storage_options_; std::atomic fake_storage_size_{0}; // Need to be atomic for cache update since it // uses in callback from cache_consumer thread @@ -93,6 +93,7 @@ class SequentialWriterTest : public Test std::vector v_intercepted_update_metadata_; std::unique_ptr writer_; std::string fake_storage_uri_; + const std::string bag_base_dir_ = "test_bag"; }; std::shared_ptr make_test_msg() @@ -347,11 +348,10 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf static_cast(expected_splits)) << "Storage should have split bagfile " << (expected_splits - 1); - const auto base_path = storage_options_.uri; int counter = 0; for (const auto & path : fake_metadata_.relative_file_paths) { std::stringstream ss; - ss << base_path << "_" << counter; + ss << bag_base_dir_ << "_" << counter; const auto expected_path = ss.str(); counter++; @@ -448,11 +448,10 @@ TEST_F( static_cast(expected_splits)) << "Storage should have split bagfile " << (expected_splits - 1); - const auto base_path = storage_options_.uri; int counter = 0; for (const auto & path : fake_metadata_.relative_file_paths) { std::stringstream ss; - ss << base_path << "_" << counter; + ss << bag_base_dir_ << "_" << counter; const auto expected_path = ss.str(); counter++; @@ -583,8 +582,9 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) TEST_F(SequentialWriterTest, split_event_calls_callback) { - const int message_count = 7; - const int max_bagfile_size = 5; + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 ON_CALL( *storage_, @@ -617,14 +617,13 @@ TEST_F(SequentialWriterTest, split_event_calls_callback) storage_options_.max_bagfile_size = max_bagfile_size; - bool callback_called = false; - std::string closed_file, opened_file; + std::vector closed_files; + std::vector opened_files; rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; callbacks.write_split_callback = - [&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) { - closed_file = info.closed_file; - opened_file = info.opened_file; - callback_called = true; + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); }; writer_->add_event_callbacks(callbacks); @@ -634,12 +633,30 @@ TEST_F(SequentialWriterTest, split_event_calls_callback) for (auto i = 0; i < message_count; ++i) { writer_->write(message); } + writer_->close(); - ASSERT_TRUE(callback_called); - auto expected_closed = fs::path(storage_options_.uri) / - (storage_options_.uri + "_0"); - EXPECT_EQ(closed_file, expected_closed.generic_string()); - EXPECT_EQ(opened_file, fake_storage_uri_); + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i)); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()); + EXPECT_EQ(opened_files[i], expected_opened.generic_string()); + } } TEST_F(SequentialWriterTest, split_event_calls_on_writer_close) @@ -697,8 +714,7 @@ TEST_F(SequentialWriterTest, split_event_calls_on_writer_close) writer_->close(); ASSERT_TRUE(callback_called); - auto expected_closed = fs::path(storage_options_.uri) / - (storage_options_.uri + "_0"); + auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0"); EXPECT_EQ(closed_file, expected_closed.generic_string()); EXPECT_TRUE(opened_file.empty()); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 3f03d069b..1cb9a0883 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -155,13 +155,20 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); auto recorded_topics = mock_writer.get_topics(); - ASSERT_THAT(recorded_topics, SizeIs(1)) << "size=" << recorded_topics.size(); + EXPECT_THAT(recorded_topics, SizeIs(1)) << "size=" << recorded_topics.size(); + if (recorded_topics.size() != 1) { + for (const auto & topic : recorded_topics) { + std::cerr << "recorded topic name : " << topic.first << std::endl; + } + } EXPECT_THAT(recorded_topics.at(string_topic).first.serialization_format, Eq("rmw_format")); - ASSERT_THAT(recorded_messages, SizeIs(expected_messages)); - auto string_messages = filter_messages( - recorded_messages, string_topic); - ASSERT_THAT(string_messages, SizeIs(4)); - EXPECT_THAT(string_messages[0]->string_value, Eq(string_message->string_value)); + + auto string_messages = + filter_messages(recorded_messages, string_topic); + EXPECT_THAT(string_messages, SizeIs(4)); + for (const auto & str_msg : string_messages) { + EXPECT_THAT(str_msg->string_value, Eq(string_message->string_value)); + } } TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)