diff --git a/src/backend/src/core/core_service_impl.h b/src/backend/src/core/core_service_impl.h index 17c261dc30..2e6d4884c2 100644 --- a/src/backend/src/core/core_service_impl.h +++ b/src/backend/src/core/core_service_impl.h @@ -21,14 +21,20 @@ class CoreServiceImpl final : public mavsdk::rpc::core::CoreService::Service { const rpc::core::SubscribeConnectionStateRequest * /* request */, grpc::ServerWriter *writer) override { - _dc.register_on_discover([&writer](const uint64_t uuid) { + std::mutex connection_state_mutex{}; + + _dc.register_on_discover([&writer, &connection_state_mutex](const uint64_t uuid) { const auto rpc_connection_state_response = createRpcConnectionStateResponse(uuid, true); + + std::lock_guard lock(connection_state_mutex); writer->Write(rpc_connection_state_response); }); - _dc.register_on_timeout([&writer](const uint64_t uuid) { + _dc.register_on_timeout([&writer, &connection_state_mutex](const uint64_t uuid) { const auto rpc_connection_state_response = createRpcConnectionStateResponse(uuid, false); + + std::lock_guard lock(connection_state_mutex); writer->Write(rpc_connection_state_response); }); diff --git a/src/backend/src/plugins/telemetry/telemetry_service_impl.h b/src/backend/src/plugins/telemetry/telemetry_service_impl.h index 3f1e9b6082..1aae626c2c 100644 --- a/src/backend/src/plugins/telemetry/telemetry_service_impl.h +++ b/src/backend/src/plugins/telemetry/telemetry_service_impl.h @@ -20,7 +20,9 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribePositionRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.position_async([&writer](mavsdk::Telemetry::Position position) { + std::mutex position_mutex{}; + + _telemetry.position_async([&writer, &position_mutex](mavsdk::Telemetry::Position position) { auto rpc_position = new mavsdk::rpc::telemetry::Position(); rpc_position->set_latitude_deg(position.latitude_deg); rpc_position->set_longitude_deg(position.longitude_deg); @@ -29,6 +31,8 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi mavsdk::rpc::telemetry::PositionResponse rpc_position_response; rpc_position_response.set_allocated_position(rpc_position); + + std::lock_guard lock(position_mutex); writer->Write(rpc_position_response); }); @@ -41,7 +45,9 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeHealthRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.health_async([&writer](mavsdk::Telemetry::Health health) { + std::mutex health_mutex{}; + + _telemetry.health_async([&writer, &health_mutex](mavsdk::Telemetry::Health health) { auto rpc_health = new mavsdk::rpc::telemetry::Health(); rpc_health->set_is_gyrometer_calibration_ok(health.gyrometer_calibration_ok); rpc_health->set_is_accelerometer_calibration_ok(health.accelerometer_calibration_ok); @@ -53,6 +59,8 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi mavsdk::rpc::telemetry::HealthResponse rpc_health_response; rpc_health_response.set_allocated_health(rpc_health); + + std::lock_guard lock(health_mutex); writer->Write(rpc_health_response); }); @@ -64,17 +72,22 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeHomeRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.home_position_async([&writer](mavsdk::Telemetry::Position position) { - auto rpc_position = new mavsdk::rpc::telemetry::Position(); - rpc_position->set_latitude_deg(position.latitude_deg); - rpc_position->set_longitude_deg(position.longitude_deg); - rpc_position->set_relative_altitude_m(position.relative_altitude_m); - rpc_position->set_absolute_altitude_m(position.absolute_altitude_m); + std::mutex home_mutex{}; - mavsdk::rpc::telemetry::HomeResponse rpc_home_response; - rpc_home_response.set_allocated_home(rpc_position); - writer->Write(rpc_home_response); - }); + _telemetry.home_position_async( + [&writer, &home_mutex](mavsdk::Telemetry::Position position) { + auto rpc_position = new mavsdk::rpc::telemetry::Position(); + rpc_position->set_latitude_deg(position.latitude_deg); + rpc_position->set_longitude_deg(position.longitude_deg); + rpc_position->set_relative_altitude_m(position.relative_altitude_m); + rpc_position->set_absolute_altitude_m(position.absolute_altitude_m); + + mavsdk::rpc::telemetry::HomeResponse rpc_home_response; + rpc_home_response.set_allocated_home(rpc_position); + + std::lock_guard lock(home_mutex); + writer->Write(rpc_home_response); + }); _stop_future.wait(); return grpc::Status::OK; @@ -84,9 +97,13 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeInAirRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.in_air_async([&writer](bool is_in_air) { + std::mutex in_air_mutex{}; + + _telemetry.in_air_async([&writer, &in_air_mutex](bool is_in_air) { mavsdk::rpc::telemetry::InAirResponse rpc_in_air_response; rpc_in_air_response.set_is_in_air(is_in_air); + + std::lock_guard lock(in_air_mutex); writer->Write(rpc_in_air_response); }); @@ -99,15 +116,20 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeStatusTextRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.status_text_async([this, &writer](mavsdk::Telemetry::StatusText status_text) { - auto rpc_status_text = new mavsdk::rpc::telemetry::StatusText(); - rpc_status_text->set_text(status_text.text); - rpc_status_text->set_type(translateStatusTextType(status_text.type)); - - mavsdk::rpc::telemetry::StatusTextResponse rpc_status_text_response; - rpc_status_text_response.set_allocated_status_text(rpc_status_text); - writer->Write(rpc_status_text_response); - }); + std::mutex status_text_mutex{}; + + _telemetry.status_text_async( + [this, &writer, &status_text_mutex](mavsdk::Telemetry::StatusText status_text) { + auto rpc_status_text = new mavsdk::rpc::telemetry::StatusText(); + rpc_status_text->set_text(status_text.text); + rpc_status_text->set_type(translateStatusTextType(status_text.type)); + + mavsdk::rpc::telemetry::StatusTextResponse rpc_status_text_response; + rpc_status_text_response.set_allocated_status_text(rpc_status_text); + + std::lock_guard lock(status_text_mutex); + writer->Write(rpc_status_text_response); + }); _stop_future.wait(); return grpc::Status::OK; @@ -133,9 +155,13 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeArmedRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.armed_async([&writer](bool is_armed) { + std::mutex armed_mutex{}; + + _telemetry.armed_async([&writer, &armed_mutex](bool is_armed) { mavsdk::rpc::telemetry::ArmedResponse rpc_armed_response; rpc_armed_response.set_is_armed(is_armed); + + std::lock_guard lock(armed_mutex); writer->Write(rpc_armed_response); }); @@ -148,15 +174,20 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeGpsInfoRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.gps_info_async([this, &writer](mavsdk::Telemetry::GPSInfo gps_info) { - auto rpc_gps_info = new mavsdk::rpc::telemetry::GpsInfo(); - rpc_gps_info->set_num_satellites(gps_info.num_satellites); - rpc_gps_info->set_fix_type(translateGpsFixType(gps_info.fix_type)); - - mavsdk::rpc::telemetry::GpsInfoResponse rpc_gps_info_response; - rpc_gps_info_response.set_allocated_gps_info(rpc_gps_info); - writer->Write(rpc_gps_info_response); - }); + std::mutex gps_info_mutex{}; + + _telemetry.gps_info_async( + [this, &writer, &gps_info_mutex](mavsdk::Telemetry::GPSInfo gps_info) { + auto rpc_gps_info = new mavsdk::rpc::telemetry::GpsInfo(); + rpc_gps_info->set_num_satellites(gps_info.num_satellites); + rpc_gps_info->set_fix_type(translateGpsFixType(gps_info.fix_type)); + + mavsdk::rpc::telemetry::GpsInfoResponse rpc_gps_info_response; + rpc_gps_info_response.set_allocated_gps_info(rpc_gps_info); + + std::lock_guard lock(gps_info_mutex); + writer->Write(rpc_gps_info_response); + }); _stop_future.wait(); return grpc::Status::OK; @@ -188,13 +219,17 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeBatteryRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.battery_async([&writer](mavsdk::Telemetry::Battery battery) { + std::mutex battery_mutex{}; + + _telemetry.battery_async([&writer, &battery_mutex](mavsdk::Telemetry::Battery battery) { auto rpc_battery = new mavsdk::rpc::telemetry::Battery(); rpc_battery->set_voltage_v(battery.voltage_v); rpc_battery->set_remaining_percent(battery.remaining_percent); mavsdk::rpc::telemetry::BatteryResponse rpc_battery_response; rpc_battery_response.set_allocated_battery(rpc_battery); + + std::lock_guard lock(battery_mutex); writer->Write(rpc_battery_response); }); @@ -207,13 +242,18 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeFlightModeRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.flight_mode_async([this, &writer](mavsdk::Telemetry::FlightMode flight_mode) { - auto rpc_flight_mode = translateFlightMode(flight_mode); + std::mutex flight_mode_mutex{}; - mavsdk::rpc::telemetry::FlightModeResponse rpc_flight_mode_response; - rpc_flight_mode_response.set_flight_mode(rpc_flight_mode); - writer->Write(rpc_flight_mode_response); - }); + _telemetry.flight_mode_async( + [this, &writer, &flight_mode_mutex](mavsdk::Telemetry::FlightMode flight_mode) { + auto rpc_flight_mode = translateFlightMode(flight_mode); + + mavsdk::rpc::telemetry::FlightModeResponse rpc_flight_mode_response; + rpc_flight_mode_response.set_flight_mode(rpc_flight_mode); + + std::lock_guard lock(flight_mode_mutex); + writer->Write(rpc_flight_mode_response); + }); _stop_future.wait(); return grpc::Status::OK; @@ -250,17 +290,22 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeAttitudeQuaternionRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.attitude_quaternion_async([&writer](mavsdk::Telemetry::Quaternion quaternion) { - auto rpc_quaternion = new mavsdk::rpc::telemetry::Quaternion(); - rpc_quaternion->set_w(quaternion.w); - rpc_quaternion->set_x(quaternion.x); - rpc_quaternion->set_y(quaternion.y); - rpc_quaternion->set_z(quaternion.z); - - mavsdk::rpc::telemetry::AttitudeQuaternionResponse rpc_quaternion_response; - rpc_quaternion_response.set_allocated_attitude_quaternion(rpc_quaternion); - writer->Write(rpc_quaternion_response); - }); + std::mutex attitude_quaternion_mutex{}; + + _telemetry.attitude_quaternion_async( + [&writer, &attitude_quaternion_mutex](mavsdk::Telemetry::Quaternion quaternion) { + auto rpc_quaternion = new mavsdk::rpc::telemetry::Quaternion(); + rpc_quaternion->set_w(quaternion.w); + rpc_quaternion->set_x(quaternion.x); + rpc_quaternion->set_y(quaternion.y); + rpc_quaternion->set_z(quaternion.z); + + mavsdk::rpc::telemetry::AttitudeQuaternionResponse rpc_quaternion_response; + rpc_quaternion_response.set_allocated_attitude_quaternion(rpc_quaternion); + + std::lock_guard lock(attitude_quaternion_mutex); + writer->Write(rpc_quaternion_response); + }); _stop_future.wait(); return grpc::Status::OK; @@ -271,16 +316,21 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeAttitudeEulerRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.attitude_euler_angle_async([&writer](mavsdk::Telemetry::EulerAngle euler_angle) { - auto rpc_euler_angle = new mavsdk::rpc::telemetry::EulerAngle(); - rpc_euler_angle->set_roll_deg(euler_angle.roll_deg); - rpc_euler_angle->set_pitch_deg(euler_angle.pitch_deg); - rpc_euler_angle->set_yaw_deg(euler_angle.yaw_deg); - - mavsdk::rpc::telemetry::AttitudeEulerResponse rpc_euler_response; - rpc_euler_response.set_allocated_attitude_euler(rpc_euler_angle); - writer->Write(rpc_euler_response); - }); + std::mutex attitude_euler_mutex{}; + + _telemetry.attitude_euler_angle_async( + [&writer, &attitude_euler_mutex](mavsdk::Telemetry::EulerAngle euler_angle) { + auto rpc_euler_angle = new mavsdk::rpc::telemetry::EulerAngle(); + rpc_euler_angle->set_roll_deg(euler_angle.roll_deg); + rpc_euler_angle->set_pitch_deg(euler_angle.pitch_deg); + rpc_euler_angle->set_yaw_deg(euler_angle.yaw_deg); + + mavsdk::rpc::telemetry::AttitudeEulerResponse rpc_euler_response; + rpc_euler_response.set_allocated_attitude_euler(rpc_euler_angle); + + std::lock_guard lock(attitude_euler_mutex); + writer->Write(rpc_euler_response); + }); _stop_future.wait(); return grpc::Status::OK; @@ -291,8 +341,10 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeCameraAttitudeQuaternionRequest * /* request */, grpc::ServerWriter *writer) override { + std::mutex camera_attitude_quaternion_mutex{}; + _telemetry.camera_attitude_quaternion_async( - [&writer](mavsdk::Telemetry::Quaternion quaternion) { + [&writer, &camera_attitude_quaternion_mutex](mavsdk::Telemetry::Quaternion quaternion) { auto rpc_quaternion = new mavsdk::rpc::telemetry::Quaternion(); rpc_quaternion->set_w(quaternion.w); rpc_quaternion->set_x(quaternion.x); @@ -301,6 +353,8 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi mavsdk::rpc::telemetry::CameraAttitudeQuaternionResponse rpc_quaternion_response; rpc_quaternion_response.set_allocated_attitude_quaternion(rpc_quaternion); + + std::lock_guard lock(camera_attitude_quaternion_mutex); writer->Write(rpc_quaternion_response); }); @@ -313,8 +367,10 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeCameraAttitudeEulerRequest * /* request */, grpc::ServerWriter *writer) override { + std::mutex camera_attitude_euler_mutex{}; + _telemetry.camera_attitude_euler_angle_async( - [&writer](mavsdk::Telemetry::EulerAngle euler_angle) { + [&writer, &camera_attitude_euler_mutex](mavsdk::Telemetry::EulerAngle euler_angle) { auto rpc_euler_angle = new mavsdk::rpc::telemetry::EulerAngle(); rpc_euler_angle->set_roll_deg(euler_angle.roll_deg); rpc_euler_angle->set_pitch_deg(euler_angle.pitch_deg); @@ -322,6 +378,8 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi mavsdk::rpc::telemetry::CameraAttitudeEulerResponse rpc_euler_response; rpc_euler_response.set_allocated_attitude_euler(rpc_euler_angle); + + std::lock_guard lock(camera_attitude_euler_mutex); writer->Write(rpc_euler_response); }); @@ -334,8 +392,10 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeGroundSpeedNedRequest * /* request */, grpc::ServerWriter *writer) override { + std::mutex ground_speed_mutex{}; + _telemetry.ground_speed_ned_async( - [&writer](mavsdk::Telemetry::GroundSpeedNED ground_speed) { + [&writer, &ground_speed_mutex](mavsdk::Telemetry::GroundSpeedNED ground_speed) { auto rpc_ground_speed = new mavsdk::rpc::telemetry::SpeedNed(); rpc_ground_speed->set_velocity_north_m_s(ground_speed.velocity_north_m_s); rpc_ground_speed->set_velocity_east_m_s(ground_speed.velocity_east_m_s); @@ -343,6 +403,8 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi mavsdk::rpc::telemetry::GroundSpeedNedResponse rpc_ground_speed_response; rpc_ground_speed_response.set_allocated_ground_speed_ned(rpc_ground_speed); + + std::lock_guard lock(ground_speed_mutex); writer->Write(rpc_ground_speed_response); }); @@ -355,16 +417,21 @@ class TelemetryServiceImpl final : public mavsdk::rpc::telemetry::TelemetryServi const mavsdk::rpc::telemetry::SubscribeRcStatusRequest * /* request */, grpc::ServerWriter *writer) override { - _telemetry.rc_status_async([&writer](mavsdk::Telemetry::RCStatus rc_status) { - auto rpc_rc_status = new mavsdk::rpc::telemetry::RcStatus(); - rpc_rc_status->set_was_available_once(rc_status.available_once); - rpc_rc_status->set_is_available(rc_status.available); - rpc_rc_status->set_signal_strength_percent(rc_status.signal_strength_percent); - - mavsdk::rpc::telemetry::RcStatusResponse rpc_rc_status_response; - rpc_rc_status_response.set_allocated_rc_status(rpc_rc_status); - writer->Write(rpc_rc_status_response); - }); + std::mutex rc_status_mutex{}; + + _telemetry.rc_status_async( + [&writer, &rc_status_mutex](mavsdk::Telemetry::RCStatus rc_status) { + auto rpc_rc_status = new mavsdk::rpc::telemetry::RcStatus(); + rpc_rc_status->set_was_available_once(rc_status.available_once); + rpc_rc_status->set_is_available(rc_status.available); + rpc_rc_status->set_signal_strength_percent(rc_status.signal_strength_percent); + + mavsdk::rpc::telemetry::RcStatusResponse rpc_rc_status_response; + rpc_rc_status_response.set_allocated_rc_status(rpc_rc_status); + + std::lock_guard lock(rc_status_mutex); + writer->Write(rpc_rc_status_response); + }); _stop_future.wait(); return grpc::Status::OK;