Skip to content

Commit

Permalink
core: ardupilot_manager: add support to all usb boards
Browse files Browse the repository at this point in the history
  • Loading branch information
Williangalvani committed Jan 9, 2025
1 parent 00bc6c5 commit 619e3b4
Show file tree
Hide file tree
Showing 10 changed files with 947 additions and 60 deletions.
2 changes: 1 addition & 1 deletion core/services/ardupilot_manager/api/v1/routers/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async def get_firmware_vehicle_type() -> Any:
summary="Retrieve dictionary of available firmwares versions with their respective URL.",
)
async def get_available_firmwares(vehicle: Vehicle, board_name: Optional[str] = None) -> Any:
return autopilot.get_available_firmwares(vehicle, (await target_board(board_name)).platform)
return autopilot.get_available_firmwares(vehicle, (await target_board(board_name)))


@index_router_v1.post("/install_firmware_from_url", summary="Install firmware for given URL.")
Expand Down
8 changes: 4 additions & 4 deletions core/services/ardupilot_manager/autopilot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ async def start_linux_board(self, board: LinuxFlightController) -> None:
)

firmware_path = self.firmware_manager.firmware_path(self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board)

# ArduPilot process will connect as a client on the UDP server created by the mavlink router
master_endpoint = Endpoint(
Expand Down Expand Up @@ -319,7 +319,7 @@ async def start_sitl(self) -> None:
self.current_sitl_frame = frame

firmware_path = self.firmware_manager.firmware_path(self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board)

# ArduPilot SITL binary will bind TCP port 5760 (server) and the mavlink router will connect to it as a client
master_endpoint = Endpoint(
Expand Down Expand Up @@ -624,8 +624,8 @@ async def update_endpoints(self, endpoints_to_update: Set[Endpoint]) -> None:
self._save_current_endpoints()
await self.mavlink_manager.restart()

def get_available_firmwares(self, vehicle: Vehicle, platform: Platform) -> List[Firmware]:
return self.firmware_manager.get_available_firmwares(vehicle, platform)
def get_available_firmwares(self, vehicle: Vehicle, board: FlightController) -> List[Firmware]:
return self.firmware_manager.get_available_firmwares(vehicle, board)

def install_firmware_from_file(
self, firmware_path: pathlib.Path, board: FlightController, default_parameters: Optional[Parameters] = None
Expand Down
39 changes: 21 additions & 18 deletions core/services/ardupilot_manager/firmware/FirmwareDownload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
NoCandidate,
NoVersionAvailable,
)
from typedefs import FirmwareFormat, Platform, PlatformType, Vehicle
from typedefs import FirmwareFormat, FlightController, Platform, PlatformType, Vehicle

# TODO: This should be not necessary
# Disable SSL verification
Expand Down Expand Up @@ -123,21 +123,21 @@ def _find_version_item(self, **args: str) -> List[Dict[str, Any]]:
# Make sure that the item matches all args value
for item in self._manifest["firmware"]:
for key, value in args.items():
real_key = key.replace("_", "-")
if real_key not in item or item[real_key] != value:
real_key = key.replace("_", "-").lower()
if real_key not in item or item[real_key].lower() != value.lower():
break
else:
found_version_item.append(item)

return found_version_item

@temporary_cache(timeout_seconds=3600)
def get_available_versions(self, vehicle: Vehicle, platform: Platform) -> List[str]:
def get_available_versions(self, vehicle: Vehicle, board: FlightController) -> List[str]:
"""Get available firmware versions for the specific plataform and vehicle
Args:
vehicle (Vehicle): Desired vehicle.
platform (Platform): Desired platform.
board (FlightController): Desired Flight Controller.
Returns:
List[str]: List of available versions that match the specific desired configuration.
Expand All @@ -147,16 +147,18 @@ def get_available_versions(self, vehicle: Vehicle, platform: Platform) -> List[s
if not self._manifest_is_valid():
raise InvalidManifest("Manifest file is invalid. Cannot use it to find available versions.")

items = self._find_version_item(vehicletype=vehicle.value, platform=platform.value)
platform = board.platform.value if board.platform != Platform.GenericSerial else board.name
platform_type = board.platform.type if board.platform != Platform.GenericSerial else PlatformType.Serial
items = self._find_version_item(vehicletype=vehicle.value, platform=platform)

for item in items:
if item["format"] == FirmwareDownloader._supported_firmware_formats[platform.type]:
if item["format"] == FirmwareDownloader._supported_firmware_formats[platform_type]:
available_versions.append(item["mav-firmware-version-type"])

return available_versions

@temporary_cache(timeout_seconds=3600)
def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str = "") -> str:
def get_download_url(self, vehicle: Vehicle, board: FlightController, version: str = "") -> str:
"""Find a specific firmware URL from manifest that matches the arguments.
Args:
Expand All @@ -168,16 +170,16 @@ def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str =
Returns:
str: URL of valid firmware.
"""
versions = self.get_available_versions(vehicle, platform)
logger.debug(f"Got following versions for {vehicle} running {platform}: {versions}")
versions = self.get_available_versions(vehicle, board)
logger.debug(f"Got following versions for {vehicle} running {board}: {versions}")

if not versions:
raise NoVersionAvailable(f"Could not find available firmware versions for {platform}/{vehicle}.")
raise NoVersionAvailable(f"Could not find available firmware versions for {board}/{vehicle}.")

if version and version not in versions:
raise NoVersionAvailable(f"Version {version} was not found for {platform}/{vehicle}.")
raise NoVersionAvailable(f"Version {version} was not found for {board}/{vehicle}.")

firmware_format = FirmwareDownloader._supported_firmware_formats[platform.type]
firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.type]

# Autodetect the latest supported version.
# For .apj firmwares (e.g. Pixhawk), we use the latest STABLE version while for the others (e.g. SITL and
Expand All @@ -192,21 +194,22 @@ def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str =
if not newest_version or Version(newest_version) < Version(semver_version):
newest_version = semver_version
if not newest_version:
raise NoVersionAvailable(f"No firmware versions found for {platform}/{vehicle}.")
raise NoVersionAvailable(f"No firmware versions found for {board}/{vehicle}.")
version = f"STABLE-{newest_version}"
else:
version = "BETA"

items = self._find_version_item(
vehicletype=vehicle.value,
platform=platform.value,
platform=board.platform.value if board.platform == Platform.SITL else board.name,
mav_firmware_version_type=version,
format=firmware_format,
)

if len(items) == 0:
logger.error(f"Could not find any firmware for {vehicle=}, {board=}, {version=}, {firmware_format=}")
raise NoCandidate(
f"Found no candidate for configuration: {vehicle=}, {platform=}, {version=}, {firmware_format=}"
f"Found no candidate for configuration: {vehicle=}, {board=}, {version=}, {firmware_format=}"
)

if len(items) != 1:
Expand All @@ -216,7 +219,7 @@ def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str =
logger.debug(f"Downloading following firmware: {item}")
return str(item["url"])

def download(self, vehicle: Vehicle, platform: Platform, version: str = "") -> pathlib.Path:
def download(self, vehicle: Vehicle, board: FlightController, version: str = "") -> pathlib.Path:
"""Download a specific firmware that matches the arguments.
Args:
Expand All @@ -228,5 +231,5 @@ def download(self, vehicle: Vehicle, platform: Platform, version: str = "") -> p
Returns:
pathlib.Path: Temporary path for the firmware file.
"""
url = self.get_download_url(vehicle, platform, version)
url = self.get_download_url(vehicle, board, version)
return FirmwareDownloader._download(url)
Loading

0 comments on commit 619e3b4

Please sign in to comment.