From 10876a89b5d4097b68009fa98b30b1ff08c29488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Ch=C3=A9pied?= Date: Tue, 19 Mar 2024 23:26:49 +0100 Subject: [PATCH] =?UTF-8?q?Ftp=20server=C2=A0:=20use=20a=20thread=20to=20s?= =?UTF-8?q?end=20ftp=20burst=20(#2250)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Ftp server : use a thread to send ftp burst * system tests : increase FtpDownloadBurstStopAndTryAgain file length FTP server is now faster, we need to increase file length because it was sometimes already entirely received when the callback was called. * Revert "system tests : increase FtpDownloadBurstStopAndTryAgain file length" This reverts commit c233cdd2eb103eaa476855e6338b43ee15033ff7. * Test commit debug * System tests: fix FtpDownloadBurstStopAndTryAgain Only use intercept callbacks to simulation connection problem to avoid buffering. * windows * remove debug --- src/mavsdk/core/mavlink_ftp_server.cpp | 34 +++++++++++-------- src/mavsdk/core/mavlink_ftp_server.h | 6 ++-- src/system_tests/ftp_download_file_burst.cpp | 35 ++++++++++++++------ 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/mavsdk/core/mavlink_ftp_server.cpp b/src/mavsdk/core/mavlink_ftp_server.cpp index 8fdd4a3f03..cd236a2017 100644 --- a/src/mavsdk/core/mavlink_ftp_server.cpp +++ b/src/mavsdk/core/mavlink_ftp_server.cpp @@ -770,22 +770,31 @@ void MavlinkFtpServer::_work_burst(const PayloadHeader& payload) _session_info.burst_offset = payload.offset; _session_info.burst_chunk_size = payload.size; - _burst_seq = payload.seq_number + 1; + if (_session_info.burst_thread.joinable()) { + _session_info.burst_stop = true; + _session_info.burst_thread.join(); + } + + _session_info.burst_stop = false; + // Schedule sending out burst messages. - // Use some arbitrary "fast" rate: 100 packets per second - _server_component_impl.add_call_every( - [this]() { _send_burst_packet(); }, 0.01f, &_burst_call_every_cookie); + _session_info.burst_thread = std::thread([this]() { + while (!_session_info.burst_stop) + if (_send_burst_packet()) + break; + }); // Don't send response as that's done in the call every burst call above. } -void MavlinkFtpServer::_send_burst_packet() +// Returns true if sending is complete +bool MavlinkFtpServer::_send_burst_packet() { std::lock_guard lock(_mutex); if (!_session_info.ifstream.is_open()) { - _reset(); + return false; } PayloadHeader burst_packet{}; @@ -797,11 +806,10 @@ void MavlinkFtpServer::_send_burst_packet() _send_mavlink_ftp_message(burst_packet); if (burst_packet.burst_complete == 1) { - if (_burst_call_every_cookie != nullptr) { - _server_component_impl.remove_call_every(_burst_call_every_cookie); - _burst_call_every_cookie = nullptr; - } + return true; } + + return false; } void MavlinkFtpServer::_make_burst_packet(PayloadHeader& packet) @@ -902,9 +910,9 @@ void MavlinkFtpServer::_reset() _session_info.ofstream.close(); } - if (_burst_call_every_cookie != nullptr) { - _server_component_impl.remove_call_every(_burst_call_every_cookie); - _burst_call_every_cookie = nullptr; + _session_info.burst_stop = true; + if (_session_info.burst_thread.joinable()) { + _session_info.burst_thread.join(); } } diff --git a/src/mavsdk/core/mavlink_ftp_server.h b/src/mavsdk/core/mavlink_ftp_server.h index dca1eac021..f9e1ad309f 100644 --- a/src/mavsdk/core/mavlink_ftp_server.h +++ b/src/mavsdk/core/mavlink_ftp_server.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "mavlink_include.h" @@ -133,9 +134,8 @@ class MavlinkFtpServer { void _work_rename(const PayloadHeader& payload); void _work_calc_file_CRC32(const PayloadHeader& payload); - void _send_burst_packet(); + bool _send_burst_packet(); void _make_burst_packet(PayloadHeader& packet); - void* _burst_call_every_cookie{nullptr}; std::mutex _mutex{}; struct SessionInfo { @@ -144,6 +144,8 @@ class MavlinkFtpServer { uint8_t burst_chunk_size{0}; std::ifstream ifstream; std::ofstream ofstream; + bool burst_stop{false}; + std::thread burst_thread; } _session_info{}; uint8_t _network_id = 0; diff --git a/src/system_tests/ftp_download_file_burst.cpp b/src/system_tests/ftp_download_file_burst.cpp index 3727880c37..23a8d2629c 100644 --- a/src/system_tests/ftp_download_file_burst.cpp +++ b/src/system_tests/ftp_download_file_burst.cpp @@ -209,7 +209,10 @@ TEST(SystemTest, FtpDownloadBurstBigFileLossy) TEST(SystemTest, FtpDownloadBurstStopAndTryAgain) { - ASSERT_TRUE(create_temp_file(temp_dir_provided / temp_file, 1000)); + constexpr int file_size = 1000; + constexpr int msg_count = file_size / 255 + 6; // 6 messages for transfer initialization + + ASSERT_TRUE(create_temp_file(temp_dir_provided / temp_file, file_size)); ASSERT_TRUE(reset_directories(temp_dir_downloaded)); Mavsdk mavsdk_groundstation{Mavsdk::Configuration{Mavsdk::ComponentType::GroundStation}}; @@ -219,11 +222,26 @@ TEST(SystemTest, FtpDownloadBurstStopAndTryAgain) mavsdk_autopilot.set_timeout_s(reduced_timeout_s); // Once we received half, we want to stop all traffic. - bool got_half = false; - auto drop_at_some_point = [&got_half](mavlink_message_t&) { return !got_half; }; - - mavsdk_groundstation.intercept_incoming_messages_async(drop_at_some_point); - mavsdk_groundstation.intercept_outgoing_messages_async(drop_at_some_point); + int received = 0; + auto drop_at_some_point_in = [&received, msg_count](mavlink_message_t& message) { + if (message.msgid == MAVLINK_MSG_ID_FILE_TRANSFER_PROTOCOL) { + received++; + } + if (received >= msg_count / 2) { + return false; + } + return true; + }; + + auto drop_at_some_point_out = [&received, msg_count](mavlink_message_t& message) { + if (received >= msg_count / 2) { + return false; + } + return true; + }; + + mavsdk_groundstation.intercept_incoming_messages_async(drop_at_some_point_in); + mavsdk_groundstation.intercept_outgoing_messages_async(drop_at_some_point_out); ASSERT_EQ(mavsdk_groundstation.add_any_connection("udp://:17000"), ConnectionResult::Success); ASSERT_EQ( @@ -247,10 +265,7 @@ TEST(SystemTest, FtpDownloadBurstStopAndTryAgain) ("" / temp_file).string(), temp_dir_downloaded.string(), true, - [&prom, &got_half](Ftp::Result result, Ftp::ProgressData progress_data) { - if (progress_data.bytes_transferred > 500) { - got_half = true; - } + [&prom](Ftp::Result result, Ftp::ProgressData progress_data) { if (result != Ftp::Result::Next) { prom.set_value(result); } else {