Skip to content

Commit

Permalink
ardupilot_manager: tweak tests so they run faster
Browse files Browse the repository at this point in the history
This took the mavlink routers tests down from 108s to 65s on my machine
  • Loading branch information
Williangalvani committed Jan 6, 2025
1 parent bd3d4f4 commit 104dc15
Showing 1 changed file with 52 additions and 4 deletions.
56 changes: 52 additions & 4 deletions core/services/ardupilot_manager/mavlink_proxy/test_all.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# pylint: disable=redefined-outer-name
import asyncio
import os
import pathlib
import pty
Expand Down Expand Up @@ -182,6 +183,34 @@ async def serial_test_mavproxy(valid_output_endpoints: Set[Endpoint], valid_mast
)


def create_endpoints_with_offset(endpoints: Set[Endpoint], port_offset: int) -> Set[Endpoint]:
"""Create a new set of endpoints with ports offset by the given amount"""
new_endpoints = set()
for endpoint in endpoints:
# Only modify network endpoints with non-None arguments
if (
endpoint.connection_type
in [
EndpointType.UDPServer,
EndpointType.UDPClient,
EndpointType.TCPServer,
EndpointType.TCPClient,
]
and endpoint.argument is not None
):
new_endpoint = Endpoint(
name=endpoint.name,
owner=endpoint.owner,
connection_type=endpoint.connection_type,
place=endpoint.place,
argument=endpoint.argument + port_offset,
)
new_endpoints.add(new_endpoint)
else:
new_endpoints.add(endpoint)
return new_endpoints


async def serial_test_mavlink_router(
valid_output_endpoints: Set[Endpoint], valid_master_endpoints: Set[Endpoint]
) -> None:
Expand All @@ -191,6 +220,10 @@ async def serial_test_mavlink_router(

assert AbstractRouter.get_interface("MAVLinkRouter"), "Failed to find interface MAVLinkRouter"

# Use base ports for MAVLinkRouter (offset by 0)
output_endpoints = create_endpoints_with_offset(valid_output_endpoints, 0)
master_endpoints = create_endpoints_with_offset(valid_master_endpoints, 0)

mavlink_router = MAVLinkRouter()
assert mavlink_router.name() == "MAVLinkRouter", "Name does not match."
assert re.search(r"\d+", str(mavlink_router.version())) is not None, "Version does not follow pattern."
Expand All @@ -207,8 +240,12 @@ async def serial_test_mavlink_router(
EndpointType.TCPClient,
EndpointType.Serial,
]

filtered_output_endpoints = set(e for e in output_endpoints if e.connection_type in allowed_output_types)
filtered_master_endpoints = set(e for e in master_endpoints if e.connection_type in allowed_master_types)

await run_common_routing_tests(
mavlink_router, allowed_output_types, allowed_master_types, valid_output_endpoints, valid_master_endpoints
mavlink_router, allowed_output_types, allowed_master_types, filtered_output_endpoints, filtered_master_endpoints
)


Expand All @@ -220,6 +257,10 @@ async def serial_test_mavp2p(valid_output_endpoints: Set[Endpoint], valid_master

assert AbstractRouter.get_interface("MAVP2P"), "Failed to find interface MAVP2P"

# Offset ports by 100 for MAVP2P to avoid conflicts as we run tests in parallel
output_endpoints = create_endpoints_with_offset(valid_output_endpoints, 100)
master_endpoints = create_endpoints_with_offset(valid_master_endpoints, 100)

mavp2p = MAVP2P()
assert mavp2p.name() == "MAVP2P", "Name does not match."
assert re.search(r"\d+.\d+.\d+", str(mavp2p.version())) is not None, "Version does not follow pattern."
Expand All @@ -232,16 +273,23 @@ async def serial_test_mavp2p(valid_output_endpoints: Set[Endpoint], valid_master
EndpointType.Serial,
]
allowed_master_types = allowed_output_types

filtered_output_endpoints = set(e for e in output_endpoints if e.connection_type in allowed_output_types)
filtered_master_endpoints = set(e for e in master_endpoints if e.connection_type in allowed_master_types)

await run_common_routing_tests(
mavp2p, allowed_output_types, allowed_master_types, valid_output_endpoints, valid_master_endpoints
mavp2p, allowed_output_types, allowed_master_types, filtered_output_endpoints, filtered_master_endpoints
)


@pytest.mark.timeout(120)
@pytest.mark.asyncio
async def test_router(valid_output_endpoints: Set[Endpoint], valid_master_endpoints: Set[Endpoint]) -> None:
for router in [serial_test_mavlink_router, serial_test_mavp2p]:
await router(valid_output_endpoints, valid_master_endpoints)
# Run both router tests in parallel
await asyncio.gather(
serial_test_mavlink_router(valid_output_endpoints, valid_master_endpoints),
serial_test_mavp2p(valid_output_endpoints, valid_master_endpoints),
)


@pytest.mark.timeout(10)
Expand Down

0 comments on commit 104dc15

Please sign in to comment.