Skip to content

Commit

Permalink
Bugfix for bag_split event callbacks called to early with file compre…
Browse files Browse the repository at this point in the history
…ssion (#1643)

* Bugfix for bag_split event callbacks not called with file compression

Signed-off-by: Michael Orlov <[email protected]>

* Delete redundant "should_split_bagfile" in compression_writer

- It is a non-virtual method and doesn't call from the base class.

Signed-off-by: Michael Orlov <[email protected]>

* Adjust "split_event_calls_callback" for testing multiple splits

Signed-off-by: Michael Orlov <[email protected]>

* Use temp folder for "SequentialWriterTest" fixture instead of "uri"

Signed-off-by: Michael Orlov <[email protected]>

* Add tests for split event callbacks when using file and msg compression

 - Added "split_event_calls_callback_with_msg_compression" and
"split_event_calls_callback_with_file_compression" uit tests

Signed-off-by: Michael Orlov <[email protected]>

* Add debug info to the flaky "can_record_again_after_stop" test

Signed-off-by: Michael Orlov <[email protected]>

* Use `uint64_t` type for `fake_storage_size_` in tests

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit 1877b53)
  • Loading branch information
MichaelOrlov authored and mergify[bot] committed Jun 26, 2024
1 parent eab5b07 commit 7c8ed48
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
// Closes the current backed storage and opens the next bagfile.
void split_bagfile() override;

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time);

// Prepares the metadata by setting initial values.
void init_metadata() override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void SequentialCompressionWriter::compression_thread_fn()

while (true) {
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message;
std::string file;
std::string closed_file_relative_to_bag;
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
// *INDENT-OFF*
Expand All @@ -134,7 +134,7 @@ void SequentialCompressionWriter::compression_thread_fn()
compressor_message_queue_.pop();
compressor_condition_.notify_all();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
closed_file_relative_to_bag = compressor_file_queue_.front();
compressor_file_queue_.pop();
} else if (!compression_is_running_) {
// I woke up, all work queues are empty, and the main thread has stopped execution. Exit.
Expand All @@ -151,8 +151,35 @@ void SequentialCompressionWriter::compression_thread_fn()
std::lock_guard<std::recursive_mutex> storage_lock(storage_mutex_);
SequentialWriter::write(compressed_message);
}
} else if (!file.empty()) {
compress_file(*compressor, file);
} else if (!closed_file_relative_to_bag.empty()) {
compress_file(*compressor, closed_file_relative_to_bag);

// Execute callbacks from the base class
static const std::string compressor_ext = "." + compressor->get_compression_identifier();
auto closed_file =
(fs::path(base_folder_) / (closed_file_relative_to_bag + compressor_ext)).generic_string();
std::string new_file;
// To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(),
// because other compressor threads may have already pushed a new item above.
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
auto iter = std::find(
metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(),
closed_file_relative_to_bag + compressor_ext);
if (iter != metadata_.relative_file_paths.end()) {
++iter;
if (iter != metadata_.relative_file_paths.end()) {
new_file = (fs::path(base_folder_) / *iter).generic_string();
}
}
}
if (!new_file.empty()) {
// The new_file is empty when we compressed the last file after calling close().
// Note: We shall not call 'execute_bag_split_callbacks(closed_file, new_file)' for the
// last compressed file because it will be called inside base class
// SequentialWriter::close().
SequentialWriter::execute_bag_split_callbacks(closed_file, new_file);
}
}
}
}
Expand Down Expand Up @@ -349,14 +376,17 @@ void SequentialCompressionWriter::split_bagfile()
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();
SequentialWriter::split_bagfile();
const auto last_file_relative_to_bag = metadata_.relative_file_paths.back();
const auto new_file = SequentialWriter::split_bagfile_local(false);

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
compressor_file_queue_.push(last_file);
compressor_file_queue_.push(last_file_relative_to_bag);
compressor_condition_.notify_one();
} else {
auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).generic_string();
SequentialWriter::execute_bag_split_callbacks(last_file, new_file);
}

if (!storage_) {
Expand Down Expand Up @@ -387,6 +417,7 @@ void SequentialCompressionWriter::write(
// If the compression mode is MESSAGE, push the message into a queue that will be handled
// by the compression threads.
if (compression_options_.compression_mode == CompressionMode::FILE) {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
SequentialWriter::write(message);
} else {
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
Expand Down Expand Up @@ -416,17 +447,4 @@ void SequentialCompressionWriter::write(
}
}

bool SequentialCompressionWriter::should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time)
{
if (storage_options_.max_bagfile_size ==
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
return false;
} else {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
return SequentialWriter::should_split_bagfile(current_time);
}
}

} // namespace rosbag2_compression
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
tmp_dir_storage_options_{},
serialization_format_{"rmw_format"}
{
tmp_dir_storage_options_.uri = tmp_dir_.string();
tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string();
fs::remove_all(tmp_dir_);
ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_));
EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0));
Expand Down Expand Up @@ -92,7 +92,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
DoAll(
Invoke(
[this](const rosbag2_storage::StorageOptions & storage_options) {
fake_storage_size_ = 0;
fake_storage_size_.store(0);
fake_storage_uri_ = storage_options.uri;
// Touch the file
std::ofstream output(storage_options.uri);
Expand All @@ -106,11 +106,11 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
fake_storage_size_.fetch_add(1);
});
ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
return fake_storage_size_.load();
});
ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
Expand All @@ -135,7 +135,6 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
}

const std::string bag_name_ = "SequentialCompressionWriterTest";
std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
Expand All @@ -148,8 +147,10 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
std::unique_ptr<rosbag2_cpp::Writer> writer_;

std::string serialization_format_;
uint64_t fake_storage_size_{};
std::atomic<uint64_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
std::string fake_storage_uri_;
const std::string bag_base_dir_ = "test_bag";

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
Expand Down Expand Up @@ -299,11 +300,10 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_write_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor;
ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor;
counter++;
EXPECT_EQ(ss.str(), path);
}
Expand Down Expand Up @@ -426,7 +426,7 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
Expand Down Expand Up @@ -480,7 +480,148 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority);
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_write_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i));
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i;
}
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_write_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) +
"." + DefaultTestCompressor);
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i;
}
}

INSTANTIATE_TEST_SUITE_P(
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::shared_ptr<rosbag2_cpp::cache::MessageCacheInterface> message_cache_;
std::unique_ptr<rosbag2_cpp::cache::CacheConsumer> cache_consumer_;

std::string split_bagfile_local(bool execute_callbacks = true);

void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

void switch_to_next_storage();

std::string format_storage_uri(
Expand Down
Loading

0 comments on commit 7c8ed48

Please sign in to comment.