Source code for iotile.core.hw.hwmanager

# This file is adapted from python code released by WellDone International
# under the terms of the LGPLv3.  WellDone International's contact information is
# info@welldone.org
# http://welldone.org
#
# Modifications to this file from the original created at WellDone International
# are copyright Arch Systems Inc.
"""This file contains necessary functionality to manage the hardware"""

import time
import binascii
import logging
from queue import Empty
from typedargs.annotate import annotated, param, return_type, finalizer, docannotate, context

from iotile.core.dev.semver import SemanticVersion, SemanticVersionRange
from iotile.core.hw.exceptions import UnknownModuleTypeError
from iotile.core.exceptions import ArgumentError, HardwareError, ValidationError, TimeoutExpiredError, ExternalError
from iotile.core.dev.registry import ComponentRegistry
from iotile.core.hw.transport.adapterstream import AdapterStream
from iotile.core.dev.config import ConfigManager
from iotile.core.hw.debug import DebugManager
from iotile.core.utilities.linebuffer_ui import LinebufferUI
from iotile.core.utilities.gid import uuid_to_slug

from .proxy import TileBusProxyObject
from .app import IOTileApp


[docs]@context("HardwareManager") class HardwareManager: """ A module for managing and interacting with IOTile Hardware This context provides tools to configure, control, debug and program any IOTile module. Specific functionality can be implemented in dynamically loaded proxy objects that are designed to provide access to each IOTile. To create a HardwareManager, you need to pass a port string that describes the method to be used to connect to the IOTile device. The method should specify the name of the connection method optionally followed by a colon and any extra information possibly needed to connect using that method. Currently implemented ports are: bled112 jlink jlink:mux=ftdi virtual:...(e.g. simple) """ logger = logging.getLogger(__name__) @param("port", "string", desc="transport method to use in the format transport[:port]") @param("record", "path", desc="Optional file to record all RPC calls and responses made on this HardwareManager") def __init__(self, port=None, record=None, adapter=None): if port is None and adapter is None: try: conf = ConfigManager() port = conf.get('core:default-port') except ArgumentError: raise ArgumentError("No port given and no core:default-port config variable set", suggestion="Specify the port to use to connect to the IOTile devices") elif port is None: port = "" transport, _, arg = port.partition(':') self.transport = transport self.port = None if arg != "": self.port = arg self.stream = self._create_stream(adapter, record=record) self._stream_queue = None self._trace_queue = None self._broadcast_queue = None self._trace_data = bytearray() self._proxies = {'TileBusProxyObject': TileBusProxyObject} self._name_map = {TileBusProxyObject.ModuleName(): [TileBusProxyObject]} self._known_apps = {} self._named_apps = {} self._setup_proxies() self._setup_apps() def _setup_proxies(self): """Load in proxy module objects for all of the registered components on this system.""" # Find all of the registered IOTile components and see if we need to add any proxies for them reg = ComponentRegistry() proxy_classes = reg.load_extensions('iotile.proxy', class_filter=TileBusProxyObject, product_name="proxy_module") for name, obj in proxy_classes: proxy_key = obj.__name__ + ':' + name # awu_10/01/19 - we want to add all proxies even if duplicate but diff version # if proxy_key in self._proxies: # continue self._proxies[proxy_key] = obj # Check if this object matches a specific shortened name so that we can # automatically match a hw module to a proxy without user intervention try: short_name = obj.ModuleName() if short_name in self._name_map: self._name_map[short_name].append(obj) else: self._name_map[short_name] = [obj] except Exception: # pylint: disable=broad-except; # We don't want this to die if someone loads a misbehaving plugin self.logger.exception("Error importing misbehaving proxy object %s, skipping.", obj) def _setup_apps(self): """Load in all iotile app objects for all registered or installed components on this system.""" reg = ComponentRegistry() app_classes = reg.load_extensions('iotile.app', class_filter=IOTileApp, product_name="app_module") for _name, app in app_classes: try: matches = app.MatchInfo() name = app.AppName() for tag, ver_range, quality in matches: if tag not in self._known_apps: self._known_apps[tag] = [] self._known_apps[tag].append((ver_range, quality, app)) if name in self._named_apps: self.logger.warning("Added an app module with an existing name, overriding previous app, name=%s", name) self._named_apps[name] = app except Exception: #pylint: disable=broad-except; # We don't want this to die if someone loads a misbehaving plugin self.logger.exception("Error importing misbehaving app module %s, skipping.", app) @param("address", "integer", "positive", desc="numerical address of module to get") @param("basic", "bool", desc="return a basic global proxy rather than a specialized one") @param("force", "str", desc="Explicitly set the 6-character ID to match against") @param("uuid", "integer", desc="UUID of the device we would like to connect to") def get(self, address, basic=False, force=None, uuid=None): """Create a proxy object for a tile by address. The correct proxy object is determined by asking the tile for its status information and looking up the appropriate proxy in our list of installed proxy objects. If you want to send raw RPCs, you can get a basic TileBusProxyObject by passing basic=True. """ if basic is True and force is not None: raise ArgumentError("You cannot conbine basic and force, they have opposite effects") if force is not None and len(force) != 6: raise ArgumentError("You must specify a 6 character name when using the force parameter", force=force) if uuid is not None: self.connect(uuid) tile = self._create_proxy('TileBusProxyObject', address) if basic: return tile name = tile.tile_name() version = tile.tile_version() if force is not None: name = force # Now create the appropriate proxy object based on the name and version of the tile tile_type = self.get_proxy(name, version) if tile_type is None: raise HardwareError("Could not find proxy object for tile", name="'{}'".format(name), known_names=self._name_map.keys()) tile = tile_type(self.stream, address) tile._hwmanager = self return tile @docannotate def app(self, name=None, path=None, uuid=None): """Find the best IOTileApp for the device we are connected to. Apps are matched by looking at the app tag and version information specified by the connected device. If no installed app matches, an exception will be thrown. You can also force the matching of a specific app by using the name parameter. Args: name (str): Optional name of the app that you wish to load. path (str): Optional path to a python file containing the app that you wish to load. uuid (int): Optional uuid of device to directly connect to. Passing this parameter is equivalent to calling ``connect`` before calling this method Returns: IOTileApp show-as context: The IOTileApp class that was loaded for this device. """ if name is not None and path is not None: raise ArgumentError("You cannot specify both an app name and an app path", name=name, path=path) if uuid is not None: self.connect(uuid) # We perform all app matching by asking the device's controller for its app and os info tile = self._create_proxy('TileBusProxyObject', 8) device_id, os_info, app_info = tile.rpc(0x10, 0x08, result_format="L8xLL") os_tag = os_info & ((1 << 20) - 1) os_version_str = '%d.%d.%d' % ((os_info >> 26) & ((1 << 6) - 1), (os_info >> 20) & ((1 << 6) - 1), 0) app_tag = app_info & ((1 << 20) - 1) app_version_str = '%d.%d.%d' % ((app_info >> 26) & ((1 << 6) - 1), (app_info >> 20) & ((1 << 6) - 1), 0) os_version = SemanticVersion.FromString(os_version_str) app_version = SemanticVersion.FromString(app_version_str) app_class = None # If name includes a .py, assume that it points to python file and try to load that. if name is None and path is not None: _name, app_class = ComponentRegistry().load_extension(path, class_filter=IOTileApp, unique=True) elif name is not None: app_class = self._named_apps.get(name) else: best_match = None matching_tags = self._known_apps.get(app_tag, []) for (ver_range, quality, app) in matching_tags: if ver_range.check(app_version): if best_match is None: best_match = (quality, app) elif quality > best_match[0]: best_match = (quality, app) if best_match is not None: app_class = best_match[1] if app_class is None: raise HardwareError("Could not find matching application for device", app_tag=app_tag, explicit_app=name, installed_apps=[x for x in self._named_apps]) app = app_class(self, (app_tag, app_version), (os_tag, os_version), device_id) return app
[docs] @param("uuid", "integer", desc="UUID of the device we would like to connect to") def controller(self, uuid=None): """Find an attached IOTile controller and attempt to connect to it.""" if uuid is not None: self.connect(uuid) return self.get(8)
@param("device_uuid", "integer", desc="UUID of the device we would like to connect to") @param("wait", "float", desc="Time to wait for devices to show up before connecting") def connect(self, device_uuid, wait=None): """Attempt to connect to a device by its UUID""" self.stream.connect(device_uuid, wait=wait)
[docs] @param("connection_string", "string", desc="opaque connection string indicating which device") def connect_direct(self, connection_string): """Attempt to connect to a device using a connection string""" self.stream.connect_direct(connection_string)
@annotated def disconnect(self): """Attempt to disconnect from a device.""" self._trace_queue = None self._stream_queue = None self.stream.disconnect() @param("connection_string", "string", desc="opaque connection string indicating which device") def debug(self, connection_string=None): """Prepare the device for debugging if supported. Some transport mechanisms support a low level debug channel that permits recovery and test operations such as erasing and forcibly reprogramming a device or dumping memory. If no debug operations are supported, this function will raise an exception. If you pass a connection_string to this method to force a connection to a device directly, it will be opened without the RPC interface being opened. If you need to subsequently send RPCs after performing the debug actions, you will need to disconnect from the device and reconnect normally (using ``connect`` or ``connect_direct``) first. """ if connection_string is not None: self.stream.connect_direct(connection_string, no_rpc=True) self.stream.enable_debug() return DebugManager(self.stream) @return_type("bool") def heartbeat(self): """Check if we still have a connection to the DeviceAdapter.""" result = self.stream.debug_command('heartbeat') return result.get('alive') @annotated def enable_broadcasting(self): """Enable the collection of broadcast IOTile reports. Broadcast reports contain tagged readings from an IOTile device but are sent without a connection to that device. The specific method that is used to broadcast the report varies by connection technology but it could be, e.g., a bluetooth low energy advertising packet. By default all broadcast reports are dropped unless you call enable_broadcasting to allocate a queue to receive them. There does not need to be an active connection for you to call enable_broadcasting. Once you call enable_broadcasting, it remains in effect for the duration of this HardwareManager object. """ self._broadcast_queue = self.stream.enable_broadcasting()
[docs] @annotated def enable_streaming(self): """Enable streaming of report data from the connected device. This function will create an internal queue to receive and store reports until the user looks at them and then inform the connected IOTile device that is should begin streaming data. This is done by telling the underlying DeviceAdapter managing the communication with the device that it should open the device's streaming interface. There is currently no way to close the streaming interface except by disconnecting from the device and then reconnecting to it. """ self._stream_queue = self.stream.enable_streaming()
@annotated def enable_tracing(self): """Enable tracing of realtime debug information over this interface.""" self._trace_queue = self.stream.enable_tracing() @return_type("integer") def count_reports(self): """Return the current size of the reports queue""" if self._stream_queue is None: return 0 return self._stream_queue.qsize() @docannotate def watch_broadcasts(self, whitelist=None, blacklist=None): """Spawn an interactive terminal UI to watch broadcast data from devices. Devices are allowed to post a broadcast report containing stream data. This function will create a list in your console window with the latest broadcast value from each device in range. Args: whitelist (list(integer)): Only include devices with these listed ids. blacklist (list(integer)): Include every device **except** those with these specific ids. If combined with whitelist, whitelist wins and this parameter has no effect. """ title = "Watching Broadcast Reports (Ctrl-C to Stop)" subtitle = "" if self.transport == 'bled112': reg = ConfigManager() if not reg.get('bled112:active-scan'): subtitle = "Active Scanning not active, you won't see v1 broadcasts" if whitelist is not None: whitelist = set(whitelist) if blacklist is not None: blacklist = set(blacklist) def _title(_items): return [title, subtitle] def _poll(): results = [x for x in self.iter_broadcast_reports(blocking=False)] return results def _text(item): fmt_uuid = "%08X" % item.origin fmt_uuid = fmt_uuid[:4] + '-' + fmt_uuid[4:] reading = item.visible_readings[0] return "{0: <15} stream: {1: 04X} value: {2: <8}".format(fmt_uuid, reading.stream, reading.value) def _sort_order(item): return item.origin def _hash(item): uuid = item.origin stream_id = item.visible_readings[0].stream if whitelist is not None and uuid not in whitelist: return None if blacklist is not None and whitelist is None and uuid in blacklist: return None item_id = str(uuid) + "," + str(stream_id) return item_id line_ui = LinebufferUI(_poll, _hash, _text, sortkey_func=_sort_order, title=_title) line_ui.run() @docannotate def watch_scan(self, whitelist=None, blacklist=None, sort="id"): """Spawn an interactive terminal UI to watch scan results. This is just a fancy way of calling scan() repeatedly and deduplicating results per device so that each one has a static place on the screen. You can decide how you want to order the results with the sort parameter. If you pick "id", the default, then results will have a largely static order based on the UUID of each device so that there will not be too much screen churn. Args: whitelist (list(integer)): Only include devices with these listed ids. blacklist (list(integer)): Include every device **except** those with these specific ids. If combined with whitelist, whitelist wins and this parameter has no effect. sort (str): The specific way to sort the list on the screen. Options are id, signal. Defaults to id. """ if whitelist is not None: whitelist = set(whitelist) if blacklist is not None: blacklist = set(blacklist) def _title(items): return ["Realtime Scan: %d Devices in Range" % len(items)] def _poll(): return self.scan() def _text(item): fmt_uuid = "%08X" % item['uuid'] fmt_uuid = fmt_uuid[:4] + '-' + fmt_uuid[4:] return "{0: <15} signal: {1: <7d} connected: {2: <8}".format(fmt_uuid, item['signal_strength'], str(item.get('user_connected', 'unk'))) def _sort_order(item): if sort == "signal": return -item['signal_strength'] return item['uuid'] def _hash(item): uuid = item['uuid'] if whitelist is not None and uuid not in whitelist: return None if blacklist is not None and whitelist is None and uuid in blacklist: return None return uuid line_ui = LinebufferUI(_poll, _hash, _text, sortkey_func=_sort_order, title=_title) line_ui.run() @docannotate def watch_reports(self, whitelist=None, blacklist=None): """Spawn an interactive terminal UI to watch reports once connected to a device. Args: whitelist (list(integer)): Only include streams with these listed ids. blacklist (list(integer)): Include every stream **except** those with these specific ids. If combined with whitelist, whitelist wins and this parameter has no effect. """ if whitelist is not None: whitelist = set(whitelist) if blacklist is not None: blacklist = set(blacklist) def _title(items): base = "Watching Report for Device ID " if items: base = base + str(items[list(items.keys())[0]].object.origin) meta = "{:15s} {:4s} {:8s}".format("Last Timestamp", "Stream ID", "Stream Value") return [base, meta] def _poll(): results = [x for x in self.iter_reports(blocking=False)] return results def _text(item): reading = item.visible_readings[0] return "{0:<15} {1:04X} value: {2:<8}".format(reading.raw_time, reading.stream, reading.value) def _sort_order(item): return item.origin def _hash(item): stream = item.visible_readings[0].stream if whitelist is not None and stream not in whitelist: return None if blacklist is not None and whitelist is None and stream in blacklist: return None return stream if not self.stream.connected: print("Not connected to a device. Please connect first") return if not self._stream_queue: print("Enable streaming to watch reports") return line_ui = LinebufferUI(_poll, _hash, _text, sortkey_func=_sort_order, title=_title) line_ui.run() @return_type("string") @param("encoding", "string", desc="The encoding to use to dump the trace, either 'hex' or 'raw'") def dump_trace(self, encoding): """Dump all received tracing data currently received from the device to stdout The data is encoded per the encoding parmeter which must be either the string 'hex' or 'raw'. If hex is passed, the data is printed as hex digits, if raw is passed, the data is printed as received from the device. """ if encoding not in ['raw', 'hex']: raise ValidationError("Unknown encoding type specified in dump trace", encoding=encoding, known_encodings=['hex', 'raw']) if self._trace_queue is None: return "" self._accumulate_trace() if encoding == 'raw': return bytes(self._trace_data) return binascii.hexlify(self._trace_data).decode('utf-8') def wait_trace(self, size, timeout=None, drop_before=False, progress_callback=None): """Wait for a specific amount of tracing data to be received. This function is the equivalent of wait_reports for streaming data. It allows you to block until a specific amount of tracing data has been received. The optional timeout parameter allows you to stop waiting if you never receive enough tracing data after a specific amount of time. Args: size (int): The number of bytes to wait for. timeout (float): The maximum number of seconds to wait for these bytes to be received. drop_before (bool): Truncate all data received until now before waiting for size bytes. progress_callback (callable): An optional progress callback that is called periodically with updates. It should have the signature progress_callback(received_byte_count, total_byte_count) Returns: bytearray: The raw trace data obtained. """ if drop_before: self._trace_data = bytearray() if progress_callback is None: progress_callback = lambda x, y: None if len(self._trace_data) >= size: progress_callback(size, size) data = self._trace_data[:size] self._trace_data = self._trace_data[size:] return data progress_callback(len(self._trace_data), size) start = time.time() while len(self._trace_data) < size: progress_callback(len(self._trace_data), size) self._accumulate_trace() time.sleep(0.1) now = time.time() if timeout is not None and ((now - start) > timeout): raise TimeoutExpiredError("Timeout waiting for tracing data", expected_size=size, received_size=len(self._trace_data), timeout=timeout) progress_callback(size, size) data = self._trace_data[:size] self._trace_data = self._trace_data[size:] return data def _accumulate_trace(self): """Copy tracing data from trace queue into _trace_data""" if self._trace_queue is None: return try: while True: blob = self._trace_queue.get(block=False) self._trace_data += bytearray(blob) except Empty: pass def iter_broadcast_reports(self, blocking=False): """Iterate over broadcast reports that have been received. This function is designed to allow the creation of dispatch or processing systems that process broadcast reports as they come in. Args: blocking (bool): Whether to stop when there are no more readings or block and wait for more. """ if self._broadcast_queue is None: return try: while True: yield self._broadcast_queue.get(block=blocking) except Empty: pass def wait_broadcast_reports(self, num_reports, timeout=2.0): """Wait until a specific number of broadcast reports have been received. Args: num_reports (int): The number of reports to wait for timeout (float): The maximum number of seconds to wait without receiving another report. """ if self._broadcast_queue is None: raise ExternalError("You have to enable broadcasting before you can wait for broadcast reports") reports = [] for i in range(0, num_reports): try: report = self._broadcast_queue.get(timeout=timeout) reports.append(report) except Empty: raise TimeoutExpiredError("Timeout waiting for a report", expected_number=num_reports, received_number=i, received_reports=reports) return reports
[docs] def iter_reports(self, blocking=False): """Iterate over reports that have been received. If blocking is True, this iterator will never stop. Otherwise it will iterate over all reports currently in the queue (and those added during iteration) Args: blocking (bool): Whether to stop when there are no more readings or block and wait for more. """ if self._stream_queue is None: return try: while True: yield self._stream_queue.get(block=blocking) except Empty: pass
def wait_reports(self, num_reports, timeout=2.0): """Wait for a fixed number of reports to be received Args: num_reports (int): The number of reports to wait for timeout (float): The maximum number of seconds to wait without receiving another report. """ if self._stream_queue is None: raise ExternalError("You have to enable streaming before you can wait for reports") reports = [] for i in range(0, num_reports): try: report = self._stream_queue.get(timeout=timeout) reports.append(report) except Empty: raise TimeoutExpiredError("Timeout waiting for a report", expected_number=num_reports, received_number=i, received_reports=reports) return reports def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.stream.close() return False @finalizer def close(self): """Stop and close this HardwareManager. This method will stop all background device activity and prevent any further usage of this HardwareManager object. If RPCs are being recorded, this will also save the recording to a file. """ self.stream.close() @return_type("list(basic_dict)") @param("wait", "float", desc="Time to wait for devices to show up before returning") @param("sort", "string", desc="Sort scan results by a key named key") @param("limit", "integer", desc="Limit results to the first n devices") @param("reverse", "bool", desc="Reverse the sort order") def scan(self, wait=None, sort=None, reverse=False, limit=None): """Scan for available devices and print a dictionary of information about them. If wait is specified as a floating point number in seconds, then the default wait times configured inside of the stream or device adapter used to find IOTile devices is overridden with the value specified. Args: wait (float): An optional override time to wait for results to accumulate before returning sort (string): An optional key to sort by reverse (bool): An optional key that will reverse the sort from ascending to descending limit (integer): An optional limit to the number of devices to return """ devices = self.stream.scan(wait=wait) for device in devices: # Add a Device Slug for user convenience if 'uuid' in device: device['slug'] = uuid_to_slug(device['uuid']) if sort is not None: devices.sort(key=lambda x: x[sort], reverse=reverse) if limit is not None: devices = devices[:limit] # FIXME: Use dictionary format in bled112stream to document information returned about devices return devices def get_proxy(self, short_name, version): """Find a proxy type given its short name. If no proxy type is found, return None. """ if short_name not in self._name_map: return None proxy_match = self.find_correct_proxy_version(self._name_map[short_name], version) return proxy_match if proxy_match is not None else self._name_map[short_name][0] def find_correct_proxy_version(self, proxies, version): """Retrieves the ModuleVersion of each proxy and match it with the tile version something Args: proxies (list): A list of proxies of a specific short name version (obj): A tuple that specifies the tile's version """ proxy_details = {} tile_version = SemanticVersion(version[0], version[1], version[2]) min_version = SemanticVersion(0, 0, 0) best_proxy = None self.logger.debug("Short name matched proxies found: {0}".format(proxies)) for proxy in proxies: proxy_details[proxy] = {} try: # If proxy has ModuleVersion(), get the SemanticVersionRange module_version = proxy.ModuleVersion() least_version = module_version._disjuncts[0][0][0] except AttributeError: # If proxy does not have ModuleVersion(), use None module_version = None least_version = SemanticVersion(0, 0, 0) if module_version is None: # Regardless if version is compatible, choose a best proxy for now if min_version == SemanticVersion(0, 0, 0): best_proxy = proxy self.logger.debug("Found a proxy without ModuleVersion info: {0}".format(proxy)) elif module_version.check(tile_version): # Set best proxy since it matches SVR and update min_version to beat if least_version > min_version: min_version = least_version best_proxy = proxy self.logger.debug("Found a compatible proxy: {0}".format(proxy)) else: self.logger.debug("Passed compatible proxy: {0}".format(proxy)) self.logger.debug("Best proxy found: {0} with base version {1}".format(best_proxy, min_version)) # If we don't make it in either conditional, best_proxy will return None return best_proxy def _create_proxy(self, proxy, address): """ Create a python proxy object to talk to an IOTile module with the given type at the given address. """ if proxy not in self._proxies: raise UnknownModuleTypeError("unknown proxy module specified", module_type=proxy, known_types=list(self._proxies)) proxy_class = self._proxies[proxy] return proxy_class(self.stream, address) def _create_stream(self, force_adapter=None, record=None): conn_string = None port = self.port if port is not None: port = port.strip() # Check if we're supposed to use a specific device adapter if force_adapter is not None: return AdapterStream(force_adapter, record=record) # Attempt to find a DeviceAdapter that can handle this transport type reg = ComponentRegistry() for _, adapter_factory in reg.load_extensions('iotile.device_adapter', name_filter=self.transport): return AdapterStream(adapter_factory(port), record=record) raise HardwareError("Could not find transport object registered to handle passed transport type", transport=self.transport)