diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index b2738ff896..43c3817f80 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -193,10 +193,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter // Closes the current backed storage and opens the next bagfile. void split_bagfile() override; - // Checks if the current recording bagfile needs to be split and rolled over to a new file. - bool should_split_bagfile( - const std::chrono::time_point & current_time); - // Prepares the metadata by setting initial values. void init_metadata() override; }; diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 3ecad0cb58..71cb4f2e3f 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -75,8 +75,13 @@ void SequentialCompressionWriter::compression_thread_fn() rcpputils::check_true(compressor != nullptr, "Could not create compressor."); while (true) { +<<<<<<< HEAD std::shared_ptr message; std::string file; +======= + std::shared_ptr message; + std::string closed_file_relative_to_bag; +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) { std::unique_lock lock(compressor_queue_mutex_); compressor_condition_.wait( @@ -91,7 +96,7 @@ void SequentialCompressionWriter::compression_thread_fn() message = compressor_message_queue_.front(); compressor_message_queue_.pop(); } else if (!compressor_file_queue_.empty()) { - file = compressor_file_queue_.front(); + closed_file_relative_to_bag = compressor_file_queue_.front(); compressor_file_queue_.pop(); } else if (!compression_is_running_) { // I woke up, all work queues are empty, and the main thread has stopped execution. Exit. @@ -108,8 +113,35 @@ void SequentialCompressionWriter::compression_thread_fn() std::lock_guard storage_lock(storage_mutex_); SequentialWriter::write(message); } - } else if (!file.empty()) { - compress_file(*compressor, file); + } else if (!closed_file_relative_to_bag.empty()) { + compress_file(*compressor, closed_file_relative_to_bag); + + // Execute callbacks from the base class + static const std::string compressor_ext = "." + compressor->get_compression_identifier(); + auto closed_file = + (fs::path(base_folder_) / (closed_file_relative_to_bag + compressor_ext)).generic_string(); + std::string new_file; + // To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(), + // because other compressor threads may have already pushed a new item above. + { + std::lock_guard lock(storage_mutex_); + auto iter = std::find( + metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(), + closed_file_relative_to_bag + compressor_ext); + if (iter != metadata_.relative_file_paths.end()) { + ++iter; + if (iter != metadata_.relative_file_paths.end()) { + new_file = (fs::path(base_folder_) / *iter).generic_string(); + } + } + } + if (!new_file.empty()) { + // The new_file is empty when we compressed the last file after calling close(). + // Note: We shall not call 'execute_bag_split_callbacks(closed_file, new_file)' for the + // last compressed file because it will be called inside base class + // SequentialWriter::close(). + SequentialWriter::execute_bag_split_callbacks(closed_file, new_file); + } } } } @@ -291,14 +323,17 @@ void SequentialCompressionWriter::split_bagfile() std::lock_guard compressor_lock(compressor_queue_mutex_); // Grab last file before calling common splitting logic, which pushes the new filename - const auto last_file = metadata_.relative_file_paths.back(); - SequentialWriter::split_bagfile(); + const auto last_file_relative_to_bag = metadata_.relative_file_paths.back(); + const auto new_file = SequentialWriter::split_bagfile_local(false); // If we're in FILE compression mode, push this file's name on to the queue so another // thread will handle compressing it. If not, we can just carry on. if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) { - compressor_file_queue_.push(last_file); + compressor_file_queue_.push(last_file_relative_to_bag); compressor_condition_.notify_one(); + } else { + auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).generic_string(); + SequentialWriter::execute_bag_split_callbacks(last_file, new_file); } if (!storage_) { @@ -323,6 +358,7 @@ void SequentialCompressionWriter::write( // If the compression mode is MESSAGE, push the message into a queue that will be handled // by the compression threads. if (compression_options_.compression_mode == CompressionMode::FILE) { + std::lock_guard lock(storage_mutex_); SequentialWriter::write(message); } else { // since the compression operation will manipulate memory inplace, thus @@ -372,17 +408,4 @@ void SequentialCompressionWriter::write( } } -bool SequentialCompressionWriter::should_split_bagfile( - const std::chrono::time_point & current_time) -{ - if (storage_options_.max_bagfile_size == - rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT) - { - return false; - } else { - std::lock_guard lock(storage_mutex_); - return SequentialWriter::should_split_bagfile(current_time); - } -} - } // namespace rosbag2_compression diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 55bd007439..c9d94ccc61 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -53,8 +53,13 @@ class SequentialCompressionWriterTest : public Test tmp_dir_storage_options_{}, serialization_format_{"rmw_format"} { +<<<<<<< HEAD tmp_dir_storage_options_.uri = tmp_dir_.string(); rcpputils::fs::remove_all(tmp_dir_); +======= + tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string(); + fs::remove_all(tmp_dir_); +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_)); EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0)); // intercept the metadata write so we can analyze it. @@ -77,7 +82,7 @@ class SequentialCompressionWriterTest : public Test DoAll( Invoke( [this](const rosbag2_storage::StorageOptions & storage_options) { - fake_storage_size_ = 0; + fake_storage_size_.store(0); fake_storage_uri_ = storage_options.uri; // Touch the file std::ofstream output(storage_options.uri); @@ -91,11 +96,11 @@ class SequentialCompressionWriterTest : public Test *storage_, write(An>())).WillByDefault( [this](std::shared_ptr) { - fake_storage_size_ += 1; + fake_storage_size_.fetch_add(1); }); ON_CALL(*storage_, get_bagfile_size).WillByDefault( [this]() { - return fake_storage_size_; + return fake_storage_size_.load(); }); ON_CALL(*storage_, get_relative_file_path).WillByDefault( [this]() { @@ -120,7 +125,6 @@ class SequentialCompressionWriterTest : public Test writer_ = std::make_unique(std::move(sequential_writer)); } - const std::string bag_name_ = "SequentialCompressionWriterTest"; std::unique_ptr> storage_factory_; std::shared_ptr> storage_; std::shared_ptr> converter_factory_; @@ -130,8 +134,10 @@ class SequentialCompressionWriterTest : public Test rosbag2_storage::BagMetadata intercepted_metadata_; std::unique_ptr writer_; std::string serialization_format_; - uint64_t fake_storage_size_{}; + std::atomic fake_storage_size_{0}; // Need to be atomic for cache update since it + // uses in callback from cache_consumer thread std::string fake_storage_uri_; + const std::string bag_base_dir_ = "test_bag"; const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; @@ -284,12 +290,336 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative EXPECT_EQ( intercepted_metadata_.relative_file_paths.size(), 3u); - const auto base_path = tmp_dir_storage_options_.uri; int counter = 0; for (const auto & path : intercepted_metadata_.relative_file_paths) { std::stringstream ss; - ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor; + ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor; counter++; EXPECT_EQ(ss.str(), path); } } +<<<<<<< HEAD +======= + +TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_destruction) +{ + const std::string test_topic_name = "test_topic"; + const std::string test_topic_type = "test_msgs/BasicTypes"; + + // queue size should be 0 or at least the number of remaining messages to prevent message loss + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + 0, + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + EXPECT_CALL(*storage_, update_metadata(_)).Times(2); + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, test_topic_name, test_topic_type, "", {}, ""}); + + auto message = std::make_shared(); + message->topic_name = test_topic_name; + + const size_t kNumMessagesToWrite = 5; + for (size_t i = 0; i < kNumMessagesToWrite; i++) { + writer_->write(message); + } + writer_.reset(); // reset will call writer destructor + + EXPECT_EQ(v_intercepted_update_metadata_.size(), 2u); + using rosbag2_compression::compression_mode_from_string; + auto compression_mode = + compression_mode_from_string(v_intercepted_update_metadata_[0].compression_mode); + EXPECT_EQ(compression_mode, rosbag2_compression::CompressionMode::MESSAGE); + EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u); + EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, kNumMessagesToWrite); +} + +TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split) +{ + const std::string test_topic_name = "test_topic"; + const std::string test_topic_type = "test_msgs/BasicTypes"; + + // queue size should be 0 or at least the number of remaining messages to prevent message loss + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + 0, + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + EXPECT_CALL(*storage_, update_metadata(_)).Times(4); + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, test_topic_name, test_topic_type, "", {}, ""}); + + auto message = std::make_shared(); + message->topic_name = test_topic_name; + + const size_t kNumMessagesToWrite = 5; + for (size_t i = 0; i < kNumMessagesToWrite; i++) { + writer_->write(message); + } + + writer_->split_bagfile(); + + for (size_t i = 0; i < kNumMessagesToWrite; i++) { + writer_->write(message); + } + writer_.reset(); // reset will call writer destructor + + ASSERT_EQ(v_intercepted_update_metadata_.size(), 4u); + using rosbag2_compression::compression_mode_from_string; + auto compression_mode = + compression_mode_from_string(v_intercepted_update_metadata_[0].compression_mode); + EXPECT_EQ(compression_mode, rosbag2_compression::CompressionMode::MESSAGE); + EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u); // On opening first bag file + EXPECT_EQ(v_intercepted_update_metadata_[1].files.size(), 1u); // On closing first bag file + EXPECT_EQ(v_intercepted_update_metadata_[2].files.size(), 1u); // On opening second bag file + EXPECT_EQ(v_intercepted_update_metadata_[3].files.size(), 2u); // On writer destruction + EXPECT_EQ(v_intercepted_update_metadata_[3].message_count, 2 * kNumMessagesToWrite); +} + +TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_sizes) +{ + const std::string test_topic_name = "test_topic"; + const std::string test_topic_type = "test_msgs/BasicTypes"; + const uint64_t kCompressionQueueSize = GetParam(); + + // queue size should be 0 or at least the number of remaining messages to prevent message loss + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + kCompressionQueueSize, + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, test_topic_name, test_topic_type, "", {}, ""}); + + auto message = std::make_shared(); + message->topic_name = test_topic_name; + + const size_t kNumMessagesToWrite = 5; + for (size_t i = 0; i < kNumMessagesToWrite; i++) { + writer_->write(message); + } + writer_.reset(); // reset will call writer destructor + + EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite); +} + +TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) +{ + const std::string test_topic_name = "test_topic"; + const std::string test_topic_type = "test_msgs/BasicTypes"; + const uint64_t kCompressionQueueSize = GetParam(); +#ifndef _WIN32 + const int32_t wanted_thread_priority = 10; + errno = 0; + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + ASSERT_TRUE(!(cur_nice_value == -1 && errno != 0)); +#else + const int32_t wanted_thread_priority = THREAD_PRIORITY_LOWEST; + auto current_thread_priority = GetThreadPriority(GetCurrentThread()); + ASSERT_NE(current_thread_priority, THREAD_PRIORITY_ERROR_RETURN); + ASSERT_NE(current_thread_priority, wanted_thread_priority); +#endif + + // queue size should be 0 or at least the number of remaining messages to prevent message loss + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + kCompressionQueueSize, + kDefaultCompressionQueueThreads, + wanted_thread_priority + }; + +#ifndef _WIN32 + // nice values are in the range from -20 to +19, so this value will never be read + int32_t detected_thread_priority = 100; +#else + int32_t detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN; +#endif + + initializeFakeFileStorage(); + initializeWriter( + compression_options, + std::make_unique(detected_thread_priority)); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, test_topic_name, test_topic_type, "", {}, ""}); + + auto message = std::make_shared(); + message->topic_name = test_topic_name; + + const size_t kNumMessagesToWrite = 5; + for (size_t i = 0; i < kNumMessagesToWrite; i++) { + writer_->write(message); + } + writer_.reset(); // reset will call writer destructor + + EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority); + EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite); +} + +TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression) +{ + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + 0, + 1, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + auto message = std::make_shared(); + message->topic_name = "test_topic"; + + tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size; + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (auto i = 0; i < message_count; ++i) { + writer_->write(message); + } + writer_->close(); + + EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1); + + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) { + std::cout << "metadata file path [" << i << "] = '" << + intercepted_write_metadata_.relative_file_paths[i] << "'\n"; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i)); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i; + EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i; + } +} + +TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression) +{ + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::FILE, + 0, + 1, + kDefaultCompressionQueueThreadsPriority + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + auto message = std::make_shared(); + message->topic_name = "test_topic"; + + tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size; + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (auto i = 0; i < message_count; ++i) { + writer_->write(message); + } + writer_->close(); + + EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1); + + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) { + std::cout << "metadata file path [" << i << "] = '" << + intercepted_write_metadata_.relative_file_paths[i] << "'\n"; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) + + "." + DefaultTestCompressor); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i; + EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i; + } +} + +INSTANTIATE_TEST_SUITE_P( + SequentialCompressionWriterTestQueueSizes, + SequentialCompressionWriterTest, + ::testing::Values( + 0ul, 5ul +)); +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 21e3dee36c..2495f56fac 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -133,6 +133,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter std::shared_ptr message_cache_; std::unique_ptr cache_consumer_; + std::string split_bagfile_local(bool execute_callbacks = true); + + void execute_bag_split_callbacks( + const std::string & closed_file, const std::string & opened_file); + void switch_to_next_storage(); std::string format_storage_uri( diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 8b8a0008ed..50008b80af 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -177,12 +177,17 @@ void SequentialWriter::close() // bag file was closed before callback call. } if (!metadata_.relative_file_paths.empty()) { - auto info = std::make_shared(); // Take the latest file name from metadata in case if it was updated after compression in // derived class +<<<<<<< HEAD info->closed_file = (rcpputils::fs::path(base_folder_) / metadata_.relative_file_paths.back()).string(); callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); +======= + auto closed_file = + (fs::path(base_folder_) / metadata_.relative_file_paths.back()).generic_string(); + execute_bag_split_callbacks(closed_file, ""); +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) } topics_names_to_info_.clear(); @@ -293,12 +298,11 @@ void SequentialWriter::switch_to_next_storage() } } -void SequentialWriter::split_bagfile() +std::string SequentialWriter::split_bagfile_local(bool execute_callbacks) { - auto info = std::make_shared(); - info->closed_file = storage_->get_relative_file_path(); + auto closed_file = storage_->get_relative_file_path(); switch_to_next_storage(); - info->opened_file = storage_->get_relative_file_path(); + auto opened_file = storage_->get_relative_file_path(); metadata_.relative_file_paths.push_back(strip_parent_path(storage_->get_relative_file_path())); @@ -308,10 +312,31 @@ void SequentialWriter::split_bagfile() file_info.path = strip_parent_path(storage_->get_relative_file_path()); metadata_.files.push_back(file_info); + if (execute_callbacks) { + execute_bag_split_callbacks(closed_file, opened_file); + } + return opened_file; +} + +void SequentialWriter::execute_bag_split_callbacks( + const std::string & closed_file, const std::string & opened_file) +{ + auto info = std::make_shared(); + info->closed_file = closed_file; + info->opened_file = opened_file; callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } +<<<<<<< HEAD void SequentialWriter::write(std::shared_ptr message) +======= +void SequentialWriter::split_bagfile() +{ + (void)split_bagfile_local(); +} + +void SequentialWriter::write(std::shared_ptr message) +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) { if (!storage_) { throw std::runtime_error("Bag is not open. Call open() before writing."); diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 7cd72355bf..88bf853177 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -52,11 +52,16 @@ class SequentialWriterTest : public Test storage_ = std::make_shared>(); converter_factory_ = std::make_shared>(); metadata_io_ = std::make_unique>(); + tmp_dir_ = fs::temp_directory_path() / "SequentialWriterTest"; storage_options_ = rosbag2_storage::StorageOptions{}; - storage_options_.uri = "uri"; + storage_options_.uri = (tmp_dir_ / bag_base_dir_).string(); +<<<<<<< HEAD rcpputils::fs::path dir(storage_options_.uri); rcpputils::fs::remove_all(dir); +======= + fs::remove_all(tmp_dir_); +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault( DoAll( @@ -72,8 +77,12 @@ class SequentialWriterTest : public Test ~SequentialWriterTest() { +<<<<<<< HEAD rcpputils::fs::path dir(storage_options_.uri); rcpputils::fs::remove_all(dir); +======= + fs::remove_all(tmp_dir_); +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) } std::unique_ptr> storage_factory_; @@ -81,6 +90,7 @@ class SequentialWriterTest : public Test std::shared_ptr> converter_factory_; std::unique_ptr metadata_io_; + fs::path tmp_dir_; rosbag2_storage::StorageOptions storage_options_; std::atomic fake_storage_size_{0}; // Need to be atomic for cache update since it // uses in callback from cache_consumer thread @@ -88,6 +98,7 @@ class SequentialWriterTest : public Test // Ensure writer_ is destructed before intercepted fake_metadata_ std::unique_ptr writer_; std::string fake_storage_uri_; + const std::string bag_base_dir_ = "test_bag"; }; std::shared_ptr make_test_msg() @@ -275,11 +286,10 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf static_cast(expected_splits)) << "Storage should have split bagfile " << (expected_splits - 1); - const auto base_path = storage_options_.uri; int counter = 0; for (const auto & path : fake_metadata_.relative_file_paths) { std::stringstream ss; - ss << base_path << "_" << counter; + ss << bag_base_dir_ << "_" << counter; const auto expected_path = ss.str(); counter++; @@ -377,11 +387,10 @@ TEST_F( static_cast(expected_splits)) << "Storage should have split bagfile " << (expected_splits - 1); - const auto base_path = storage_options_.uri; int counter = 0; for (const auto & path : fake_metadata_.relative_file_paths) { std::stringstream ss; - ss << base_path << "_" << counter; + ss << bag_base_dir_ << "_" << counter; const auto expected_path = ss.str(); counter++; @@ -512,8 +521,9 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) TEST_F(SequentialWriterTest, split_event_calls_callback) { - const int message_count = 7; - const int max_bagfile_size = 5; + const uint64_t max_bagfile_size = 3; + const size_t num_splits = 2; + const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8 ON_CALL( *storage_, @@ -546,14 +556,13 @@ TEST_F(SequentialWriterTest, split_event_calls_callback) storage_options_.max_bagfile_size = max_bagfile_size; - bool callback_called = false; - std::string closed_file, opened_file; + std::vector closed_files; + std::vector opened_files; rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; callbacks.write_split_callback = - [&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) { - closed_file = info.closed_file; - opened_file = info.opened_file; - callback_called = true; + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); }; writer_->add_event_callbacks(callbacks); @@ -563,11 +572,37 @@ TEST_F(SequentialWriterTest, split_event_calls_callback) for (auto i = 0; i < message_count; ++i) { writer_->write(message); } + writer_->close(); +<<<<<<< HEAD ASSERT_TRUE(callback_called); auto expected_closed = rcpputils::fs::path(storage_options_.uri) / (storage_options_.uri + "_0"); EXPECT_EQ(closed_file, expected_closed.string()); EXPECT_EQ(opened_file, fake_storage_uri_); +======= + EXPECT_THAT(closed_files.size(), num_splits + 1); + EXPECT_THAT(opened_files.size(), num_splits + 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_GE(opened_files.size(), num_splits + 1); + ASSERT_GE(closed_files.size(), num_splits + 1); + for (size_t i = 0; i < num_splits + 1; i++) { + auto expected_closed = + fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i)); + auto expected_opened = (i == num_splits) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()); + EXPECT_EQ(opened_files[i], expected_opened.generic_string()); + } +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) } TEST_F(SequentialWriterTest, split_event_calls_on_writer_close) @@ -625,8 +660,13 @@ TEST_F(SequentialWriterTest, split_event_calls_on_writer_close) writer_->close(); ASSERT_TRUE(callback_called); +<<<<<<< HEAD auto expected_closed = rcpputils::fs::path(storage_options_.uri) / (storage_options_.uri + "_0"); EXPECT_EQ(closed_file, expected_closed.string()); +======= + auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0"); + EXPECT_EQ(closed_file, expected_closed.generic_string()); +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) EXPECT_TRUE(opened_file.empty()); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 161b73483d..68a6a2879b 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -131,6 +131,7 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); auto recorded_topics = mock_writer.get_topics(); +<<<<<<< HEAD ASSERT_THAT(recorded_topics, SizeIs(1)) << "size=" << recorded_topics.size(); EXPECT_THAT(recorded_topics.at(string_topic).serialization_format, Eq("rmw_format")); ASSERT_THAT(recorded_messages, SizeIs(expected_messages)); @@ -138,6 +139,22 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) recorded_messages, string_topic); ASSERT_THAT(string_messages, SizeIs(4)); EXPECT_THAT(string_messages[0]->string_value, Eq(string_message->string_value)); +======= + EXPECT_THAT(recorded_topics, SizeIs(1)) << "size=" << recorded_topics.size(); + if (recorded_topics.size() != 1) { + for (const auto & topic : recorded_topics) { + std::cerr << "recorded topic name : " << topic.first << std::endl; + } + } + EXPECT_THAT(recorded_topics.at(string_topic).first.serialization_format, Eq("rmw_format")); + + auto string_messages = + filter_messages(recorded_messages, string_topic); + EXPECT_THAT(string_messages, SizeIs(4)); + for (const auto & str_msg : string_messages) { + EXPECT_THAT(str_msg->string_value, Eq(string_message->string_value)); + } +>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643)) } TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)