Source code for

"""Base class for data streamed from an IOTile device"""

import datetime
from iotile.core.exceptions import NotFoundError

[docs]class IOTileReading: """Base class for readings streamed from IOTile device. Each reading represents a single time/value pair sent from an IOTile Device. Since many IOTile Devices do not have a hardware realtime clock, the timestamp that is assigned to a reading may only be a relative interval from a fixed event in the past, like the time the device turned on. If the user knows the absolute time for this event they can pass it as a datetime in time_base to turn the relative reading timestamp into an absolute datetime accessible as reading_time. Args: raw_time (int): the number of seconds since the device turned on when the reading was taken time_base (datetime): An optional estimate of when the device was last turned on so that we can calculate the actual time of the reading reading_time (datetime): An optional UTC time when this event was acquired. If combined with time_base, this value will take precedence and time_base and raw_time will be ignored. reading_id (int): An optional unique identifier for this reading that allows deduplication. If no reading id is passed, InvalidReadingID is used. stream (int): The stream that this reading is part of value (int): The raw reading value """ _Y2KReference = datetime.datetime(2000, 1, 1) InvalidReadingID = 0 def __init__(self, raw_time, stream, value, time_base=None, reading_id=None, reading_time=None): self.raw_time = raw_time = stream self.value = value if reading_id is None: reading_id = IOTileReading.InvalidReadingID self.reading_id = reading_id self.reading_time = reading_time if self.reading_time is None: self.reading_time = self._try_assign_utc_time(self.raw_time, time_base) def _try_assign_utc_time(self, raw_time, time_base): """Try to assign a UTC time to this reading.""" # Check if the raw time is encoded UTC since y2k or just uptime if raw_time != IOTileEvent.InvalidRawTime and (raw_time & (1 << 31)): y2k_offset = self.raw_time ^ (1 << 31) return self._Y2KReference + datetime.timedelta(seconds=y2k_offset) if time_base is not None: return time_base + datetime.timedelta(seconds=raw_time) return None
[docs] def asdict(self): """Encode the data in this reading into a dictionary. Returns: dict: A dictionary containing the information from this reading. """ timestamp_str = None if self.reading_time is not None: timestamp_str = self.reading_time.isoformat() return { 'stream':, 'device_timestamp': self.raw_time, 'streamer_local_id': self.reading_id, 'timestamp': timestamp_str, 'value': self.value }
[docs] @classmethod def FromDict(cls, obj): """Create an IOTileReading from the result of a previous call to asdict(). Args: obj (dict): A dictionary produced by a call to IOTileReading.asdict() Returns: IOTileReading: The converted IOTileReading object. """ timestamp = obj.get('timestamp') if timestamp is not None: import dateutil.parser timestamp = dateutil.parser.parse(timestamp) return IOTileReading(obj.get('device_timestamp'), obj.get('stream'), obj.get('value'), reading_id=obj.get('streamer_local_id'), reading_time=timestamp)
def __eq__(self, other): return self.raw_time == other.raw_time and == and self.value == other.value and self.reading_id == other.reading_id def __str__(self): if self.reading_time is not None: return "Stream 0x{:04X} (id 0x{:08X}): 0x{:08X} at {}".format(, self.reading_id, self.value, self.reading_time) else: return "Stream 0x{:04X} (id 0x{:08X}): 0x{:08X} at uncorrected time {}".format(, self.reading_id, self.value, self.raw_time)
class IOTileEvent: """Base class for all unstructured events. An event is a dictionary with a small summary section and an arbitrarily large data section. The difference between IOTileReading and IOTileEvent is that all readings are integers whereas events are key/value stores. There are two different key/value stores in an IOTileEvent because there may be a very large amount of raw data that is summarized into a smaller representation. It may be useful to know that separation so that we can store the large data somewhere different from where we store the summary. Args: raw_time (int): the number of seconds since the device turned on when the reading was taken. This may be 0xFFFFFFFF if the raw time is not known. time_base (datetime): An optional estimate of when the device was last turned on so that we can calculate the actual time of the reading. If this is passed it is combined with raw_time to figure out the UTC time when the reading was taken. reading_time (datetime): An optional UTC time when this event was acquired. If combined with time_base, this value will take precedence and time_base and raw_time will be ignored. reading_id (int): An optional unique identifier for this reading that allows deduplication. If no reading id is passed, InvalidReadingID is used. stream (int): The stream that this reading is part of summary_data (dict): A dictionary of any summary data this event has. You may pass None if there is no summary data. raw_data (dict): A dictionary (possibly very large) of all data associated with this event. You may pass None if all data is contained in the summary_data member. """ InvalidRawTime = 0xFFFFFFFF def __init__(self, raw_time, stream, summary_data, raw_data, time_base=None, reading_id=None, reading_time=None): self.raw_time = raw_time = stream if reading_id is None: reading_id = IOTileReading.InvalidReadingID self.reading_id = reading_id self.reading_time = reading_time if self.reading_time is None and time_base is not None and raw_time != IOTileEvent.InvalidRawTime: self.reading_time = time_base + datetime.timedelta(seconds=raw_time) self.summary_data = summary_data self.raw_data = raw_data def asdict(self): """Encode the data in this event into a dictionary. The dictionary returned from this method is a reference to the data stored in the IOTileEvent, not a copy. It should be treated as read only. Returns: dict: A dictionary containing the information from this event. """ return { 'stream':, 'device_timestamp': self.raw_time, 'streamer_local_id': self.reading_id, 'timestamp': self.reading_time, 'extra_data': self.summary_data, 'data': self.raw_data } @classmethod def FromDict(cls, obj): """Create an IOTileEvent from the result of a previous call to asdict(). Args: obj (dict): A dictionary produced by a call to IOTileEvent.asdict() Returns: IOTileEvent: The converted IOTileEvent object. """ timestamp = obj.get('timestamp') if timestamp is not None: import dateutil.parser timestamp = dateutil.parser.parse(timestamp) return IOTileEvent(obj.get('device_timestamp'), obj.get('stream'), obj.get('extra_data'), obj.get('data'), reading_id=obj.get('streamer_local_id'), reading_time=timestamp) def __str__(self): if self.reading_time is not None: return "Stream 0x{:04X}: Event at {}".format(, self.reading_time) elif self.raw_time != self.InvalidRawTime: return "Stream 0x{:04X}: Event at uncorrected time {}".format(, self.raw_time) return "Stream 0x{:04X}: Event at unknown time".format( class IOTileReport: """Base class for data streamed from an IOTile device. All IOTileReports must derive from this class and must implement the following interface - class method HeaderLength(cls) function returns the number of bytes that must be read before the total length of the report can be determined. HeaderLength() must always be less than or equal to the length of the smallest version of this report. - class method ReportLength(cls, header): function that takes HeaderLength() bytes and returns the total size of the report, including the header. - class method FromReadings(cls, uuid, readings) function that creates an instance of an IOTileReport subclass from a list of readings and a device uuid. - property ReportType: The one byte type code that defines this report type - instance method verify(self): function that verifies that a report is correctly received and, if possible, that the sender is who it says it is. - instance method decode(self): function that decodes a report into a series of IOTileReading objects. The function should return a list of readings. - instance method serialize(self): function that should turn the report into a serialized bytearray that could be decoded with decode(). Args: rawreport (bytearray): The raw data of this report signed (bool): Whether this report is signed to specify who it is from encrypted (bool): Whether this report is encrypted received_time (datetime): The time in UTC when this report was received from a device. If not received, the time is assumed to be utcnow(). """ def __init__(self, rawreport, signed, encrypted, received_time=None): self.visible_readings = [] self.visible_events = [] self.origin = None if received_time is None: self.received_time = datetime.datetime.utcnow() else: self.received_time = received_time self.raw_report = rawreport self.signed = signed self.encrypted = encrypted self.verified = False # We may not have any visible readings if our report is encrypted # and we do not have access to the decryption key. self.visible_readings, self.visible_events = self.decode() @classmethod def HeaderLength(cls): """Return the length of a header needed to calculate this report's length Returns: int: the length of the needed report header """ raise NotFoundError("IOTileReport HeaderLength needs to be overridden") @classmethod def ReportLength(cls, header): """Given a header of HeaderLength bytes, calculate the size of this report """ raise NotFoundError("IOTileReport ReportLength needs to be overriden") def decode(self): """Decode a raw report into a series of readings """ raise NotFoundError("IOTileReport decode needs to be overriden") def encode(self): """Encode this report into a binary blob that could be decoded by a report format's decode method.""" return self.raw_report def save(self, path): """Save a binary copy of this report Args: path (string): The path where we should save the binary copy of the report """ data = self.encode() with open(path, "wb") as out: out.write(data) def serialize(self): """Turn this report into a dictionary that encodes all information including received timestamp""" info = {} info['received_time'] = self.received_time info['encoded_report'] = bytes(self.encode()) # Handle python 2 / python 3 differences report_format = info['encoded_report'][0] if not isinstance(report_format, int): report_format = ord(report_format) info['report_format'] = report_format # Report format is the first byte of the encoded report info['origin'] = self.origin return info def __str__(self): if self.verified: verified = "verified" else: verified = "not verified" if self.encrypted: enc = "encrypted" else: enc = "not encrypted" return "IOTile Report (length: %d, visible readings: %d, visible events: %d, %s and %s)" \ % (len(self.raw_report), len(self.visible_readings), len(self.visible_events), verified, enc)