Skip to content

Commit

Permalink
Ftp server : use a thread to send ftp burst (#2250)
Browse files Browse the repository at this point in the history
* Ftp server : use a thread to send ftp burst

* system tests : increase FtpDownloadBurstStopAndTryAgain file length

FTP server is now faster, we need to increase file length because it was
sometimes already entirely received when the callback was called.

* Revert "system tests : increase FtpDownloadBurstStopAndTryAgain file length"

This reverts commit c233cdd.

* Test commit debug

* System tests: fix FtpDownloadBurstStopAndTryAgain

Only use intercept callbacks to simulation connection problem to avoid
buffering.

* windows

* remove debug
  • Loading branch information
chep authored Mar 19, 2024
1 parent 432e4d7 commit 10876a8
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
34 changes: 21 additions & 13 deletions src/mavsdk/core/mavlink_ftp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,22 +770,31 @@ void MavlinkFtpServer::_work_burst(const PayloadHeader& payload)

_session_info.burst_offset = payload.offset;
_session_info.burst_chunk_size = payload.size;

_burst_seq = payload.seq_number + 1;

if (_session_info.burst_thread.joinable()) {
_session_info.burst_stop = true;
_session_info.burst_thread.join();
}

_session_info.burst_stop = false;

// Schedule sending out burst messages.
// Use some arbitrary "fast" rate: 100 packets per second
_server_component_impl.add_call_every(
[this]() { _send_burst_packet(); }, 0.01f, &_burst_call_every_cookie);
_session_info.burst_thread = std::thread([this]() {
while (!_session_info.burst_stop)
if (_send_burst_packet())
break;
});

// Don't send response as that's done in the call every burst call above.
}

void MavlinkFtpServer::_send_burst_packet()
// Returns true if sending is complete
bool MavlinkFtpServer::_send_burst_packet()
{
std::lock_guard<std::mutex> lock(_mutex);
if (!_session_info.ifstream.is_open()) {
_reset();
return false;
}

PayloadHeader burst_packet{};
Expand All @@ -797,11 +806,10 @@ void MavlinkFtpServer::_send_burst_packet()
_send_mavlink_ftp_message(burst_packet);

if (burst_packet.burst_complete == 1) {
if (_burst_call_every_cookie != nullptr) {
_server_component_impl.remove_call_every(_burst_call_every_cookie);
_burst_call_every_cookie = nullptr;
}
return true;
}

return false;
}

void MavlinkFtpServer::_make_burst_packet(PayloadHeader& packet)
Expand Down Expand Up @@ -902,9 +910,9 @@ void MavlinkFtpServer::_reset()
_session_info.ofstream.close();
}

if (_burst_call_every_cookie != nullptr) {
_server_component_impl.remove_call_every(_burst_call_every_cookie);
_burst_call_every_cookie = nullptr;
_session_info.burst_stop = true;
if (_session_info.burst_thread.joinable()) {
_session_info.burst_thread.join();
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/mavsdk/core/mavlink_ftp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <utility>
#include <variant>
#include <vector>
#include <thread>

#include "mavlink_include.h"

Expand Down Expand Up @@ -133,9 +134,8 @@ class MavlinkFtpServer {
void _work_rename(const PayloadHeader& payload);
void _work_calc_file_CRC32(const PayloadHeader& payload);

void _send_burst_packet();
bool _send_burst_packet();
void _make_burst_packet(PayloadHeader& packet);
void* _burst_call_every_cookie{nullptr};

std::mutex _mutex{};
struct SessionInfo {
Expand All @@ -144,6 +144,8 @@ class MavlinkFtpServer {
uint8_t burst_chunk_size{0};
std::ifstream ifstream;
std::ofstream ofstream;
bool burst_stop{false};
std::thread burst_thread;
} _session_info{};

uint8_t _network_id = 0;
Expand Down
35 changes: 25 additions & 10 deletions src/system_tests/ftp_download_file_burst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ TEST(SystemTest, FtpDownloadBurstBigFileLossy)

TEST(SystemTest, FtpDownloadBurstStopAndTryAgain)
{
ASSERT_TRUE(create_temp_file(temp_dir_provided / temp_file, 1000));
constexpr int file_size = 1000;
constexpr int msg_count = file_size / 255 + 6; // 6 messages for transfer initialization

ASSERT_TRUE(create_temp_file(temp_dir_provided / temp_file, file_size));
ASSERT_TRUE(reset_directories(temp_dir_downloaded));

Mavsdk mavsdk_groundstation{Mavsdk::Configuration{Mavsdk::ComponentType::GroundStation}};
Expand All @@ -219,11 +222,26 @@ TEST(SystemTest, FtpDownloadBurstStopAndTryAgain)
mavsdk_autopilot.set_timeout_s(reduced_timeout_s);

// Once we received half, we want to stop all traffic.
bool got_half = false;
auto drop_at_some_point = [&got_half](mavlink_message_t&) { return !got_half; };

mavsdk_groundstation.intercept_incoming_messages_async(drop_at_some_point);
mavsdk_groundstation.intercept_outgoing_messages_async(drop_at_some_point);
int received = 0;
auto drop_at_some_point_in = [&received, msg_count](mavlink_message_t& message) {
if (message.msgid == MAVLINK_MSG_ID_FILE_TRANSFER_PROTOCOL) {
received++;
}
if (received >= msg_count / 2) {
return false;
}
return true;
};

auto drop_at_some_point_out = [&received, msg_count](mavlink_message_t& message) {
if (received >= msg_count / 2) {
return false;
}
return true;
};

mavsdk_groundstation.intercept_incoming_messages_async(drop_at_some_point_in);
mavsdk_groundstation.intercept_outgoing_messages_async(drop_at_some_point_out);

ASSERT_EQ(mavsdk_groundstation.add_any_connection("udp://:17000"), ConnectionResult::Success);
ASSERT_EQ(
Expand All @@ -247,10 +265,7 @@ TEST(SystemTest, FtpDownloadBurstStopAndTryAgain)
("" / temp_file).string(),
temp_dir_downloaded.string(),
true,
[&prom, &got_half](Ftp::Result result, Ftp::ProgressData progress_data) {
if (progress_data.bytes_transferred > 500) {
got_half = true;
}
[&prom](Ftp::Result result, Ftp::ProgressData progress_data) {
if (result != Ftp::Result::Next) {
prom.set_value(result);
} else {
Expand Down

0 comments on commit 10876a8

Please sign in to comment.