Source code for decocare.history

#!/usr/bin/python
"""
This module provides some basic helper/formatting utilities,
specifically targeted at decoding ReadHistoryData data.

"""
import io
from binascii import hexlify

import lib
from records import *
from datetime import date

_remote_ids = [
  bytearray([ 0x01, 0xe2, 0x40 ]),
  bytearray([ 0x03, 0x42, 0x2a ]),
  bytearray([ 0x0c, 0x89, 0x92 ]),
]

[docs]def decode_remote_id(msg): """ practice decoding some remote ids: | 0x27 | 0x01 0xe2 0x40 | 0x03 0x42 0x2a | 0x28 0x0c 0x89 | 0x92 0x00 0x00 0x00 >>> decode_remote_id(_remote_ids[0]) '123456' >>> decode_remote_id(_remote_ids[1]) '213546' >>> decode_remote_id(_remote_ids[2]) '821650' """ high = msg[ 0 ] * 256 * 256 middle = msg[ 1 ] * 256 low = msg[ 2 ] return str(high + middle + low)
[docs]class AlarmPump(KnownRecord): opcode = 0x06 head_length = 4
#class ResultTotals(KnownRecord):
[docs]class ResultDailyTotal(InvalidRecord): """On 722 this seems like two records.""" opcode = 0x07 #head_length = 5 head_length = 5 date_length = 2 #body_length = 37 + 4 #body_length = 2 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if self.larger: self.body_length = 3
[docs] def parse_time(self): date = parse_midnight(self.date) self.datetime = date if not hasattr(date, 'isoformat'): self.datetime = None return date
[docs] def decode (self): self.parse_time( ) mid = unmask_m_midnight(self.date)[0:3] return (dict(valid_date=date(*mid).isoformat()))
[docs] def date_str(self): result = 'unknown' if self.datetime is not None: result = self.datetime.isoformat( ) else: if len(self.date) >=2: result = "{}".format(unmask_m_midnight(self.date)) return result
[docs]class ChangeBasalProfile_old_profile (KnownRecord): opcode = 0x08 # old/bad # body_length = 46 # XXX: on LeDanaScott's, 522, this seems right body_length = 145 # head_length = 3 # XXX: # for 554!? def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if self.larger: self.body_length = 145
[docs] def decode (self): self.parse_time( ) rates = [ ] i = 0 for x in range(47): start = x * 3 end = start + 3 (offset, rate, q) = self.body[start:end] if [offset, rate, q] == [ 0x00, 0x00, 0x00]: break try: rates.append(describe_rate(offset, rate, q)) except TypeError, e: remainder = [ offset, rate, q ] rates.append(remainder) return rates
[docs]def describe_rate (offset, rate, q): return (dict(offset=(30*1000*60)*offset, rate=rate/40.0))
[docs]class ChangeBasalProfile_new_profile (KnownRecord): opcode = 0x09 body_length = 145 # body_length = 144 # XXX: # for 554!? # head_length = 3 # XXX: # for 554!?
[docs] def decode (self): self.parse_time( ) rates = [ ] i = 0 for x in range(47): start = x * 3 end = start + 3 (offset, rate, q) = self.body[start:end] if [offset, rate, q] == [ 0x00, 0x00, 0x00]: break rates.append(describe_rate(offset, rate, q)) return rates
[docs]class ClearAlarm(KnownRecord): opcode = 0x0C
[docs]class SelectBasalProfile(KnownRecord): opcode = 0x14
[docs]class ChangeTime(KnownRecord): opcode = 0x17
[docs]class NewTimeSet(KnownRecord): opcode = 0x18
[docs]class LowBattery(KnownRecord): opcode = 0x19
[docs]class Battery(KnownRecord): opcode = 0x1a
[docs]class PumpSuspend(KnownRecord): opcode = 0x1e
[docs]class PumpResume(KnownRecord): opcode = 0x1f
[docs]class Rewind(KnownRecord): opcode = 0x21
[docs]class EnableDisableRemote(KnownRecord): opcode = 0x26 # body_length = 14 # head_length = 3 # XXX: for 554 body_length = 14
[docs]class ChangeRemoteID(KnownRecord): opcode = 0x27
[docs]class TempBasalDuration(KnownRecord): opcode = 0x16 _test_1 = bytearray([ ])
[docs] def decode(self): self.parse_time( ) basal = { 'duration (min)': self.head[1] * 30, } return basal
[docs]class ChangeMazaheri2e (KnownRecord): opcode = 0x2e body_length = 100
# class BolusWizard512 (BolusWizard):
[docs]class BolusWizard512 (KnownRecord): opcode = 0x2f body_length = 12
[docs]class UnabsorbedInsulin512 (UnabsorbedInsulinBolus): opcode = 0x30
[docs]class TempBasal (KnownRecord): opcode = 0x33 body_length = 1 _test_1 = bytearray([ ])
[docs] def decode(self): self.parse_time( ) temp = { 0: 'absolute', 1: 'percent' }[(self.body[0] >> 3)] status = dict(temp=temp) if temp is 'absolute': rate = self.head[1] / 40.0 status.update(rate=rate) if temp is 'percent': rate = int(self.head[1]) status.update(rate=rate) return status
[docs]class LowReservoir(KnownRecord): """ >>> rec = LowReservoir( LowReservoir._test_1[:2] ) >>> decoded = rec.parse(LowReservoir._test_1) >>> print str(rec) LowReservoir 2012-12-07T11:02:43 head[2], body[0] op[0x34] >>> print pformat(decoded) {'amount': 20.0} """ opcode = 0x34 _test_1 = bytearray([ 0x34, 0xc8, 0xeb, 0x02, 0x0b, 0x07, 0x0c, ])
[docs] def decode(self): self.parse_time( ) reservoir = {'amount' : int(self.head[1]) / 10.0 } return reservoir
[docs]class ChangeAlarmNotifyMode (KnownRecord): opcode = 0x63 body_length = 0
[docs]class ChangeTimeDisplay(KnownRecord): opcode = 0x64
[docs]class ChangeBolusWizardSetup (KnownRecord): opcode = 0x4f body_length = 40
_confirmed = [ Bolus, Prime, AlarmPump, ResultDailyTotal, ChangeBasalProfile_old_profile, ChangeBasalProfile_new_profile, ClearAlarm, SelectBasalProfile, TempBasalDuration, ChangeTime, NewTimeSet, LowBattery, Battery, PumpSuspend, PumpResume, CalBGForPH, Rewind, EnableDisableRemote, ChangeRemoteID, TempBasal, LowReservoir, BolusWizard, UnabsorbedInsulinBolus, ChangeAlarmNotifyMode, ChangeTimeDisplay, ChangeBolusWizardSetup, ] # _confirmed.append(DanaScott0x09) _confirmed.append(ChangeMazaheri2e) _confirmed.append(BolusWizard512) _confirmed.append(UnabsorbedInsulin512)
[docs]class JournalEntryMealMarker(KnownRecord): """Capture Event > Meal marker""" opcode = 0x40 body_length = 2
[docs] def decode(self): super(JournalEntryMealMarker, self).decode() return dict(carb_input=int(lib.BangInt([self.head[1], self.body[0]])))
_confirmed.append(JournalEntryMealMarker)
[docs]class JournalEntryExerciseMarker(KnownRecord): """Capture Event > Exercise marker""" opcode = 0x41 body_length = 1
_confirmed.append(JournalEntryExerciseMarker)
[docs]class JournalEntryOtherMarker(KnownRecord): """Capture Event > Other""" opcode = 0x43 body_length = 0
_confirmed.append(JournalEntryOtherMarker)
[docs]class Ian69(KnownRecord): opcode = 0x69 body_length = 2
_confirmed.append(Ian69)
[docs]class Ian50(KnownRecord): opcode = 0x50 body_length = 34 # XXX: tghoward testing on 723 at length 30 body_length = 30 def __init__ (self, head, model, **kwds): super(Ian50, self).__init__(head, model, **kwds) self.body_length = model.Ian50Body
_confirmed.append(Ian50)
[docs]class Ian54(KnownRecord): opcode = 0x54 body_length = 34 + 23 body_length = 57
_confirmed.append(Ian54)
[docs]class AlarmSensor (KnownRecord): """Glucose sensor alarms. The second byte of the head represents the alarm type. The third byte contains an alarm-specific value. For example, a "Low Glucose" alarm type is: [ 0x0b, # 11: Opcode 0x66, # 102: Low glucose subtype 0x50 # 80: Glucose level (For a pump configured to mg/dL) ] """ opcode = 0x0B head_length = 3 alarm_types = { 101: 'High Glucose', 102: 'Low Glucose', 104: 'Meter BG Now', 105: 'Cal Reminder', 106: 'Calibration Error', 107: 'Sensor End', 112: 'Weak Signal', 113: 'Lost Sensor', 115: 'Low Glucose Predicted' }
[docs] def decode(self): super(AlarmSensor, self).decode() alarm_type = self.head[1] decoded_dict = { 'alarm_type': alarm_type, 'alarm_description': self.alarm_types.get(alarm_type, 'Unknown sensor alarm ({})'.format(alarm_type)) } if alarm_type in (101, 102,): year_bits = extra_year_bits(self.date[4]) decoded_dict['amount'] = int(lib.BangInt([year_bits[0], self.head[2]])) return decoded_dict
_confirmed.append(AlarmSensor)
[docs]class BGReceived (KnownRecord): opcode = 0x3F body_length = 3
[docs] def decode (self): self.parse_time( ) bg = (self.head[1] << 3) + (self.date[2] >> 5) return dict(link=str(self.body).encode('hex'), amount=bg)
_confirmed.append(BGReceived)
[docs]class IanA8(KnownRecord): opcode = 0xA8 head_length = 10
_confirmed.append(IanA8)
[docs]class BasalProfileStart(KnownRecord): opcode = 0x7b body_length = 3 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if self.larger: # body_length = 1 pass # self.body_length = 48
[docs] def decode (self): self.parse_time( ) if (len(self.body) % 3 == 0): rate = describe_rate(*self.body) rate['profile_index'] = self.head[1] return rate else: return dict(raw=hexlify(self.body))
_confirmed.append(BasalProfileStart) # 123, 143
[docs]class OldBolusWizardChange (KnownRecord): opcode = 0x5a body_length = 117 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if self.larger: self.body_length = 117 + 17 + 3 pass
[docs] def decode (self): self.parse_time( ) half = (self.body_length - 1) / 2 stale = self.body[0:half] changed = self.body[half:-1] tail = self.body[-1] return dict(stale=decode_wizard_settings(stale, model=self.model) # , _changed=changed , changed=decode_wizard_settings(changed, model=self.model) , tail=tail )
_confirmed.append(OldBolusWizardChange)
[docs]def decode_wizard_settings (data, num=8, model=None): head = data[0:2] tail = data[len(head):] carb_reader = model.read_carb_ratios.msg cr_size = carb_reader.item_size carb_ratios = tail[0:num*cr_size] tail = tail[num*cr_size:] insulin_sensitivies = tail[0:(num*2)] tail = tail[num*2:] isMg = head[0] & 0b00000100 isMmol = head[0] & 0b00001000 bg_units = 1 if isMmol and not isMg: bg_units = 2 bg_targets = tail[0:(num*3)+2] if model and model.larger: bg_targets = bg_targets[2:] return dict(head=str(head).encode('hex') # , carb_ratios=decode_carb_ratios(carb_ratios) , carb_ratios=carb_reader.decode_ratios(carb_ratios) # , _carb_ratios=str(carb_ratios).encode('hex') # , cr_len=len(carb_ratios) , insulin_sensitivies=decode_insulin_sensitivies(insulin_sensitivies) # , _insulin_sensitivies=str(insulin_sensitivies).encode('hex') # , is_len=len(insulin_sensitivies) # , bg_len=len(bg_targets) , bg_targets=decode_bg_targets(bg_targets, bg_units) # , _o_len=len(data) # , _bg_targets=str(bg_targets).encode('hex') , _head = "{0:#010b} {1:#010b}".format(*head) )
[docs]def decode_carb_ratios (data): ratios = [ ] for x in range(8): start = x * 3 end = start + 3 (offset, q, r) = data[start:end] ratio = r/10.0 if q: ratio = lib.BangInt([q, r]) / 1000.0 ratios.append(dict(i=x, offset=offset*30, q=q, _offset=offset, ratio=ratio, r=r)) return ratios
[docs]def decode_insulin_sensitivies (data): sensitivities = [ ] for x in range(8): start = x * 2 end = start + 2 (offset, sensitivity) = data[start:end] sensitivities.append(dict(i=x, offset=offset*30, _offset=offset, sensitivity=sensitivity)) return sensitivities
[docs]def decode_bg_targets (data, bg_units): # data = data[2:] targets = [ ] for x in range(8): start = x * 3 end = start + 3 # (low, high, offset) = data[start:end] (offset, low, high) = data[start:end] if bg_units is 2: low = low / 10.0 high = high / 10.0 targets.append(dict( #i=x, offset=offset*30, _offset=offset, # _raw=str(data[start:end]).encode('hex'), low=low, high=high)) return targets
[docs]class BigBolusWizardChange (KnownRecord): opcode = 0x5a body_length = 143
[docs]class SetAutoOff (KnownRecord): opcode = 0x1b
_confirmed.append(SetAutoOff)
[docs]class ChangeAudioBolus (KnownRecord): opcode = 0x5f
[docs] def decode (self): self.parse_time( )
_confirmed.append(ChangeAudioBolus)
[docs]class ChangeCaptureEventEnable (KnownRecord): opcode = 0x83
# body_length = 1 _confirmed.append(ChangeCaptureEventEnable)
[docs]class hack53 (KnownRecord): opcode = 0x53 body_length = 1
_confirmed.append(hack53)
[docs]class hack52 (KnownRecord): opcode = 0x52
# body_length = 1 _confirmed.append(hack52)
[docs]class hack51 (KnownRecord): opcode = 0x51
# body_length = 1 _confirmed.append(hack51)
[docs]class hack55 (KnownRecord): opcode = 0x55 # body_length = 1 + 47 # body_length = 2 + 46 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) # self.larger = larger self.body_length = (self.head[1] - 1) * 3
_confirmed.append(hack55)
[docs]class hack56 (KnownRecord): opcode = 0x56 body_length = 5
_confirmed.append(hack56)
[docs]class ChangeWatchdogMarriageProfile(KnownRecord): opcode = 0x81 body_length = 5
_confirmed.append(ChangeWatchdogMarriageProfile)
[docs]class DeleteOtherDeviceID (KnownRecord): opcode = 0x82 body_length = 5
_confirmed.append(DeleteOtherDeviceID)
[docs]class ChangeOtherDeviceID (KnownRecord): opcode = 0x7d body_length = 30
_confirmed.append(ChangeOtherDeviceID)
[docs]class SetBolusWizardEnabled (KnownRecord): opcode = 0x2d
[docs] def decode (self): self.parse_time( ) return dict(enabled=self.head[1] is 1)
_confirmed.append(SetBolusWizardEnabled)
[docs]class SettingSomething57 (KnownRecord): opcode = 0x57
# body_length = 1 _confirmed.append(SettingSomething57)
[docs]class ChangeMaxBasal (KnownRecord): opcode = 0x2c
[docs] def decode (self): self.parse_time( ) return dict(maxBasal=self.head[1] / 40.0)
_confirmed.append(ChangeMaxBasal)
[docs]class questionable22 (KnownRecord): opcode = 0x22
_confirmed.append(questionable22)
[docs]class questionable23 (KnownRecord): opcode = 0x23
_confirmed.append(questionable23)
[docs]class questionable24 (KnownRecord): opcode = 0x24
_confirmed.append(questionable24)
[docs]class ChangeBGReminderEnable (KnownRecord): opcode = 0x60
[docs] def decode (self): self.parse_time( ) enabled = self.head[1] is 1 return dict(enabled=enabled)
_confirmed.append(ChangeBGReminderEnable)
[docs]class questionable61 (KnownRecord): opcode = 0x61
_confirmed.append(questionable61)
[docs]class ChangeTempBasalType (KnownRecord): opcode = 0x62
[docs] def decode (self): self.parse_time( ) temp = { 0: 'absolute', 1: 'percent' }[self.head[1]] return dict(temp=temp)
# body_length = 1 _confirmed.append(ChangeTempBasalType)
[docs]class questionable65 (KnownRecord): opcode = 0x65
_confirmed.append(questionable65)
[docs]class questionable66 (KnownRecord): opcode = 0x66
_confirmed.append(questionable66)
[docs]class questionable6f (KnownRecord): opcode = 0x6f
_confirmed.append(questionable6f)
[docs]class questionable5e (KnownRecord): opcode = 0x5e
_confirmed.append(questionable5e)
[docs]class ChangeParadigmLinkID (KnownRecord): opcode = 0x3c body_length = 14
[docs] def decode (self): self.parse_time( ) data = self.body[1:] links = [ ] links.append(str(data[0:3]).encode('hex')) links.append(str(data[3:6]).encode('hex')) links.append(str(data[7:10]).encode('hex')) return dict(links=links)
_confirmed.append(ChangeParadigmLinkID)
[docs]class ConnectDevicesOtherDevicesEnabled (KnownRecord): opcode = 0x7c
[docs] def decode(self): super(ConnectDevicesOtherDevicesEnabled, self).decode() return dict(enabled=self.head[1] == 1)
_confirmed.append(ConnectDevicesOtherDevicesEnabled)
[docs]class Model522ResultTotals(KnownRecord): opcode = 0x6d head_length = 1 date_length = 2 body_length = 40
[docs] def parse_time(self): date = parse_midnight(self.date) self.datetime = date if not hasattr(date, 'isoformat'): self.datetime = None return date
[docs] def date_str(self): result = 'unknown' if self.datetime is not None: result = self.datetime.isoformat( ) else: if len(self.date) >=2: result = "{}".format(unmask_m_midnight(self.date)) return result
# class Model522ResultTotals(KnownRecord):
[docs]class old6c(Model522ResultTotals): opcode = 0x6c #head_length = 45 #xxx non 515 # body_length = 38 # body_length = 34 # XXX: 515 only? # body_length = 31 def __init__ (self, head, model, **kwds): super(old6c, self).__init__(head, model, **kwds) self.body_length = model.old6cBody + 3
_confirmed.append(old6c)
[docs]class questionable3b (KnownRecord): opcode = 0x3b
_confirmed.append(questionable3b) from dateutil.relativedelta import relativedelta
[docs]def parse_midnight (data): mid = unmask_m_midnight(data) oneday = relativedelta(days=1) try: date = datetime(*mid) + oneday return date except ValueError, e: print "ERROR", e, lib.hexdump(data) pass return mid
[docs]def unmask_m_midnight(data): """ Extract date values from a series of bytes. Always returns tuple given a bytearray of at least 3 bytes. Returns 6-tuple of scalar values year, month, day, hours, minutes, seconds. """ data = data[:] seconds = 0 minutes = 0 hours = 0 day = parse_day(data[0]) high = data[0] >> 4 low = data[0] & 0x1F year_high = data[1] >> 4 # month = int(high) #+ year_high # month = parse_months( data[0], data[1] ) mhigh = (data[0] & 0xE0) >> 4 mlow = (data[1] & 0x80) >> 7 month = int(mhigh + mlow) day = int(low) year = parse_years(data[1]) return (year, month, day, hours, minutes, seconds)
_confirmed.append(Model522ResultTotals)
[docs]class Sara6E(Model522ResultTotals): """Seems specific to 722?""" opcode = 0x6e #head_length = 52 - 5 # body_length = 1 body_length = 48 #body_length = 0 def __init__(self, head, larger=False): super(type(self), self).__init__(head, larger) if self.larger: self.body_length = 48
[docs] def decode (self): self.parse_time( ) mid = unmask_m_midnight(self.date)[0:3] try: return (dict(valid_date=date(*mid).isoformat())) except ValueError, e: return (dict(error_date=mid, error=str(e)))
_confirmed.append(Sara6E) _known = { } _variant = { } for x in _confirmed: _known[x.opcode] = x del x
[docs]def suggest(head, larger=False, model=None): """ Look in the known table of commands to find a suitable record type for this opcode. """ klass = _known.get(head[0], Base) record = klass(head, model) return record
[docs]def parse_record(fd, head=bytearray( ), larger=False, model=None): """ Given a file-like object, and the head of a record, parse the rest of the record. Look up the type of record, read in just enough data to parse it, return the result. """ # head = bytearray(fd.read(2)) date = bytearray( ) body = bytearray( ) record = suggest(head, larger, model=model) remaining = record.head_length - len(head) if remaining > 0: head.extend(bytearray(fd.read(remaining))) if record.date_length > 0: date.extend(bytearray(fd.read(record.date_length))) if record.body_length > 0: body.extend(bytearray(fd.read(record.body_length))) record.parse( head + date + body ) # print str(record) # print record.pformat(prefix=str(record) ) return record
[docs]def describe( ): keys = _known.keys( ) out = [ ] for k in keys: out.append(_known[k].describe( )) return out
[docs]class PagedData (object): """ PagedData - context for parsing a page of cgm data. """ def __init__ (self, raw, model): self.model = model data, crc = raw[0:1022], raw[1022:] computed = lib.CRC16CCITT.compute(bytearray(data)) if lib.BangInt(crc) != computed: assert lib.BangInt(crc) == computed, "CRC does not match page data" self.raw = raw self.clean(data)
[docs] def clean (self, data): data.reverse( ) self.data = self.eat_nulls(data) self.stream = io.BufferedReader(io.BytesIO(self.data))
[docs] def eat_nulls (self, data): i = 0 while data[i] == 0x00: i = i+1 return data[i:]
[docs]class HistoryPage (PagedData):
[docs] def clean (self, data): # data.reverse( ) # self.data = self.eat_nulls(data) #self.data.reverse( ) self.data = data[:] # XXX: under some circumstances, zero is the correct value and # eat_nulls actually eats valid data. This ugly hack restores two # nulls back ot the end. """ self.data.append(0x00) self.data.append(0x00) self.data.append(0x00) self.data.append(0x00) self.data.append(0x00) """ self.stream = io.BufferedReader(io.BytesIO(self.data))
[docs] def decode (self, larger=False): records = [ ] skipped = [ ] for B in iter(lambda: bytearray(self.stream.read(2)), bytearray("")): if B == bytearray( [ 0x00, 0x00 ] ): if skipped: if len(records) > 0: last = records[-1] last.update(appended=last.get('appended', [ ]) + skipped) # records.extend(skipped) skipped = [ ] break record = parse_record(self.stream, B, larger=larger, model=self.model) data = record.decode( ) if record.datetime: rec = dict(timestamp=record.datetime.isoformat( ), _type=str(record.__class__.__name__), _body=lib.hexlify(record.body), _head=lib.hexlify(record.head), _date=lib.hexlify(record.date), _description=str(record)) if data is not None: rec.update(data) if skipped: rec.update(appended=skipped) skipped = [ ] records.append(rec) else: rec = dict(_type=str(record.__class__.__name__), _body=lib.hexlify(record.body), _head=lib.hexlify(record.head), _date=lib.hexlify(record.date), _description=str(record)) data = record.decode( ) if data is not None: rec.update(data=data) skipped.append(rec) records.reverse( ) return records
if __name__ == '__main__': import doctest doctest.testmod( ) ##### # EOF