Source code for iotilegateway.device

import logging
import copy
import datetime
import tornado.ioloop
import tornado.gen
import uuid
from iotile.core.hw.reports import BroadcastReport
from iotile.core.exceptions import ArgumentError


[docs]class DeviceManager: """An object to manage connections to IOTile devices over one or more specific DeviceAdapters. DeviceManagers aggregate all of the available devices across each DeviceAdapter and route connections to the appropriate adapter as connections are requested. An API is provided to make connections to devices, monitor events that happen on devices and remember what devices have been seen on different adapters. It is assumed that devices have unique identifiers so if the same device is seen by multiple DeviceAdapters, those different instances are unified and the best route to the device is chosen when a user tries to connect to it. For this purpose there is an abstract notion of 'signal_strength' that is reported by each DeviceAdapter and used to rank which one has a better route to a given device. Args: loop (tornado.ioloop.IOLoop): A tornado IOLoop object that this DeviceManager will run itself in. It is up to the caller to make sure the loop is started and run. The DeviceManager will run forever until the loop is stopped. """ ConnectionIdleState = 0 ConnectionRequestedState = 1 ConnectedState = 3 DisconnectionStartedState = 4 DisconnectedState = 5 def __init__(self, loop): self.monitors = {} self._scanned_devices = {} self.adapters = {} self.connections = {} self._loop = loop self._logger = logging.getLogger(__name__) self._logger.setLevel(logging.DEBUG) self._next_conn_id = 0 tornado.ioloop.PeriodicCallback(self.device_expiry_callback, 1000, self._loop).start() def add_adapter(self, man): adapter_id = len(self.adapters) self.adapters[adapter_id] = man man.set_id(adapter_id) man.add_callback('on_scan', self.device_found_callback) man.add_callback('on_disconnect', self.device_disconnected_callback) man.add_callback('on_report', self.report_received_callback) man.add_callback('on_trace', self.trace_received_callback) # If this adapter supports probing for devices, probe now so we have an initial # list of connected devices without waiting for them to show up over time if man.get_config('probe_supported', False): man.probe_sync() tornado.ioloop.PeriodicCallback(man.periodic_callback, 1000, self._loop).start() def stop(self): """Stop all adapters managed by the DeviceManager """ for _, adapter in self.adapters.items(): adapter.stop_sync() @property def scanned_devices(self): """Return a dictionary of all scanned devices across all connected DeviceAdapters Returns: dict: A dictionary mapping UUIDs to device information dictionaries """ devs = {} for device_id, adapters in self._scanned_devices.items(): dev = None max_signal = None best_adapter = None for adapter_id, devinfo in adapters.items(): connstring = "{0}/{1}".format(adapter_id, devinfo['connection_string']) if dev is None: dev = copy.deepcopy(devinfo) del dev['connection_string'] if 'adapters' not in dev: dev['adapters'] = [] best_adapter = adapter_id dev['adapters'].append((adapter_id, devinfo['signal_strength'], connstring)) if max_signal is None: max_signal = devinfo['signal_strength'] elif devinfo['signal_strength'] > max_signal: max_signal = devinfo['signal_strength'] best_adapter = adapter_id # If device has been seen in no adapters, it will get expired # don't return it if dev is None: continue dev['adapters'] = sorted(dev['adapters'], key=lambda x: x[1], reverse=True) dev['best_adapter'] = best_adapter dev['signal_strength'] = max_signal devs[device_id] = dev return devs @tornado.gen.coroutine def connect(self, uuid): """Coroutine to attempt to connect to a device by its UUID Args: uuid (int): the IOTile UUID of the device that we're trying to connect to Returns: a dict containing: 'success': bool with whether the attempt was sucessful 'reason': failure_reason as a string if the attempt failed 'connection_id': int with the id for the connection if the attempt was successful, 'connection_string': a string that can be used to reconnect to this exact device in the future on success """ devs = self.scanned_devices if uuid not in devs: raise tornado.gen.Return({'success': False, 'reason': 'Could not find UUID'}) adapter_id = None connection_string = None # Find the best adapter to use based on the first adapter with an open connection spot for adapter, signal, connstring in devs[uuid]['adapters']: if self.adapters[adapter].can_connect(): adapter_id = adapter connection_string = connstring break if adapter_id is None: raise tornado.gen.Return({ 'success': False, 'reason': "No room on any adapter that sees this device for more connections" }) result = yield self.connect_direct(connection_string) if result['success']: result['connection_string'] = connection_string conn_id = result['connection_id'] self._update_connection_data(conn_id, 'uuid', uuid) raise tornado.gen.Return(result) @tornado.gen.coroutine def probe_async(self): """Coroutine to probe all adapters which can, to update the scanned devices. Returns: dict: A dictionary mapping UUIDs to device information dictionaries (self.scanned_devices) """ for adapter_id, manager in self.adapters.items(): if manager.get_config('probe_supported', False): yield tornado.gen.Task(manager.probe_async) raise tornado.gen.Return(self.scanned_devices) def register_monitor(self, device_uuid, filter_names, callback): """Register to receive callbacks when events happen on a specific device The registered callback function will be called whenever the following events occur using the given event_name: - 'report': a report is received from the device - 'connection': someone has connected to the device - 'trace': tracing data has been received from a device - 'device_seen': a scan event has seen the device - 'disconnection': someone has disconnected from the device - 'broadcast': A broadcast report has been received from the device. There are some specific requirements when using the broadcast monitor since that is not associated with a specific device UUID. You must specify a device_uuid of None to indicate that you do not want to filter by device and you must specify only the 'broadcast' filter, no others since the others are not compatible with a None device_uuid. Args: device_uuid (int): The device that we want to monitor filter_names (iterable): A list of strings with the event names that the caller would wish to receive callback (callable): The function that should be called when an event occurs callback must have the signature callback(event_name, event_arg) Returns: string: A unique string that can be used to remove or adjust this monitoring callback in the future """ # FIXME: Check filter_names to make sure they contain only supported event names monitor_uuid = uuid.uuid4() filters = set(filter_names) if 'broadcast' in filters: if device_uuid is not None: raise ArgumentError("You must pass a device_uuid of None when registering a broadcast filter", device_uuid=device_uuid) if len(filters) > 1: raise ArgumentError("You are not currently allowed to combine a broadcast filter with any other events", filter_names=filter_names) if device_uuid is None: device_uuid = '*' if device_uuid not in self.monitors: self.monitors[device_uuid] = {} self.monitors[device_uuid][monitor_uuid.hex] = (filters, callback) self._logger.debug("Registered monitor, device=%s, filters=%s, uuid=%s", device_uuid, filter_names, monitor_uuid) return "{}/{}".format(device_uuid, monitor_uuid.hex) def adjust_monitor(self, monitor_id, add_events=None, remove_events=None): """Adjust the events that this monitor wishes to receive Args: monitor_id (string): The exact string returned from a previous call to register_monitor add_events (iterable): A list of events to begin receiving remove_events (iterable): A list of events to stop receiving """ dev_uuid, _, monitor_name = monitor_id.partition('/') if dev_uuid == '*': raise ArgumentError("You are not currently allowed to adjust a global monitor", monitor_id=monitor_id) dev_uuid = int(dev_uuid) if dev_uuid not in self.monitors or monitor_name not in self.monitors[dev_uuid]: raise ArgumentError("Could not find monitor by name", monitor_id=monitor_id) filters, callback = self.monitors[dev_uuid][monitor_name] if add_events is not None: filters.update(add_events) if remove_events is not None: filters.difference_update(remove_events) self.monitors[dev_uuid][monitor_name] = (filters, callback) def remove_monitor(self, monitor_id): """Remove a previously added device event monitro Args: monitor_id (string): The exact string returned from a previous call to register_monitor """ dev_uuid, _, monitor_name = monitor_id.partition('/') if dev_uuid != '*': dev_uuid = int(dev_uuid) if dev_uuid not in self.monitors or monitor_name not in self.monitors[dev_uuid]: raise ArgumentError("Could not find monitor by name", monitor_id=monitor_id) del self.monitors[dev_uuid][monitor_name] def call_monitor(self, device_uuid, event, *args): """Call a monitoring function for an event on device Args: device_uuid (int): The UUID of the device event (string): The name of the event *args: Arguments to be passed to the event monitor function """ if device_uuid is None: device_uuid = '*' if device_uuid not in self.monitors: return for listeners, monitor in self.monitors[device_uuid].values(): if event in listeners: monitor(device_uuid, event, *args) @tornado.gen.coroutine def connect_direct(self, connection_string): """Directly connect to a device using its connection string Connection strings are opaque strings returned by DeviceAdapter objects that allow direct connection to a unique IOTile device accessible via that adapter. The DeviceManager prepends an adapter id to the connection string, separating both with a '/' so that you can directly address any device on any DeviceAdapter using a combined connection_string. Args: connection_string (string): A connection string that specifies a combination of an adapter and a device on that adapter. Returns: a dict containing: 'success': bool with whether the attempt was sucessful 'reason': failure_reason as a string if the attempt failed 'connection_id': int with the id for the connection if the attempt was successful """ adapter_id, _, connstring = connection_string.partition('/') adapter_id = int(adapter_id) if adapter_id not in self.adapters: raise tornado.gen.Return({'success': False, 'reason': "Adapter ID not found in connection string"}) conn_id = self._get_connection_id() self._update_connection_data(conn_id, 'adapter', adapter_id) self._update_connection_data(conn_id, 'report_callbacks', set()) self._update_connection_state(conn_id, self.ConnectionRequestedState) result = yield tornado.gen.Task(self.adapters[adapter_id].connect_async, conn_id, connstring) conn_id, adapter_id, success, failure_reason = result.args resp = {'success': success} if success: self._update_connection_state(conn_id, self.ConnectedState) resp['connection_id'] = conn_id else: del self.connections[conn_id] if 'failure_reason' is not None: resp['reason'] = failure_reason else: resp['reason'] = 'Unknown failure reason' raise tornado.gen.Return(resp) @tornado.gen.coroutine def open_interface(self, connection_id, interface): """Coroutine to attempt to enable a particular interface on a connected device Args: connection_id (int): The id of a previously opened connection interface (string): The name of the interface that we are trying to enable Returns: a dictionary containg two keys: 'success': bool with whether the attempt was sucessful 'reason': failure_reason as a string if the attempt failed """ if connection_id not in self.connections: raise tornado.gen.Return({'success': False, 'reason': 'Could not find connection id %d' % connection_id}) adapter = self._get_connection_data(connection_id, 'adapter') result = yield tornado.gen.Task(self.adapters[adapter].open_interface_async, connection_id, interface) _, _, success, failure_reason = result.args resp = {'success': success} if not success: if 'failure_reason' is not None: resp['reason'] = failure_reason else: resp['reason'] = 'Unknown failure reason' raise tornado.gen.Return(resp) @tornado.gen.coroutine def close_interface(self, connection_id, interface): """Coroutine to attempt to disable a particular interface on a connected device Args: connection_id (int): The id of a previously opened connection interface (string): The name of the interface that we are trying to disable Returns: a dictionary containg two keys: 'success': bool with whether the attempt was sucessful 'reason': failure_reason as a string if the attempt failed """ if connection_id not in self.connections: raise tornado.gen.Return({'success': False, 'reason': 'Could not find connection id %d' % connection_id}) adapter = self._get_connection_data(connection_id, 'adapter') result = yield tornado.gen.Task(self.adapters[adapter].close_interface_async, connection_id, interface) _, _, success, failure_reason = result.args resp = {'success': success} if not success: if 'failure_reason' is not None: resp['reason'] = failure_reason else: resp['reason'] = 'Unknown failure reason' raise tornado.gen.Return(resp) @tornado.gen.coroutine def disconnect(self, connection_id): """Disconnect from a current connection Args: connection_id (int): The connection id returned from a previous call to connect() Returns: a dictionary containing two keys: 'success': bool with whether the attempt was sucessful 'reason': failure_reason as a string if the attempt failed """ if connection_id not in self.connections: raise tornado.gen.Return({'success': False, 'reason': 'Could not find connection id %d' % connection_id}) if self.connections[connection_id]['state'] != self.ConnectedState: raise tornado.gen.Return({ 'success': False, 'reason': 'Connection id %d is not in the right state' % connection_id }) adapter_id = self.connections[connection_id]['context']['adapter'] result = yield tornado.gen.Task(self.adapters[adapter_id].disconnect_async, connection_id) _, _, success, failure_reason = result.args resp = {'success': success} if success: del self.connections[connection_id] else: if 'failure_reason' is not None: resp['reason'] = failure_reason else: resp['reason'] = 'Unknown failure reason' raise tornado.gen.Return(resp) @tornado.gen.coroutine def send_rpc(self, connection_id, address, feature, command, payload, timeout): """Send an RPC to an IOTile device Args: connection_id (int): The connection id returned from a previous call to connect() address (int): the address of the tile that you want to talk to feature (int): the high byte of the rpc id command (int): the low byte of the rpc id payload (string): the payload to send (up to 20 bytes) timeout (float): the maximum amount of time to wait for a response """ if connection_id not in self.connections: raise tornado.gen.Return({'success': False, 'reason': 'Could not find connection id %d' % connection_id}) if self.connections[connection_id]['state'] != self.ConnectedState: raise tornado.gen.Return({ 'success': False, 'reason': 'Connection id %d is not in the right state' % connection_id}) rpc_id = (feature << 8) | command adapter_id = self.connections[connection_id]['context']['adapter'] result = yield tornado.gen.Task( self.adapters[adapter_id].send_rpc_async, connection_id, address, rpc_id, payload, timeout ) _, _, success, failure_reason, status, payload = result.args resp = {'success': success} if success: resp['status'] = status resp['payload'] = payload else: resp['reason'] = failure_reason raise tornado.gen.Return(resp) @tornado.gen.coroutine def send_script(self, connection_id, data, progress_callback): """ """ if connection_id not in self.connections: raise tornado.gen.Return({'success': False, 'reason': 'Could not find connection id %d' % connection_id}) if self.connections[connection_id]['state'] != self.ConnectedState: raise tornado.gen.Return({ 'success': False, 'reason': 'Connection id %d is not in the right state' % connection_id }) adapter_id = self.connections[connection_id]['context']['adapter'] result = yield tornado.gen.Task( self.adapters[adapter_id].send_script_async, connection_id, data, progress_callback ) _, _, success, failure_reason = result.args resp = {'success': success} if not success: resp['reason'] = failure_reason raise tornado.gen.Return(resp) def _get_connection_id(self): """Get a unique connection ID Returns: int: the unique id for this connection """ next_id = self._next_conn_id self._next_conn_id += 1 self.connections[next_id] = {'state': self.ConnectionIdleState, 'context': {}} return next_id def _update_connection_data(self, conn_id, key, value): if conn_id not in self.connections: raise ValueError("Unknown conn_id") self.connections[conn_id]['context'][key] = value def _get_connection_data(self, conn_id, key): if conn_id not in self.connections: raise ValueError("Unknown conn_id") return self.connections[conn_id]['context'][key] def _update_connection_state(self, conn_id, new_state): """Update the connection state for this connection Args: conn_id (int): The connection ID to update new_state: The new state to transition into """ if conn_id not in self.connections: raise ValueError("Unknown conn_id") self.connections[conn_id]['state'] = new_state def _get_connection_string(self, uuid, adapter_id): """Return the connection string appropriate to connect to a device using a given adapter Returns: string: the appropriate connection string that can be passed to the given adapter to connect to this device. """ devs = self._scanned_devices return devs[uuid][adapter_id]['connection_string'] def device_disconnected_callback(self, adapter, connection_id): """Called when an adapter has had an unexpected device disconnection Args: adapter (int): The id of the adapter that was disconnected connection_id (int): The id of the connection that has been disconnected """ def sync_device_disconnected_callback(self, adapter, connection_id): pass self._loop.add_callback(sync_device_disconnected_callback, self, adapter, connection_id) def device_found_callback(self, ad, inf, exp): """Add or update a device record in scanned_devices This notification function is called by a DeviceAdapter and notifies the device manager that a new device has been seen. This callback must only be called on the main tornado ioloop. Args: ad (int): the id of the adapter that found this device inf (dict): a dictionary of information about the device including its uuid exp (int): the number of seconds the device should stay in scanned_devices before expiring. If expires==0 then the record will never expire on its own, """ def sync_device_found_callback(self, adapter, info, expires): uuid = info['uuid'] if expires > 0: info['expires'] = datetime.datetime.now() + datetime.timedelta(seconds=expires) if uuid not in self._scanned_devices: self._scanned_devices[uuid] = {} devrecord = self._scanned_devices[uuid] devrecord[adapter] = info self._loop.add_callback(sync_device_found_callback, self, ad, inf, exp) def device_lost_callback(self, adapter, uuid): """Remove a device record from scanned_devices DeviceAdapters should call this function when they lose track of an IOTile device. This function SHOULD NOT be called when a device just hasn't been seen for awhile unless the DeviceAdapter knows that it is no longer accessible. This function should be useful only for DeviceAdapter objects that do not periodically scan for device where the natural expiration logic in DeviceManager would remove them, but rather know explicitly when devices come and go so they provide non-expiring records in device_found_callback Args: adapter (int): the id of the adapter that lost this device uuid (int): the UUID of the device to expire. """ if uuid not in self._scanned_devices: self._logger.warn('Device lost called for UUID %d but device was not in scanned_devices list', uuid) return devrecord = self._scanned_devices[uuid] if adapter not in devrecord: self._logger.warn('Device lost called for UUID %d but device was not registered for the adapter that lost it (adapter id=%d)', adapter, uuid) return del devrecord[adapter] def trace_received_callback(self, connection_id, trace): """Callback when tracing data has been received for a connection Args: connection_id (int): The id of the connection for which the report was received trace (bytearray): The raw data traced from the device """ def sync_trace_received_callback(self, connection_id, report): if connection_id not in self.connections: self._logger.warn('Dropping tracing data for an unknown connection %d', connection_id) try: dev_uuid = self._get_connection_data(connection_id, 'uuid') self.call_monitor(dev_uuid, 'trace', report) except KeyError: self._logger.warn( 'Dropping tracing data for a connection that has no associated UUID %d', connection_id ) self._loop.add_callback(sync_trace_received_callback, self, connection_id, trace) def report_received_callback(self, connection_id, report): """Callback when a report has been received. There are two cases when this can be called: 1. When connection_id is not None so the report is a normal report streamed over a connection to a device and it is forwarded to people who have a 'report' monitor. 2. When connection_id is None and the report is a BroadcastReport that is sent to all registered 'broadcast' monitors. Args: connection_id (int): The id of the connection for which the report was received. This can only be None if isinstance(report, BroadcastReport) report (IOTileReport): A report streamed from a device """ def sync_reported_received_callback(self, connection_id, report): """Properly forward on a received report.""" if connection_id is None and isinstance(report, BroadcastReport): self.call_monitor(None, 'broadcast', report) return if connection_id not in self.connections: self._logger.warn('Dropping report for an unknown connection %d', connection_id) try: dev_uuid = self._get_connection_data(connection_id, 'uuid') self.call_monitor(dev_uuid, 'report', report) except KeyError: self._logger.warn('Dropping report for a connection that has no associated UUID %d', connection_id) self._loop.add_callback(sync_reported_received_callback, self, connection_id, report) def device_expiry_callback(self): """Periodic callback to remove expired devices from scanned_devices list """ expired = 0 for uuid, adapters in self._scanned_devices.items(): to_remove = [] now = datetime.datetime.now() for adapter, dev in adapters.items(): if 'expires' not in dev: continue if now > dev['expires']: to_remove.append(adapter) for x in to_remove: del adapters[x] expired += 1 if expired > 0: self._logger.info('Expired %d devices' % expired)