Skip to content

Commit

Permalink
Bugfix for bag_split event callbacks called to early with file compre…
Browse files Browse the repository at this point in the history
…ssion (#1643)

* Bugfix for bag_split event callbacks not called with file compression

Signed-off-by: Michael Orlov <[email protected]>

* Delete redundant "should_split_bagfile" in compression_writer

- It is a non-virtual method and doesn't call from the base class.

Signed-off-by: Michael Orlov <[email protected]>

* Adjust "split_event_calls_callback" for testing multiple splits

Signed-off-by: Michael Orlov <[email protected]>

* Use temp folder for "SequentialWriterTest" fixture instead of "uri"

Signed-off-by: Michael Orlov <[email protected]>

* Add tests for split event callbacks when using file and msg compression

 - Added "split_event_calls_callback_with_msg_compression" and
"split_event_calls_callback_with_file_compression" uit tests

Signed-off-by: Michael Orlov <[email protected]>

* Add debug info to the flaky "can_record_again_after_stop" test

Signed-off-by: Michael Orlov <[email protected]>

* Use `uint64_t` type for `fake_storage_size_` in tests

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit 1877b53)

# Conflicts:
#	rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp
#	rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp
#	rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
#	rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
#	rosbag2_transport/test/rosbag2_transport/test_record.cpp
  • Loading branch information
MichaelOrlov committed Jul 16, 2024
1 parent 70434fc commit e01b597
Show file tree
Hide file tree
Showing 7 changed files with 484 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
// Closes the current backed storage and opens the next bagfile.
void split_bagfile() override;

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time);

// Prepares the metadata by setting initial values.
void init_metadata() override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ void SequentialCompressionWriter::compression_thread_fn()
rcpputils::check_true(compressor != nullptr, "Could not create compressor.");

while (true) {
<<<<<<< HEAD
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message;
std::string file;
=======
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message;
std::string closed_file_relative_to_bag;
>>>>>>> 1877b538 (Bugfix for bag_split event callbacks called to early with file compression (#1643))
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
compressor_condition_.wait(
Expand All @@ -92,7 +97,7 @@ void SequentialCompressionWriter::compression_thread_fn()
compressor_message_queue_.pop();
compressor_condition_.notify_all();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
closed_file_relative_to_bag = compressor_file_queue_.front();
compressor_file_queue_.pop();
} else if (!compression_is_running_) {
// I woke up, all work queues are empty, and the main thread has stopped execution. Exit.
Expand All @@ -109,8 +114,35 @@ void SequentialCompressionWriter::compression_thread_fn()
std::lock_guard<std::recursive_mutex> storage_lock(storage_mutex_);
SequentialWriter::write(message);
}
} else if (!file.empty()) {
compress_file(*compressor, file);
} else if (!closed_file_relative_to_bag.empty()) {
compress_file(*compressor, closed_file_relative_to_bag);

// Execute callbacks from the base class
static const std::string compressor_ext = "." + compressor->get_compression_identifier();
auto closed_file =
(fs::path(base_folder_) / (closed_file_relative_to_bag + compressor_ext)).generic_string();
std::string new_file;
// To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(),
// because other compressor threads may have already pushed a new item above.
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
auto iter = std::find(
metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(),
closed_file_relative_to_bag + compressor_ext);
if (iter != metadata_.relative_file_paths.end()) {
++iter;
if (iter != metadata_.relative_file_paths.end()) {
new_file = (fs::path(base_folder_) / *iter).generic_string();
}
}
}
if (!new_file.empty()) {
// The new_file is empty when we compressed the last file after calling close().
// Note: We shall not call 'execute_bag_split_callbacks(closed_file, new_file)' for the
// last compressed file because it will be called inside base class
// SequentialWriter::close().
SequentialWriter::execute_bag_split_callbacks(closed_file, new_file);
}
}
}
}
Expand Down Expand Up @@ -292,14 +324,17 @@ void SequentialCompressionWriter::split_bagfile()
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();
SequentialWriter::split_bagfile();
const auto last_file_relative_to_bag = metadata_.relative_file_paths.back();
const auto new_file = SequentialWriter::split_bagfile_local(false);

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
compressor_file_queue_.push(last_file);
compressor_file_queue_.push(last_file_relative_to_bag);
compressor_condition_.notify_one();
} else {
auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).generic_string();
SequentialWriter::execute_bag_split_callbacks(last_file, new_file);
}

if (!storage_) {
Expand All @@ -324,6 +359,7 @@ void SequentialCompressionWriter::write(
// If the compression mode is MESSAGE, push the message into a queue that will be handled
// by the compression threads.
if (compression_options_.compression_mode == CompressionMode::FILE) {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
SequentialWriter::write(message);
} else {
// since the compression operation will manipulate memory inplace, thus
Expand Down Expand Up @@ -389,17 +425,4 @@ void SequentialCompressionWriter::write(
}
}

bool SequentialCompressionWriter::should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time)
{
if (storage_options_.max_bagfile_size ==
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
return false;
} else {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
return SequentialWriter::should_split_bagfile(current_time);
}
}

} // namespace rosbag2_compression
Loading

0 comments on commit e01b597

Please sign in to comment.