Skip to content

Commit

Permalink
WIP - Adding Plat/Firwm
Browse files Browse the repository at this point in the history
  • Loading branch information
JoaoMario109 committed Oct 14, 2024
1 parent f6a1d18 commit d785058
Show file tree
Hide file tree
Showing 12 changed files with 1,211,754 additions and 0 deletions.
117 changes: 117 additions & 0 deletions core/services/ardupilot_manager/autopilot/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@

import abc
from typing import List, Optional, Type

from loguru import logger

from autopilot.exceptions import InvalidEnvironmentImplementation
from autopilot.firmware import AutopilotFirmware
from flight_controller import FlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType
from mavlink_proxy.exceptions import EndpointAlreadyExists
from mavlink_proxy.Manager import Manager as MavlinkManager


class AutopilotEnvironment(abc.ABC):
def __init__(self, owner: str) -> None:
self.owner = owner
self.mavlink_manager: Optional[MavlinkManager] = None

@classmethod
def all(cls) -> List[str]:
return [subclass.__name__ for subclass in cls.__subclasses__()]

@classmethod
def get(cls, name: str) -> Type["AutopilotEnvironment"]:
for subclass in cls.__subclasses__():
if subclass.__name__ == name:
return subclass()
raise InvalidEnvironmentImplementation(f"{name} is not a valid environment implementation class")

async def start_mavlink_manager(self, device: Endpoint) -> None:
default_endpoints = [
Endpoint(
name="GCS Server Link",
owner=self.owner,
connection_type=EndpointType.UDPServer,
place="0.0.0.0",
argument=14550,
persistent=True,
enabled=False,
),
Endpoint(
name="GCS Client Link",
owner=self.owner,
connection_type=EndpointType.UDPClient,
place="192.168.2.1",
argument=14550,
persistent=True,
enabled=True,
),
Endpoint(
name="MAVLink2RestServer",
owner=self.owner,
connection_type=EndpointType.UDPServer,
place="127.0.0.1",
argument=14001,
persistent=True,
protected=True,
),
Endpoint(
name="MAVLink2Rest",
owner=self.owner,
connection_type=EndpointType.UDPClient,
place="127.0.0.1",
argument=14000,
persistent=True,
protected=True,
overwrite_settings=True,
),
Endpoint(
name="Internal Link",
owner=self.owner,
connection_type=EndpointType.TCPServer,
place="127.0.0.1",
argument=5777,
persistent=True,
protected=True,
overwrite_settings=True,
),
Endpoint(
name="Ping360 Heading",
owner=self.owner,
connection_type=EndpointType.UDPServer,
place="0.0.0.0",
argument=14660,
persistent=True,
protected=True,
),
]
for endpoint in default_endpoints:
try:
self.mavlink_manager.add_endpoint(endpoint)
except EndpointAlreadyExists:
pass
except Exception as error:
logger.warning(str(error))
await self.mavlink_manager.start(device)

@abc.abstractmethod
async def start(self) -> None:
raise NotImplementedError

@abc.abstractmethod
async def stop(self) -> None:
raise NotImplementedError

@abc.abstractmethod
async def restart(self) -> None:
raise NotImplementedError

@abc.abstractmethod
async def setup(self) -> None:
raise NotImplementedError

@abc.abstractmethod
def boards(self, firmware: AutopilotFirmware) -> Optional[FlightController]:
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from autopilot.environment import AutopilotEnvironment

class LinuxAutopilotEnvironment(AutopilotEnvironment):
pass
44 changes: 44 additions & 0 deletions core/services/ardupilot_manager/autopilot/environments/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Optional
from autopilot.environment import AutopilotEnvironment
from autopilot.firmware import AutopilotFirmware
from flight_controller import FlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType

class SerialEnvironment(AutopilotEnvironment):
def __init__(self, owner: str, board: FlightController) -> None:
self.board: FlightController = board
super().__init__(owner)

async def start(self) -> None:
if not self.board.path:
raise ValueError(f"Could not find device path for board {self.board.name}.")

baudrate = 115200
is_px4 = "px4" in self.board.name.lower()
if is_px4:
baudrate = 57600
await self.start_mavlink_manager(
Endpoint(
name="Master",
owner=self.owner,
connection_type=EndpointType.Serial,
place=self.board.path,
argument=baudrate,
protected=True,
)
)
if is_px4:
# PX4 needs at least one initial heartbeat to start sending data
await self.vehicle_manager.burst_heartbeat()

def stop(self) -> None:
raise NotImplementedError

def restart(self) -> None:
raise NotImplementedError

def setup(self) -> None:
raise NotImplementedError

def boards(self, firmware: AutopilotFirmware) -> Optional[FlightController]:
raise NotImplementedError
Empty file.
18 changes: 18 additions & 0 deletions core/services/ardupilot_manager/autopilot/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

class InvalidEnvironmentImplementation(ValueError):
"""Required platform implementation class does not exists"""

class InvalidFirmwareImplementation(ValueError):
"""Required firmware implementation class does not exists"""

class InvalidAutopilotManifestData(ValueError):
"""Invalid manifest data"""

class InvalidAutopilotManifestURL(ValueError):
"""Invalid manifest URL"""

class ManifestDataFetchFailed(RuntimeError):
"""Failed to fetch manifest data"""

class BackendIsOffline(RuntimeError):
"""Backend is offline"""
20 changes: 20 additions & 0 deletions core/services/ardupilot_manager/autopilot/firmware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import List, Optional, Type

from autopilot.exceptions import InvalidFirmwareImplementation
from flight_controller import FlightController


class AutopilotFirmware:
@classmethod
def all(cls) -> List[str]:
return [subclass.__name__ for subclass in cls.__subclasses__()]

@classmethod
def get(cls, name: str) -> Optional[Type["AutopilotFirmware"]]:
for subclass in cls.__subclasses__():
if subclass.__name__ == name:
return subclass()
raise InvalidFirmwareImplementation(f"{name} is not a valid firmware implementation class")

def supported_boards(self) -> List[FlightController]:
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from autopilot.firmware import AutopilotFirmware


class ArdupilotFirmware(AutopilotFirmware):
pass
Loading

0 comments on commit d785058

Please sign in to comment.