diff --git a/api/src/opentrons/hardware_control/__init__.py b/api/src/opentrons/hardware_control/__init__.py index 0849bb67fbc..a426cbbb821 100644 --- a/api/src/opentrons/hardware_control/__init__.py +++ b/api/src/opentrons/hardware_control/__init__.py @@ -306,6 +306,17 @@ def current_position(self, mount: top_types.Mount) -> Dict[Axis, float]: plunger_ax: self._current_position[plunger_ax] } + def gantry_position(self, mount: top_types.Mount) -> top_types.Point: + """ Return the position of the critical point as pertains to the gantry + + This ignores the plunger position and gives the Z-axis a predictable + name (as :py:attr:`.Point.z`). + """ + cur_pos = self.current_position(mount) + return top_types.Point(x=cur_pos[Axis.X], + y=cur_pos[Axis.Y], + z=cur_pos[Axis.by_mount(mount)]) + @_log_call async def move_to( self, mount: top_types.Mount, abs_position: top_types.Point, diff --git a/api/src/opentrons/protocol_api/__init__.py b/api/src/opentrons/protocol_api/__init__.py index 28a1bb6b47b..3622c31bb2b 100644 --- a/api/src/opentrons/protocol_api/__init__.py +++ b/api/src/opentrons/protocol_api/__init__.py @@ -7,7 +7,7 @@ import logging import os -from . import back_compat +from . import back_compat, labware from .contexts import ProtocolContext, InstrumentContext @@ -45,4 +45,8 @@ def run(protocol_bytes: bytes = None, pass -__all__ = ['run', 'ProtocolContext', 'InstrumentContext', 'back_compat'] +__all__ = ['run', + 'ProtocolContext', + 'InstrumentContext', + 'back_compat', + 'labware'] diff --git a/api/src/opentrons/protocol_api/contexts.py b/api/src/opentrons/protocol_api/contexts.py index 96d516a6287..cf63cc5a409 100644 --- a/api/src/opentrons/protocol_api/contexts.py +++ b/api/src/opentrons/protocol_api/contexts.py @@ -38,6 +38,7 @@ def __init__(self, self._instruments: Dict[types.Mount, Optional[InstrumentContext]]\ = {mount: None for mount in types.Mount} self._last_moved_instrument: Optional[types.Mount] = None + self._location_cache: Optional[types.Location] = None self._hardware = self._build_hardware_adapter(self._loop) self._log = MODULE_LOG.getChild(self.__class__.__name__) @@ -197,24 +198,41 @@ def update_config(self, **kwargs): self._hardware.update_config(**kwargs) def move_to(self, mount: types.Mount, - location: geometry.Location, - strategy: types.MotionStrategy): - where = geometry.point_from_location(location) - if self._last_moved_instrument\ - and self._last_moved_instrument != mount: + location: types.Location): + """ Implement motions of the robot. + + This should not need to be called by the user; it is called by + :py:meth:`InstrumentContext.move_to` (and thus all other + :py:class:`InstrumentContext` methods that involve moving, such as + :py:meth:`InstrumentContext.aspirate`) to move the pipettes around. + + It encapsulates location caching and ensures that all moves are safe. + It does this by taking a :py:class:`.types.Location` that can have + a position attached to it, and its behavior depends on the state of + that location cache and the passed location. + """ + switching_instr = self._last_moved_instrument\ + and self._last_moved_instrument != mount + if switching_instr: # TODO: Is 10 the right number here? This is what’s used in # robot since it’s a default to an argument that is never # changed self._log.debug("retract {}".format(self._last_moved_instrument)) self._hardware.retract(self._last_moved_instrument, 10) - if strategy == types.MotionStrategy.DIRECT: - self._hardware.move_to(mount, where) - self._log.debug("move {} direct {}".format(mount.name, where)) + + if self._location_cache and not switching_instr: + from_loc = self._location_cache else: - self._log.debug("move {} arc {}".format(mount.name, where)) - self._log.warn( - "Arc moves are not implemented, following back to direct") - self._hardware.move_to(mount, where) + from_loc = types.Location( + point=self._hardware.gantry_position(mount), + labware=None) + moves = geometry.plan_moves(from_loc, location, self._deck_layout) + self._log.debug("planned moves for {}->{}: {}" + .format(from_loc, location, moves)) + self._location_cache = location + self._last_moved_instrument = mount + for move in moves: + self._hardware.move_to(mount, move) def home(self): """ Homes the robot. @@ -261,7 +279,7 @@ def __init__(self, def aspirate(self, volume: float = None, - location: geometry.Location = None, + location: types.Location = None, rate: float = 1.0): """ Aspirate a volume of liquid (in microliters/uL) using this pipette @@ -271,58 +289,36 @@ def aspirate(self, from its current position. If only a location is passed, :py:meth:`aspirate` will default to its :py:attr:`max_volume`. - The location may be a :py:class:`.Well`, or a specific position in - relation to a :py:class:`.Well`, such as the return value of - :py:meth:`.Well.top`. If a :py:class:`.Well` is specified without - calling a position method (such as :py:meth:`.Well.top` or - :py:meth:`.Well.bottom`), this method will aspirate from the bottom - of the well. + If the :py:class:`.types.Location` passed in `location` has an + associated labware, that labware will be saved until another motion + is commanded. This is used to optimize motions - for instance, moving + between two wells requires much less Z-distance to avoid collisions + than moving between two pieces of labware. :param volume: The volume to aspirate, in microliters. If not specified, :py:attr:`max_volume`. :type volume: int or float - :param location: Where to aspirate from. A :py:class:`.Well` or - position (e.g. the return value from - :py:meth:`.Well.top` or :py:meth:`.Well.bottom`). If - unspecified, the current position. For advanced usage, - an (x, y, z) tuple or instance of - :py:class:`types.Point` may be passed. This is a - location in :ref:`protocol-api-deck-coords`. - :type location: Well or tuple[float, float, float] or types.Point + :param location: Where to aspirate from. If unspecified, the + current position. :param rate: The relative plunger speed for this aspirate. During this aspirate, the speed of the plunger will be `rate` * :py:attr:`aspirate_speed`. If not specified, defaults to 1.0 (speed will not be modified). :type rate: float :returns: This instance. - - Examples - -------- - .. - >>> from opentrons import instruments, labware, robot # doctest: +SKIP - >>> robot.reset() # doctest: +SKIP - >>> plate = labware.load('96-flat', '2') # doctest: +SKIP - >>> p300 = instruments.P300_Single(mount='right') # doctest: +SKIP - >>> p300.pick_up_tip() # doctest: +SKIP - # aspirate 50uL from a Well - >>> p300.aspirate(50, plate[0]) # doctest: +SKIP - # aspirate 50uL from the center of a well - >>> p300.aspirate(50, plate[1].bottom()) # doctest: +SKIP - >>> # aspirate 20uL in place, twice as fast - >>> p300.aspirate(20, rate=2.0) # doctest: +SKIP - >>> # aspirate the pipette's remaining volume (80uL) from a Well - >>> p300.aspirate(plate[2]) # doctest: +SKIP - """ - where = self._get_point_and_cache(location, 'bottom') + """ self._log.debug("aspirate {} from {} at {}" - .format(volume, where, rate)) - self._ctx.move_to(self._mount, where, types.MotionStrategy.ARC) + .format(volume, + location if location else 'current position', + rate)) + if location: + self.move_to(location) self._hardware.aspirate(self._mount, volume, rate) return self def dispense(self, volume: float = None, - location: geometry.Location = None, + location: types.Location = None, rate: float = 1.0): """ Dispense a volume of liquid (in microliters/uL) using this pipette @@ -342,44 +338,21 @@ def dispense(self, :param volume: The volume of liquid to dispense, in microliters. If not specified, defaults to :py:attr:`current_volume`. :type volume: int or float - :param location: Where to dispense into. A :py:class:`.Well` or - position (e.g. the return value from - :py:meth:`.Well.top` or :py:meth:`.Well.bottom`). If - unspecified, the bottom of the current well. For - advanced usage, an `(x, y, z)` tuple or instance of - :py:class:`types.Point` may be passed containing a - location in :ref:`protocol-api-deck-coords`. - :type location: .Well or tuple[float, float, float] or types.Point + :param location: Where to dispense into. If unspecified, the + current position. :param rate: The relative plunger speed for this aspirate. During this aspirate, the speed of the plunger will be `rate` * :py:attr:`aspirate_speed`. If not specified, defaults to 1.0 (speed will not be modified). :type rate: float :returns: This instance. - - Examples - -------- - .. - >>> from opentrons import instruments, labware, robot # doctest: +SKIP - >>> robot.reset() # doctest: +SKIP - >>> plate = labware.load('96-flat', '3') # doctest: +SKIP - >>> p300 = instruments.P300_Single(mount='left') # doctest: +SKIP - # fill the pipette with liquid (200uL) - >>> p300.aspirate(plate[0]) # doctest: +SKIP - # dispense 50uL to a Well - >>> p300.dispense(50, plate[0]) # doctest: +SKIP - # dispense 50uL to the center of a well - >>> relative_vector = plate[1].center() # doctest: +SKIP - >>> p300.dispense(50, (plate[1], relative_vector)) # doctest: +SKIP - # dispense 20uL in place, at half the speed - >>> p300.dispense(20, rate=0.5) # doctest: +SKIP - # dispense the pipette's remaining volume (80uL) to a Well - >>> p300.dispense(plate[2]) # doctest: +SKIP - """ - where = self._get_point_and_cache(location, 'bottom') + """ self._log.debug("dispense {} from {} at {}" - .format(volume, where, rate)) - self._ctx.move_to(self._mount, where, types.MotionStrategy.ARC) + .format(volume, + location if location else 'current position', + rate)) + if location: + self.move_to(location) self._hardware.dispense(self._mount, volume, rate) return self @@ -446,55 +419,14 @@ def transfer(self, **kwargs): raise NotImplementedError - def move_to(self, - location: geometry.Location, - strategy: Union[types.MotionStrategy, str] = None): + def move_to(self, location: types.Location): """ Move this pipette to a specific location on the deck. - :param location: Where to move to. This can be a :py:class:`.Well`, in - which case the pipette will move to its - :py:meth:`.Well.top`; a :py:class:`Labware`, in which - case the pipette will move to the top of its first - well; or a position, whether specified by the result - of a call to something like :py:class:`.Well.top` or - directly specified as a `tuple` or - :py:attr:`types.Point` in - :ref:`protocol-api-deck-coords`. - :type location: Well or tuple[float, float, float] or types.Point - :param strategy: How to move. This can be a member of - :py:class:`types.MotionStrategy` or one of the strings - `'arc'` and `'direct'` (with any capitalization). - The `'arc'` strategy (default) will pick the head up - on Z axis, then move over to the XY destination, then - finally down to the Z destination. This avoids any - obstacles, like labware, and is suitable for moving - around between different areas on the deck. The - `'direct'` strategy will simply move in a straight - line from the current position to the destination, - and is suitable for smaller motions, for instance - plate streaking or custom touch tip implementations. + :param location: Where to move to. :raises ValueError: if an argument is incorrect. """ - if None is strategy: - strategy = types.MotionStrategy.DIRECT - if strategy not in types.MotionStrategy: - # ignore the type here because mypy isn’t quite good enough - # to catch that if strategy is not in types.MotionStrategy - # it definitely isn’t an instance of types.MotionStrategy - name = strategy.upper() # type: ignore - try: - checked_strategy = types.MotionStrategy[name] - except KeyError: - raise ValueError( - "invalid motion strategy {}. Please use 'arc', 'direct'" - "opentrons.types.MotionStrategy.ARC or " - "opentrons.types.MotionStrategy.DIRECT" - .format(strategy)) - else: - # Same reason for the type: ignore as above - checked_strategy = strategy # type: ignore - where = self._get_point_and_cache(location, 'top') - self._ctx.move_to(self._mount, where, checked_strategy) + self._log.debug("move to {}".format(location)) + self._ctx.move_to(self._mount, location) return self @property @@ -613,62 +545,6 @@ def hw_pipette(self) -> Optional[Dict[str, Any]]: """ return self._hardware.attached_instruments[self._mount] - def _get_point_and_cache(self, - location: Optional[geometry.Location], - accessor: str) -> types.Point: - """ Take a location and turn it into a point, caching the labware. - - This method resolves a :py:class:`.Point` in absolute coordinates. - If given a :py:class:`.Point` (or a 3-tuple of coordinates) it will - use the input directly; otherwise it will attempt to resolve a - :py:class:`.Well` (by taking the first well of a passed - :py:class:`.Labware` if necessary) and use the specified `accessor` - on it. - - If `location` is `None` and there is a cached location, the cache - will be used. - - If a :py:class:`.Well` could be resolved (i.e. `location` was a - :py:class:`.Labware` or :py:class:`.Well` or it was `None` and there - was a cached location) the resolved :py:class:`.Well` will be cached. - If `location` did not resolve to a :py:class:`.Well` the location - cache will be invalidated. - - If `location` is `None` and nothing is cached, raises. - - :param location: The location to resolve (see above). - :param accessor: The name of the position accessor on - :py:class:`.Well` (e.g. 'top' or 'bottom') to use - on the eventually-resolved :py:class:`.Well`. - :raises RuntimeError: If `location` was `None` and no location is - cached. - """ - if None is location: - if self._last_location: - location = self._last_location - else: - raise RuntimeError( - 'Locationless move specified but no location cached') - if isinstance(location, Labware): - well: Optional[Well] = location.wells()[0] - point: types.Point = getattr(well, accessor)() - elif isinstance(location, Well): - well = location - point = getattr(well, accessor)() - elif isinstance(location, types.Point): - point = location - well = None - elif isinstance(location, tuple): - point = types.Point(location[0], location[1], location[2]) - well = None - else: - raise TypeError( - 'Bad location {}, must be None, Labware, Well, Point or tuple' - .format(location)) - - self._last_location = well - return point - def __repr__(self): return '<{}: {} in {}>'.format(self.__class__.__name__, self.hw_pipette['name'], diff --git a/api/src/opentrons/protocol_api/geometry.py b/api/src/opentrons/protocol_api/geometry.py index 0681e0fc14a..66191adddf7 100644 --- a/api/src/opentrons/protocol_api/geometry.py +++ b/api/src/opentrons/protocol_api/geometry.py @@ -1,33 +1,84 @@ from collections import UserDict +import functools import logging -from typing import Union, Tuple +from typing import List, Optional, Tuple -from opentrons.protocol_api.labware import Labware, Well from opentrons import types +from .labware import Labware, Well + MODULE_LOG = logging.getLogger(__name__) -Location = Union[Labware, Well, types.Point, Tuple[float, float, float]] +def max_many(*args): + return functools.reduce(max, args[1:], args[0]) + + +def plan_moves(from_loc: types.Location, + to_loc: types.Location, + deck: 'Deck', + well_z_margin: float = 5.0, + lw_z_margin: float = 20.0) -> List[types.Point]: + """ Plan moves between one :py:class:`.Location` and another. + + Each :py:class:`.Location` instance might or might not have a specific + kind of geometry attached. This function is intended to return series + of moves that contain the minimum safe retractions to avoid (known) + labware on the specified :py:class:`Deck`. + + :param from_loc: The last location. + :param to_loc: The location to move to. + :param deck: The :py:class:`Deck` instance describing the robot. + :param float well_z_margin: How much extra Z margin to raise the cp by over + the bare minimum to clear wells within the same + labware. Default: 5mm + :param float lw_z_margin: How much extra Z margin to raise the cp by over + the bare minimum to clear different pieces of + labware. Default: 20mm + + :returns: A list of :py:class:`.Point` to move through. + """ + + def _split_loc_labware( + loc: types.Location) -> Tuple[Optional[Labware], Optional[Well]]: + if isinstance(loc.labware, Labware): + return loc.labware, None + elif isinstance(loc.labware, Well): + return loc.labware.parent, loc.labware + else: + return None, None -def point_from_location(location: Location) -> types.Point: - """ Build a deck-abs point from anything the user passes in """ + to_point = to_loc.point + to_lw, to_well = _split_loc_labware(to_loc) + from_point = from_loc.point + from_lw, from_well = _split_loc_labware(from_loc) - # Defined with an inner function like this to make logging the result - # a bit less tedious and reasonably mypy-compliant - def _point(loc: Location) -> types.Point: - if isinstance(location, Well): - return location.top() - elif isinstance(location, Labware): - return location.wells()[0].top() - elif isinstance(location, tuple): - return types.Point(*location[:3]) + if to_lw and to_lw == from_lw: + # Two valid labwares. We’ll either raise to clear a well or go direct + if to_well and to_well == from_well: + return [to_point] else: - return location - - point = _point(location) - MODULE_LOG.debug("Location {} -> {}".format(location, point)) - return point + if to_well: + to_safety = to_well.top().point.z + well_z_margin + else: + to_safety = to_lw.highest_z + well_z_margin + if from_well: + from_safety = from_well.top().point.z + well_z_margin + else: + from_safety = from_lw.highest_z + well_z_margin + safe = max_many( + to_point.z, + from_point.z, + to_safety, + from_safety) + else: + # For now, the only fallback is to clear all known labware + safe = max_many(to_point.z, + from_point.z, + deck.highest_z + lw_z_margin) + return [from_point._replace(z=safe), + to_point._replace(z=safe), + to_point] class Deck(UserDict): @@ -65,7 +116,7 @@ def _check_name(self, key: object) -> int: else: return key_int - def __getitem__(self, key: types.DeckLocation) -> Labware: + def __getitem__(self, key: types.DeckLocation) -> 'Labware': return self.data[self._check_name(key)] def __delitem__(self, key: types.DeckLocation) -> None: @@ -77,13 +128,13 @@ def __delitem__(self, key: types.DeckLocation) -> None: for item in [lw for lw in self.data.values() if lw]: self._highest_z = max(item.wells()[0].top().z, self._highest_z) - def __setitem__(self, key: types.DeckLocation, val: Labware) -> None: + def __setitem__(self, key: types.DeckLocation, val: 'Labware') -> None: key_int = self._check_name(key) if self.data.get(key_int) is not None: raise ValueError('Deck location {} already has an item: {}' .format(key, self.data[key_int])) self.data[key_int] = val - self._highest_z = max(val.wells()[0].top().z, self._highest_z) + self._highest_z = max(val.wells()[0].top().point.z, self._highest_z) def __contains__(self, key: object) -> bool: try: diff --git a/api/src/opentrons/protocol_api/labware.py b/api/src/opentrons/protocol_api/labware.py index 05bfb182040..e4fa1fccf45 100644 --- a/api/src/opentrons/protocol_api/labware.py +++ b/api/src/opentrons/protocol_api/labware.py @@ -1,13 +1,14 @@ """This module will replace Placeable""" -import re -import os +from collections import defaultdict +from enum import Enum, auto import json +import os +import re import time from typing import List, Dict -from enum import Enum, auto -from opentrons.types import Point + +from opentrons.types import Point, Location from opentrons.util import environment as env -from collections import defaultdict class WellShape(Enum): @@ -25,7 +26,8 @@ class WellShape(Enum): class Well: def __init__( - self, well_props: dict, parent: Point, display_name: str) -> None: + self, well_props: dict, parent: Location, display_name: str)\ + -> None: """ Create a well, and track the Point corresponding to the top-center of the well (this Point is in absolute deck coordinates) @@ -37,15 +39,19 @@ def __init__( This is created by the caller and passed in, so here it is just saved and made available. :param well_props: a dict that conforms to the json-schema for a Well - :param parent: a Point representing the absolute position of the parent - of the Well (usually the lower-left corner of a labware) + :param parent: a :py:class:`.Location` Point representing the absolute + position of the parent of the Well (usually the + lower-left corner of a labware) """ self._display_name = display_name - self._position = Point( - x=well_props['x'] + parent.x, - y=well_props['y'] + parent.y, - z=well_props['z'] + well_props['depth'] + parent.z) - + self._position\ + = Point(well_props['x'], + well_props['y'], + well_props['z'] + well_props['depth']) + parent.point + + if not parent.labware: + raise ValueError("Wells must have a parent") + self._parent = parent.labware self._shape = well_shapes.get(well_props['shape']) if self._shape is WellShape.RECTANGULAR: self._length = well_props['length'] @@ -62,33 +68,37 @@ def __init__( self._depth = well_props['depth'] - def top(self) -> Point: + @property + def parent(self) -> 'Labware': + return self._parent + + def top(self) -> Location: """ :return: a Point corresponding to the absolute position of the top-center of the well relative to the deck (with the lower-left corner of slot 1 as (0,0,0)) """ - return self._position + return Location(self._position, self) - def bottom(self) -> Point: + def bottom(self) -> Location: """ :return: a Point corresponding to the absolute position of the bottom-center of the well (with the lower-left corner of slot 1 as (0,0,0)) """ top = self.top() - bottom_z = top.z - self._depth - return Point(x=top.x, y=top.y, z=bottom_z) + bottom_z = top.point.z - self._depth + return Location(Point(x=top.point.x, y=top.point.y, z=bottom_z), self) - def center(self) -> Point: + def center(self) -> Location: """ :return: a Point corresponding to the absolute position of the center of the well relative to the deck (with the lower-left corner of slot 1 as (0,0,0)) """ top = self.top() - center_z = top.z - (self._depth / 2.0) - return Point(x=top.x, y=top.y, z=center_z) + center_z = top.point.z - (self._depth / 2.0) + return Location(Point(x=top.point.x, y=top.point.y, z=center_z), self) def _from_center_cartesian( self, x: float, y: float, z: float) -> Point: @@ -121,9 +131,9 @@ def _from_center_cartesian( z_size = self._depth return Point( - x=center.x + (x * (x_size / 2.0)), - y=center.y + (y * (y_size / 2.0)), - z=center.z + (z * (z_size / 2.0))) + x=center.point.x + (x * (x_size / 2.0)), + y=center.point.y + (y * (y_size / 2.0)), + z=center.point.z + (z * (z_size / 2.0))) def __str__(self): return self._display_name @@ -135,7 +145,7 @@ def __eq__(self, other: object) -> bool: """ if not isinstance(other, Well): return NotImplemented - return self.top() == other.top() + return self.top().point == other.top().point class Labware: @@ -170,6 +180,7 @@ def __init__( self._id = definition['otId'] self._parameters = definition['parameters'] offset = definition['cornerOffsetFromSlot'] + self._dimensions = definition['dimensions'] # Inferred from definition self._ordering = [well for col in definition['ordering'] @@ -179,6 +190,7 @@ def __init__( z=offset['z'] + parent.z) # Applied properties self.set_calibration(self._calibrated_offset) + self._pattern = re.compile(r'^([A-Z]+)([1-9][0-9]*)$', re.X) def _build_wells(self) -> List[Well]: @@ -190,7 +202,7 @@ def _build_wells(self) -> List[Well]: return [ Well( self._well_definition[well], - self._calibrated_offset, + Location(self._calibrated_offset, self), "{} of {}".format(well, self._display_name)) for well in self._ordering] @@ -217,6 +229,10 @@ def set_calibration(self, delta: Point): z=self._offset.z + delta.z) self._wells = self._build_wells() + @property + def calibrated_offset(self) -> Point: + return self._calibrated_offset + def well(self, idx) -> Well: """Deprecated---use result of `wells` or `wells_by_index`""" if isinstance(idx, int): @@ -357,6 +373,16 @@ def cols(self, *args): """Deprecated--use `columns`""" return self.columns(*args) + @property + def highest_z(self) -> float: + """ + The z-coordinate of the tallest single point anywhere on the labware. + + This is drawn from the 'dimensions'/'overallHeight' elements of the + labware definition and takes into account the calibration offset. + """ + return self._dimensions['overallHeight'] + self._calibrated_offset.z + def __repr__(self): return self._display_name diff --git a/api/src/opentrons/types.py b/api/src/opentrons/types.py index 52f13dc0602..745741c5ddb 100644 --- a/api/src/opentrons/types.py +++ b/api/src/opentrons/types.py @@ -1,12 +1,17 @@ import enum -from typing import Union -from collections import namedtuple -from typing import Any +from typing import Any, NamedTuple, TYPE_CHECKING, Union -_PointTuple = namedtuple('Point', ['x', 'y', 'z']) +if TYPE_CHECKING: + from typing import (Optional, # noqa(F401) Used for typechecking + Tuple) + from .labware import Labware, Well # noqa(F401) Used for typechecking -class Point(_PointTuple): +class Point(NamedTuple): + x: float + y: float + z: float + def __eq__(self, other: Any) -> bool: if not isinstance(other, Point): return False @@ -23,14 +28,39 @@ def __sub__(self, other: Any) -> 'Point': return Point(self.x - other.x, self.y - other.y, self.z - other.z) +class Location(NamedTuple): + """ A location to target as a motion in the :ref:`protocol-api`. + + The location contains a :py:class:`.Point` (in + :ref:`protocol-api-deck-coordinates`) and possibly an associated + :py:class:`.Labware` or :py:class:`.Well` instance. + + It should rarely be constructed directly by the user; rather, it is the + return type of most :py:class:`.Well` accessors like :py:meth:`.Well.top` + and is passed directly into a method like + :py:meth:`InstrumentContext.aspirate`. + + .. warning:: + The :py:attr:`labware` attribute of this class is used by the protocol + API internals to, among other things, determine safe heights to retract + the instruments to when moving between locations. If constructing an + instance of this class manually, be sure to either specify `None` as the + labware (so the robot does its worst case retraction) or specify the + correct labware for the :py:attr:`point` attribute. + + + .. warning:: + The `==` operation compares both the position and associated labware. + If you only need to compare locations, compare the :py:attr:`point` + of each item. + """ + point: Point + labware: 'Union[Labware, Well, None]' + + class Mount(enum.Enum): LEFT = enum.auto() RIGHT = enum.auto() DeckLocation = Union[int, str] - - -class MotionStrategy(enum.Enum): - DIRECT = enum.auto() - ARC = enum.auto() diff --git a/api/tests/opentrons/protocol_api/test_accessor_fn.py b/api/tests/opentrons/protocol_api/test_accessor_fn.py index aa671a30969..6be2e340811 100644 --- a/api/tests/opentrons/protocol_api/test_accessor_fn.py +++ b/api/tests/opentrons/protocol_api/test_accessor_fn.py @@ -34,6 +34,11 @@ "z": 0, "shape": "circular" } + }, + "dimensions": { + "overallLength": 1.0, + "overallWidth": 2.0, + "overallHeight": 3.0 } } @@ -106,6 +111,11 @@ "z": 0, "shape": "circular" } + }, + "dimensions": { + "overallLength": 1.0, + "overallWidth": 2.0, + "overallHeight": 3.0 } } diff --git a/api/tests/opentrons/protocol_api/test_back_compat.py b/api/tests/opentrons/protocol_api/test_back_compat.py new file mode 100644 index 00000000000..1df61599af8 --- /dev/null +++ b/api/tests/opentrons/protocol_api/test_back_compat.py @@ -0,0 +1,40 @@ +from opentrons.protocol_api import back_compat # , ProtocolContext +from opentrons.types import Mount + + +def test_add_instrument(loop, monkeypatch): + requested_instr, requested_mount = None, None + + def fake_load(instr_name, mount): + nonlocal requested_instr + nonlocal requested_mount + requested_instr = instr_name + requested_mount = mount + + monkeypatch.setattr(back_compat.instruments._ctx, + 'load_instrument', fake_load) + + back_compat.instruments.P1000_Single('left') + assert requested_instr == 'p1000_single' + assert requested_mount == Mount.LEFT + back_compat.instruments.P10_Single('right') + assert requested_instr == 'p10_single' + assert requested_mount == Mount.RIGHT + back_compat.instruments.P10_Multi('left') + assert requested_instr == 'p10_multi' + assert requested_mount == Mount.LEFT + back_compat.instruments.P50_Single('right') + assert requested_instr == 'p50_single' + assert requested_mount == Mount.RIGHT + back_compat.instruments.P50_Multi('left') + assert requested_instr == 'p50_multi' + assert requested_mount == Mount.LEFT + back_compat.instruments.P300_Single('right') + assert requested_instr == 'p300_single' + assert requested_mount == Mount.RIGHT + back_compat.instruments.P300_Multi('left') + assert requested_instr == 'p300_multi' + assert requested_mount == Mount.LEFT + back_compat.instruments.P1000_Single('right') + assert requested_instr == 'p1000_single' + assert requested_mount == Mount.RIGHT diff --git a/api/tests/opentrons/protocol_api/test_context.py b/api/tests/opentrons/protocol_api/test_context.py index 70049b51ce4..fce57e52922 100644 --- a/api/tests/opentrons/protocol_api/test_context.py +++ b/api/tests/opentrons/protocol_api/test_context.py @@ -4,9 +4,7 @@ import pkgutil import opentrons.protocol_api as papi -from opentrons.protocol_api.geometry import Deck -from opentrons.protocol_api.labware import load -from opentrons.types import MotionStrategy, Mount, Point +from opentrons.types import Mount, Point, Location from opentrons.hardware_control import API from opentrons.hardware_control.types import Axis from opentrons.config.pipette_config import configs @@ -14,6 +12,7 @@ import pytest +# TODO: Remove once load_labware_by_name is implemented labware_name = 'generic_96_wellPlate_380_uL' labware_def = json.loads( pkgutil.get_data('opentrons', @@ -27,36 +26,6 @@ def dummy_load(labware): monkeypatch.setattr(papi.labware, '_load_definition_by_name', dummy_load) -def test_slot_names(load_my_labware): - slots_by_int = list(range(1, 13)) - slots_by_str = [str(idx) for idx in slots_by_int] - for method in (slots_by_int, slots_by_str): - d = Deck() - for idx, slot in enumerate(method): - lw = load(labware_name, d.position_for(slot), str(slot)) - assert slot in d - d[slot] = lw - with pytest.raises(ValueError): - d[slot] = 'not this time boyo' - del d[slot] - assert slot in d - assert d[slot] is None - - assert 'hasasdaia' not in d - with pytest.raises(ValueError): - d['ahgoasia'] = 'nope' - - -def test_highest_z(load_my_labware): - deck = Deck() - assert deck.highest_z == 0 - lw = load(labware_name, deck.position_for(1), '1') - deck[1] = lw - assert deck.highest_z == lw.wells()[0].top().z - del deck[1] - assert deck.highest_z == 0 - - def test_load_instrument(loop): ctx = papi.ProtocolContext(loop=loop) for config in configs: @@ -73,35 +42,71 @@ def test_motion(loop): ctx.connect(hardware) instr = ctx.load_instrument('p10_single', Mount.RIGHT) instr.home() - assert instr.move_to((0, 0, 0)) is instr + assert instr.move_to(Location(Point(0, 0, 0), None)) is instr assert hardware.current_position(instr._mount) == {Axis.X: 0, Axis.Y: 0, Axis.A: 0, Axis.C: 19} -def test_location_parsing(loop, load_my_labware): +def test_location_cache(loop, monkeypatch, load_my_labware): + hardware = API.build_hardware_simulator(loop=loop) ctx = papi.ProtocolContext(loop) - lw = ctx.load_labware_by_name('generic_96_wellPlate_380_uL', '1') - instr = ctx.load_instrument('p10_single', Mount.RIGHT) - w0 = lw.wells()[0] - assert instr._get_point_and_cache(w0, 'top') == w0.top() - assert instr._last_location is w0 - assert instr._get_point_and_cache(w0, 'bottom') == w0.bottom() - assert instr._last_location is w0 - assert instr._get_point_and_cache(w0, 'center') == w0.center() - assert instr._last_location is w0 - assert instr._get_point_and_cache(w0.bottom(), 'top') == w0.bottom() - assert instr._last_location is None - assert instr._get_point_and_cache(lw, 'top') == lw.wells()[0].top() - assert instr._last_location == lw.wells()[0] - assert instr._get_point_and_cache(None, 'top') == w0.top() - assert instr._last_location == lw.wells()[0] - assert instr._get_point_and_cache((0, 1, 2), 'bottom') == Point(0, 1, 2) - with pytest.raises(RuntimeError): - instr._get_point_and_cache(None, 'top') - with pytest.raises(TypeError): - instr._get_point_and_cache(2, 'bottom') + ctx.connect(hardware) + right = ctx.load_instrument('p10_single', Mount.RIGHT) + left = ctx.load_instrument('p300_multi', Mount.LEFT) + lw = ctx.load_labware_by_name('generic_96_wellPlate_380_uL', 1) + ctx.home() + + test_args = None + + def fake_plan_move(from_loc, to_loc, deck, + well_z_margin=None, + lw_z_margin=None): + nonlocal test_args + test_args = (from_loc, to_loc, deck, well_z_margin, lw_z_margin) + return [Point(0, 1, 10), Point(1, 2, 10), Point(1, 2, 3)] + + monkeypatch.setattr(papi.geometry, 'plan_moves', fake_plan_move) + # When we move without a cache, the from location should be the gantry + # position + right.move_to(lw.wells()[0].top()) + # The home position from hardware_control/simulator.py, taking into account + # that the right pipette is a p10 single which is a different height than + # the reference p300 single + assert test_args[0].point == Point(418, 353, 205) + assert test_args[0].labware is None + + # Once we have a location cache, that should be our from_loc + right.move_to(lw.wells()[1].top()) + assert test_args[0] == lw.wells()[0].top() + + # If we switch instruments, we should ignore the cache + here = hardware.gantry_position(Mount.LEFT) + left.move_to(lw.wells()[1].top()) + assert test_args[0].point == here + assert test_args[0].labware is None + + +def test_move_uses_arc(loop, monkeypatch, load_my_labware): + hardware = API.build_hardware_simulator(loop=loop) + ctx = papi.ProtocolContext(loop) + ctx.connect(hardware) + right = ctx.load_instrument('p10_single', Mount.RIGHT) + lw = ctx.load_labware_by_name('generic_96_wellPlate_380_uL', 1) + ctx.home() + + targets = [] + + async def fake_move(mount, target_pos): + nonlocal targets + targets.append((mount, target_pos)) + monkeypatch.setattr(hardware, 'move_to', fake_move) + + right.move_to(lw.wells()[0].top()) + assert len(targets) == 3 + assert targets[-1][0] == Mount.RIGHT + assert targets[-1][1] == lw.wells()[0].top().point def test_pipette_info(loop): @@ -128,18 +133,17 @@ async def fake_hw_aspirate(mount, volume=None, rate=1.0): move_called_with = None - def fake_move(mount, loc, strat): + def fake_move(mount, loc): nonlocal move_called_with - move_called_with = (mount, loc, strat) + move_called_with = (mount, loc) monkeypatch.setattr(ctx._hardware._api, 'aspirate', fake_hw_aspirate) monkeypatch.setattr(ctx, 'move_to', fake_move) - instr.aspirate(2.0, lw) + instr.aspirate(2.0, lw.wells()[0].bottom()) assert asp_called_with == (Mount.RIGHT, 2.0, 1.0) - assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom(), - MotionStrategy.ARC) + assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom()) def test_dispense(loop, load_my_labware, monkeypatch): @@ -155,15 +159,14 @@ async def fake_hw_dispense(mount, volume=None, rate=1.0): move_called_with = None - def fake_move(mount, loc, strat): + def fake_move(mount, loc): nonlocal move_called_with - move_called_with = (mount, loc, strat) + move_called_with = (mount, loc) monkeypatch.setattr(ctx._hardware._api, 'dispense', fake_hw_dispense) monkeypatch.setattr(ctx, 'move_to', fake_move) - instr.dispense(2.0, lw) + instr.dispense(2.0, lw.wells()[0].bottom()) assert disp_called_with == (Mount.RIGHT, 2.0, 1.0) - assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom(), - MotionStrategy.ARC) + assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom()) diff --git a/api/tests/opentrons/protocol_api/test_geometry.py b/api/tests/opentrons/protocol_api/test_geometry.py new file mode 100644 index 00000000000..ddeaa843dc6 --- /dev/null +++ b/api/tests/opentrons/protocol_api/test_geometry.py @@ -0,0 +1,147 @@ +import json +import pkgutil + +import pytest + +import opentrons.protocol_api as papi +from opentrons.protocol_api.geometry import Deck, plan_moves +from opentrons.protocol_api.labware import load + +# TODO: Remove once load_labware_by_name is implemented +labware_name = 'generic_96_wellPlate_380_uL' +labware_def = json.loads( + pkgutil.get_data('opentrons', + 'shared_data/definitions2/{}.json'.format(labware_name))) + + +@pytest.fixture +def load_my_labware(monkeypatch): + def dummy_load(labware): + return labware_def + monkeypatch.setattr(papi.labware, '_load_definition_by_name', dummy_load) + + +def test_slot_names(load_my_labware): + slots_by_int = list(range(1, 13)) + slots_by_str = [str(idx) for idx in slots_by_int] + for method in (slots_by_int, slots_by_str): + d = Deck() + for idx, slot in enumerate(method): + lw = load(labware_name, d.position_for(slot), str(slot)) + assert slot in d + d[slot] = lw + with pytest.raises(ValueError): + d[slot] = 'not this time boyo' + del d[slot] + assert slot in d + assert d[slot] is None + + assert 'hasasdaia' not in d + with pytest.raises(ValueError): + d['ahgoasia'] = 'nope' + + +def test_highest_z(load_my_labware): + deck = Deck() + assert deck.highest_z == 0 + lw = load(labware_name, deck.position_for(1), '1') + deck[1] = lw + assert deck.highest_z == lw.wells()[0].top().point.z + del deck[1] + assert deck.highest_z == 0 + + +def check_arc_basic(arc, from_loc, to_loc): + """ Check the tests that should always be true for different-well moves + - we should always go only up, then only xy, then only down + - we should have three moves + """ + assert len(arc) == 3 + assert arc[0]._replace(z=0) == from_loc.point._replace(z=0) + assert arc[0].z >= from_loc.point.z + assert arc[0].z == arc[1].z + assert arc[1]._replace(z=0) == to_loc.point._replace(z=0) + assert arc[1].z >= to_loc.point.z + assert arc[2] == to_loc.point + + +def test_direct_movs(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + + same_place = plan_moves(lw1.wells()[0].top(), lw1.wells()[0].top(), deck) + assert same_place == [lw1.wells()[0].top().point] + + same_well = plan_moves(lw1.wells()[0].top(), lw1.wells()[0].bottom(), deck) + assert same_well == [lw1.wells()[0].bottom().point] + + +def test_basic_arc(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + lw2 = load(labware_name, deck.position_for(2), 'lw2') + # same-labware moves should use the smaller safe z + same_lw = plan_moves(lw1.wells()[0].top(), + lw1.wells()[8].bottom(), + deck, + 7.0, 15.0) + check_arc_basic(same_lw, lw1.wells()[0].top(), lw1.wells()[8].bottom()) + assert same_lw[0].z == lw1.wells()[0].top().point.z + 7.0 + + # different-labware moves, or moves with no labware attached, + # should use the larger safe z and the global z + different_lw = plan_moves(lw1.wells()[0].top(), + lw2.wells()[0].bottom(), + deck, + 7.0, 15.0) + check_arc_basic(different_lw, + lw1.wells()[0].top(), lw2.wells()[0].bottom()) + assert different_lw[0].z == deck.highest_z + 15.0 + + +def test_no_labware_loc(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + lw2 = load(labware_name, deck.position_for(2), 'lw2') + # Various flavors of locations without labware should work + no_lw = lw1.wells()[0].top()._replace(labware=None) + + no_from = plan_moves(no_lw, lw2.wells()[0].bottom(), deck, 7.0, 15.0) + check_arc_basic(no_from, no_lw, lw2.wells()[0].bottom()) + assert no_from[0].z == deck.highest_z + 15.0 + + no_to = plan_moves(lw1.wells()[0].bottom(), no_lw, deck, 7.0, 15.0) + check_arc_basic(no_to, lw1.wells()[0].bottom(), no_lw) + assert no_from[0].z == deck.highest_z + 15.0 + + no_well = lw1.wells()[0].top()._replace(labware=lw1) + + no_from_well = plan_moves(no_well, lw1.wells()[1].top(), deck, 7.0, 15.0) + check_arc_basic(no_from_well, no_well, lw1.wells()[1].top()) + assert no_from_well[0].z\ + == labware_def['dimensions']['overallHeight'] + 7.0 + + no_to_well = plan_moves(lw1.wells()[1].top(), no_well, deck, 7.0, 15.0) + check_arc_basic(no_to_well, lw1.wells()[1].top(), no_well) + assert no_to_well[0].z == labware_def['dimensions']['overallHeight'] + 7.0 + + +def test_arc_tall_point(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + tall_z = 100 + old_top = lw1.wells()[0].top() + tall_point = old_top.point._replace(z=tall_z) + tall_top = old_top._replace(point=tall_point) + to_tall = plan_moves(lw1.wells()[2].top(), tall_top, deck, 7.0, 15.0) + check_arc_basic(to_tall, lw1.wells()[2].top(), tall_top) + assert to_tall[0].z == tall_z + + from_tall = plan_moves(tall_top, lw1.wells()[3].top(), deck, 7.0, 15.0) + check_arc_basic(from_tall, tall_top, lw1.wells()[3].top()) + assert from_tall[0].z == tall_z + + no_well = tall_top._replace(labware=lw1) + from_tall_lw = plan_moves(no_well, lw1.wells()[4].bottom(), deck, + 7.0, 15.0) + check_arc_basic(from_tall_lw, no_well, lw1.wells()[4].bottom()) diff --git a/api/tests/opentrons/protocol_api/test_labware.py b/api/tests/opentrons/protocol_api/test_labware.py index 8855edc4039..6e919df1abc 100644 --- a/api/tests/opentrons/protocol_api/test_labware.py +++ b/api/tests/opentrons/protocol_api/test_labware.py @@ -1,7 +1,7 @@ import json import pkgutil from opentrons.protocol_api import labware -from opentrons.types import Point +from opentrons.types import Point, Location test_data = { 'circular_well_json': { @@ -27,7 +27,7 @@ def test_well_init(): - slot = Point(1, 2, 3) + slot = Location(Point(1, 2, 3), 1) well_name = 'circular_well_json' well1 = labware.Well(test_data[well_name], slot, well_name) assert well1._diameter == test_data[well_name]['diameter'] @@ -42,29 +42,31 @@ def test_well_init(): def test_top(): - slot = Point(4, 5, 6) + slot = Location(Point(4, 5, 6), 1) well_name = 'circular_well_json' well = labware.Well(test_data[well_name], slot, well_name) well_data = test_data[well_name] - expected_x = well_data['x'] + slot.x - expected_y = well_data['y'] + slot.y - expected_z = well_data['z'] + well_data['depth'] + slot.z - assert well.top() == Point(expected_x, expected_y, expected_z) + expected_x = well_data['x'] + slot.point.x + expected_y = well_data['y'] + slot.point.y + expected_z = well_data['z'] + well_data['depth'] + slot.point.z + assert well.top() == Location(Point(expected_x, expected_y, expected_z), + well) def test_bottom(): - slot = Point(7, 8, 9) + slot = Location(Point(7, 8, 9), 1) well_name = 'rectangular_well_json' well = labware.Well(test_data[well_name], slot, well_name) well_data = test_data[well_name] - expected_x = well_data['x'] + slot.x - expected_y = well_data['y'] + slot.y - expected_z = well_data['z'] + slot.z - assert well.bottom() == Point(expected_x, expected_y, expected_z) + expected_x = well_data['x'] + slot.point.x + expected_y = well_data['y'] + slot.point.y + expected_z = well_data['z'] + slot.point.z + assert well.bottom() == Location(Point(expected_x, expected_y, expected_z), + well) def test_from_center_cartesian(): - slot1 = Point(10, 11, 12) + slot1 = Location(Point(10, 11, 12), 1) well_name = 'circular_well_json' well1 = labware.Well(test_data[well_name], slot1, well_name) @@ -84,7 +86,7 @@ def test_from_center_cartesian(): assert point1.y == expected_y assert point1.z == expected_z - slot2 = Point(13, 14, 15) + slot2 = Location(Point(13, 14, 15), 1) well2_name = 'rectangular_well_json' well2 = labware.Well(test_data[well2_name], slot2, well2_name) percent2_x = -0.25 @@ -165,3 +167,24 @@ def test_backcompat(): w11 = lw.columns('2', '3', '6') assert len(w11) == 3 assert repr(w11[1][2]) == well_c3_name + + +def test_well_parent(): + labware_name = 'generic_96_wellPlate_380_uL' + labware_def = json.loads( + pkgutil.get_data('opentrons', + 'shared_data/definitions2/{}.json'.format( + labware_name))) + lw = labware.Labware(labware_def, Point(0, 0, 0), 'Test Slot') + parent = Location(Point(7, 8, 9), lw) + well_name = 'circular_well_json' + well = labware.Well(test_data[well_name], + parent, + well_name) + assert well.parent is lw + assert well.top().labware is well + assert well.top().labware.parent is lw + assert well.bottom().labware is well + assert well.bottom().labware.parent is lw + assert well.center().labware is well + assert well.center().labware.parent is lw diff --git a/api/tests/opentrons/protocol_api/test_offsets.py b/api/tests/opentrons/protocol_api/test_offsets.py index d2660cfa5dc..dabb941320b 100644 --- a/api/tests/opentrons/protocol_api/test_offsets.py +++ b/api/tests/opentrons/protocol_api/test_offsets.py @@ -39,6 +39,11 @@ "z": 0, "shape": "circular" } + }, + "dimensions": { + "overallLength": 1.0, + "overallWidth": 2.0, + "overallHeight": 3.0 } }