-
Notifications
You must be signed in to change notification settings - Fork 180
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(api): Add modules api to hardware_control
Closes #2237
- Loading branch information
Showing
16 changed files
with
900 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import asyncio | ||
import logging | ||
import os | ||
import re | ||
from typing import List, Optional, Tuple | ||
|
||
from .mod_abc import AbstractModule | ||
# Must import tempdeck and magdeck (and other modules going forward) so they | ||
# actually create the subclasses | ||
from . import update, tempdeck, magdeck # noqa(W0611) | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class UnsupportedModuleError(Exception): | ||
pass | ||
|
||
|
||
class AbsentModuleError(Exception): | ||
pass | ||
|
||
|
||
# mypy isn’t quite expressive enough to handle what we’re doing here, which | ||
# is get all the class objects that are subclasses of an abstract module | ||
# (strike 1) and call a classmethod on them (strike 2) and actually store | ||
# the class objects (strike 3). So, type: ignore | ||
MODULE_TYPES = {cls.name(): cls | ||
for cls in AbstractModule.__subclasses__()} # type: ignore | ||
|
||
|
||
def build(port: str, which: str, simulate: bool) -> AbstractModule: | ||
return MODULE_TYPES[which].build(port, simulate) | ||
|
||
|
||
def discover() -> List[Tuple[str, str]]: | ||
""" Scan for connected modules and instantiate handler classes | ||
""" | ||
if os.environ.get('RUNNING_ON_PI') and os.path.isdir('/dev/modules'): | ||
devices = os.listdir('/dev/modules') | ||
else: | ||
devices = [] | ||
|
||
discovered_modules = [] | ||
|
||
module_port_regex = re.compile('|'.join(MODULE_TYPES.keys()), re.I) | ||
for port in devices: | ||
match = module_port_regex.search(port) | ||
if match: | ||
name = match.group().lower() | ||
if name not in MODULE_TYPES: | ||
log.warning("Unexpected module connected: {} on {}" | ||
.format(name, port)) | ||
continue | ||
absolute_port = '/dev/modules/{}'.format(port) | ||
discovered_modules.append((absolute_port, name)) | ||
log.info('Discovered modules: {}'.format(discovered_modules)) | ||
|
||
return discovered_modules | ||
|
||
|
||
class UpdateError(RuntimeError): | ||
def __init__(self, msg): | ||
self.msg = msg | ||
|
||
|
||
async def update_firmware( | ||
module: AbstractModule, | ||
firmware_file: str, | ||
loop: Optional[asyncio.AbstractEventLoop]) -> AbstractModule: | ||
""" Update a module. | ||
If the update succeeds, an Module instance will be returned. | ||
Otherwise, raises an UpdateError with the reason for the failure. | ||
""" | ||
simulated = module.is_simulated | ||
cls = type(module) | ||
old_port = module.port | ||
flash_port = await module.prep_for_update() | ||
del module | ||
after_port, results = await update.update_firmware(flash_port, | ||
firmware_file, | ||
loop) | ||
await asyncio.sleep(1.0) | ||
new_port = after_port or old_port | ||
if not results[0]: | ||
raise UpdateError(results[1]) | ||
return cls.build(new_port, simulated) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
from opentrons.drivers.mag_deck import MagDeck as MagDeckDriver | ||
from . import update, mod_abc | ||
|
||
LABWARE_ENGAGE_HEIGHT = {'biorad-hardshell-96-PCR': 18} # mm | ||
MAX_ENGAGE_HEIGHT = 45 # mm from home position | ||
|
||
|
||
class MissingDevicePortError(Exception): | ||
pass | ||
|
||
|
||
class SimulatingDriver: | ||
def __init__(self): | ||
self._port = None | ||
|
||
def probe_plate(self): | ||
pass | ||
|
||
def home(self): | ||
pass | ||
|
||
def move(self, location): | ||
pass | ||
|
||
def get_device_info(self): | ||
return {'serial': 'dummySerial', | ||
'model': 'dummyModel', | ||
'version': 'dummyVersion'} | ||
|
||
def connect(self, port): | ||
pass | ||
|
||
def disconnect(self): | ||
pass | ||
|
||
def enter_programming_mode(self): | ||
pass | ||
|
||
|
||
class MagDeck(mod_abc.AbstractModule): | ||
""" | ||
Under development. API subject to change | ||
""" | ||
@classmethod | ||
def build(cls, port, simulating=False): | ||
mod = cls(port, simulating) | ||
mod._connect() | ||
return mod | ||
|
||
@classmethod | ||
def name(cls) -> str: | ||
return 'magdeck' | ||
|
||
def __init__(self, port, simulating): | ||
self._engaged = False | ||
self._port = port | ||
if simulating: | ||
self._driver = SimulatingDriver() | ||
else: | ||
self._driver = MagDeckDriver() | ||
self._device_info = None | ||
|
||
def calibrate(self): | ||
""" | ||
Calibration involves probing for top plate to get the plate height | ||
""" | ||
self._driver.probe_plate() | ||
# return if successful or not? | ||
self._engaged = False | ||
|
||
def engage(self, height): | ||
""" | ||
Move the magnet to a specific height, in mm from home position | ||
""" | ||
if height > MAX_ENGAGE_HEIGHT or height < 0: | ||
raise ValueError('Invalid engage height. Should be 0 to {}'.format( | ||
MAX_ENGAGE_HEIGHT)) | ||
self._driver.move(height) | ||
self._engaged = True | ||
|
||
def disengage(self): | ||
""" | ||
Home the magnet | ||
""" | ||
self._driver.home() | ||
self._engaged = False | ||
|
||
@property | ||
def device_info(self): | ||
""" | ||
Returns a dict: | ||
{ 'serial': 'abc123', 'model': '8675309', 'version': '9001' } | ||
""" | ||
return self._device_info | ||
|
||
@property | ||
def status(self): | ||
return 'engaged' if self._engaged else 'disengaged' | ||
|
||
@property | ||
def live_data(self): | ||
return { | ||
'status': self.status, | ||
'data': {} | ||
} | ||
|
||
@property | ||
def port(self): | ||
return self._port | ||
|
||
@property | ||
def is_simulated(self): | ||
return isinstance(self._driver, SimulatingDriver) | ||
|
||
# Internal Methods | ||
|
||
def _connect(self): | ||
""" | ||
Connect to the serial port | ||
""" | ||
self._driver.connect(self._port) | ||
self._device_info = self._driver.get_device_info() | ||
|
||
def _disconnect(self): | ||
""" | ||
Disconnect from the serial port | ||
""" | ||
if self._driver: | ||
self._driver.disconnect() | ||
|
||
def __del__(self): | ||
self._disconnect() | ||
|
||
async def prep_for_update(self) -> str: | ||
new_port = await update.enter_bootloader(self._driver, | ||
self.device_info['model']) | ||
return new_port or self.port |
Oops, something went wrong.