-
Notifications
You must be signed in to change notification settings - Fork 2
/
ota.py
executable file
·194 lines (166 loc) · 8.63 KB
/
ota.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import sys
import argparse
from intelhex import IntelHex
from bluepy.btle import Scanner, DefaultDelegate, Peripheral, ADDR_TYPE_RANDOM
from struct import pack, unpack
from enum import Enum
class NotificationDelegate(DefaultDelegate):
    """bluepy delegate that hands every received GATT notification to an Ota object."""

    def __init__(self, ota):
        """Remember the object whose ``notification`` method receives the events.

        :param ota object exposing ``notification(handle, data)``
        """
        super().__init__()
        self.ota = ota

    def handleNotification(self, cHandle, data):
        """Forward the characteristic handle and raw payload unchanged."""
        receiver = self.ota
        receiver.notification(cHandle, data)
class OtaError(Enum):
    """Status codes carried in the second field of an expected-transfer-unit notification.

    The per-member notes below are derived from the member names only.
    """

    # No error reported.
    OTA_SUCCESS = 0x00
    # Presumably: flash content did not verify after writing -- confirm against firmware.
    OTA_FLASH_VERIFY_ERROR = 0x3C
    # Presumably: the flash write itself failed -- confirm against firmware.
    OTA_FLASH_WRITE_ERROR = 0xFF
    # Presumably: a block arrived with an unexpected sequence number -- confirm against firmware.
    OTA_SEQUENCE_ERROR = 0xF0
    # Presumably: a block's checksum byte did not match -- confirm against firmware.
    OTA_CHECKSUM_ERROR = 0x0F
class Ota():
    """Client for the OTA (over-the-air) firmware update GATT service of a BLE peripheral.

    The image is transferred in 16-byte blocks ("transfer units"). The device
    drives the transfer: each notification on the expected-transfer-unit
    characteristic names the next sequence number it wants, and the client
    answers with up to ``notification_interval`` blocks, requesting an
    acknowledgement on the last one of the burst.
    """

    OTA_SERVICE_UUID = "8a97f7c0-8506-11e3-baa7-0800200c9a66"
    IMAGE_CHARACTERISTIC_UUID = "122e8cc0-8508-11e3-baa7-0800200c9a66"
    NEW_IMAGE_CHARACTERISTIC_UUID = "210f99f0-8508-11e3-baa7-0800200c9a66"
    NEW_IMAGE_TRANSFER_UNIT_CONTENT_CHARACTERISTIC_UUID = "2691aa80-8508-11e3-baa7-0800200c9a66"
    NEW_IMAGE_EXPECTED_TRANSFER_UNIT_CHARACTERISTIC_UUID = "2bdc5760-8508-11e3-baa7-0800200c9a66"

    # Number of blocks sent per burst before an acknowledgement is requested.
    notification_interval = 8

    peripheral = None
    service = None
    image_char = None
    new_image_char = None
    new_image_tu_char = None
    new_image_expected_tu_char = None
    delegate = None
    # Image object being uploaded (must offer minaddr(), maxaddr(), tobinarray()).
    data = None
    # True while program() is waiting for the device to request further blocks.
    upload_in_progress = False
    sequence = 0
    # Sequence number of the final 16-byte block of the current image.
    last_sequence = 0

    def __init__(self, peripheral):
        """Look up the OTA service on a connected peripheral and hook up notifications.

        :param peripheral connected bluepy Peripheral offering the OTA service
        :raises Exception if one of the four OTA characteristics is missing
        """
        self.peripheral = peripheral
        self.service = peripheral.getServiceByUUID(self.OTA_SERVICE_UUID)
        for c in self.service.getCharacteristics():
            # The comparison is done by the characteristic's uuid object against the UUID string.
            if c.uuid == self.IMAGE_CHARACTERISTIC_UUID:
                self.image_char = c
            elif c.uuid == self.NEW_IMAGE_CHARACTERISTIC_UUID:
                self.new_image_char = c
            elif c.uuid == self.NEW_IMAGE_TRANSFER_UNIT_CONTENT_CHARACTERISTIC_UUID:
                self.new_image_tu_char = c
            elif c.uuid == self.NEW_IMAGE_EXPECTED_TRANSFER_UNIT_CHARACTERISTIC_UUID:
                self.new_image_expected_tu_char = c
        if (self.image_char is None or
                self.new_image_char is None or
                self.new_image_tu_char is None or
                self.new_image_expected_tu_char is None):
            raise Exception("Could not find all characteristics in OTA service on %s" % (peripheral))
        self.read_free_space_info()
        print("Initialized OTA application, application space 0x%x-0x%x (%d bytes)" % (
            self.free_space_start, self.free_space_end, self.free_space_end - self.free_space_start))
        self.delegate = NotificationDelegate(self)
        self.peripheral.withDelegate(self.delegate)

    def read_free_space_info(self):
        """Read the image characteristic and store ``free_space_start`` / ``free_space_end``."""
        response = self.image_char.read()
        # while being little endian everywhere, the free space addresses are explicitly reversed in firmware
        self.free_space_start, self.free_space_end = unpack(">II", response)

    def write_new_image_characteristic(self, base_addr, size, notification_interval = 8):
        """
        Writes to the new image characteristic of the OTA service: Prepare firmware update
        :param base_addr start address of firmware image
        :param size size in bytes of firmware image
        :param notification_interval changes the number of BLE GATT blocks before an ACK/ERROR cycle with the
            device is done. The original author noted that the implementation in the OTA service hasn't been
            studied enough and recommended 1.
        :raises Exception if the image does not fit the free space reported by the device, or if the values read
            back from the device differ from the ones written
        """
        if (base_addr < self.free_space_start or
                base_addr > self.free_space_end or
                base_addr + size > self.free_space_end):
            # Careful! while the OTA library application (as of v2.1.0) does not come with any range checks,
            # the flash erase is actually implemented only in the reset or ota service managers, not in the OTA
            # application. Writing outside of the allowed range would work but transform the flash contents
            # into garbage!
            # Also, writing the same device range (with a different image) a second time would work but misses
            # the erase cycle unless a reset occurred!
            raise Exception("program image out of allowed range (image: 0x%x-0x%x, device: 0x%x-0x%x)" %
                            (base_addr, base_addr + size, self.free_space_start, self.free_space_end))
        # Send the interval that was asked for: the read-back check below compares against the same value,
        # so packing the class default here made every non-default call fail verification.
        data = pack("<BII", notification_interval, size, base_addr)
        self.new_image_char.write(data)
        res_ni, res_size, res_base = self.read_new_image_characteristic()
        if (res_ni != notification_interval or res_size != size or res_base != base_addr):
            raise Exception("writing new image characteristic verify failed!")
        print("wrote new image characteristic: Programming %x (len: %d)" % (base_addr, size))

    def read_new_image_characteristic(self):
        """
        Reads new image characteristic of OTA service.
        :return tuple of notification_interval, size, base_addr
        """
        response = self.new_image_char.read()
        return unpack("<BII", response)

    def register_notification(self, valHandle, enabled):
        """Enable or disable notifications for a characteristic.

        Writes 1 (enable) or 0 (disable) as a little-endian 16-bit value to the
        handle directly following the value handle.
        NOTE(review): this assumes the client configuration descriptor sits at
        valHandle + 1 -- confirm for other firmware layouts.

        :param valHandle value handle of the characteristic
        :param enabled True to enable notifications, False to disable them
        """
        data = pack("<H", 1 if enabled else 0)
        self.peripheral.writeCharacteristic(valHandle + 1, data)

    def program(self, data):
        """Upload a firmware image and block until the device reports completion.

        :param data image object offering minaddr(), maxaddr() and tobinarray() (e.g. an IntelHex instance)
        :raises Exception if the image is empty, does not fit the device, or no notification arrives
            within 10 seconds
        """
        min_addr = data.minaddr()
        max_addr = data.maxaddr()
        if min_addr is None or max_addr is None:
            raise Exception("firmware image contains no data")
        # maxaddr() is the address of the last data byte (inclusive), hence the + 1.
        size = max_addr - min_addr + 1
        self.write_new_image_characteristic(min_addr, size, self.notification_interval)
        self.register_notification(self.new_image_expected_tu_char.getHandle(), True)
        self.data = data
        self.sequence = 0
        # Number of 16-byte blocks, rounded up, minus one.
        self.last_sequence = ((size + 15) >> 4) - 1
        print("Starting upload, last sequence: %d" % (self.last_sequence))
        self.upload_in_progress = True
        try:
            # Blocks are sent from notification(), which runs inside waitForNotifications().
            while self.upload_in_progress:
                if not self.peripheral.waitForNotifications(10):
                    raise Exception("Upload failed with timeout")
        finally:
            self.upload_in_progress = False

    def write_image_block(self, sequence, request_ack):
        """Send one 16-byte block of the current image.

        Packet layout (little endian): checksum byte, 16 image bytes, ack flag
        byte, 16-bit sequence number. The checksum is the XOR of the ack flag,
        both sequence bytes and all image bytes.

        :param sequence zero-based index of the 16-byte block within the image
        :param request_ack True if the device shall answer this block with a notification
        """
        start = self.data.minaddr() + sequence * 16
        # tobinarray() takes an inclusive end address, so this yields 16 bytes.
        image = self.data.tobinarray(start, start + 15)
        needs_ack = 1 if request_ack else 0
        checksum = needs_ack ^ (sequence % 256) ^ (sequence >> 8)
        for b in image:
            checksum ^= b
        data = pack("<B16sBH", checksum, bytes(image), needs_ack, sequence)
        self.new_image_tu_char.write(data)

    def notification(self, handle, data):
        """Handle a GATT notification; called by NotificationDelegate.

        For the expected-transfer-unit characteristic the payload holds the next
        sequence number the device wants and a status code. The next burst of
        blocks is sent starting at that sequence number (which also serves as
        the retry after an error), or the upload is marked finished once the
        device asks for a block past the end of the image.

        :param handle handle of the characteristic that sent the notification
        :param data raw notification payload
        """
        if handle != self.new_image_expected_tu_char.getHandle():
            print("\nReceived notification for %d with %s" % (handle, data.hex()))
            return

        next_sequence, error = unpack("<HH", data)
        print("\rsending block %d of %d" % (next_sequence, self.last_sequence), end='\r')
        if error != 0:
            # Decode only when needed and tolerate codes that are not part of OtaError.
            try:
                status = OtaError(error)
            except ValueError:
                status = "unknown (0x%x)" % error
            print("\nReceived notification for expected tu, seq: %d, status: %s" % (next_sequence, status))
            print("\nReceived error, retrying...")
        sys.stdout.flush()

        if next_sequence > self.last_sequence:
            print("\nUpload finished")
            # Release the wait loop in program(); without this every upload ended in a timeout error.
            self.upload_in_progress = False
            return

        # Send one burst; the final block of the burst (or of the image) requests the acknowledgement.
        burst_end = min(next_sequence + self.notification_interval - 1, self.last_sequence)
        for i in range(next_sequence, burst_end + 1):
            self.write_image_block(i, i == burst_end)
def main(argv=None):
    """Command-line entry point: scan for BLE devices, show OTA info, upload firmware.

    :param argv list of command-line arguments; defaults to ``sys.argv[1:]`` when None
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--scan", help="Scans for Bluetooth low energy devices", action="store_true")
    parser.add_argument("--device", help="Connects to given bluetooth device and give OTA information")
    parser.add_argument("--program", help="Writes firmware image onto device. Due to erase procedure, you might get "
                                          "an error on a non-empty device and have to try the same command a second "
                                          "time.")
    args = parser.parse_args(argv)

    # An image without a target used to be ignored silently; tell the user instead.
    if args.program and not args.device:
        parser.error("--program requires --device")

    if args.scan:
        scanner = Scanner()
        devices = scanner.scan(5.0)
        for dev in devices:
            print("Device %s (%s), RSSI=%d dB, connectable=%r" % (dev.addr, dev.addrType, dev.rssi, dev.connectable))
            for (adtype, desc, value) in dev.getScanData():
                print(" %s = %s" % (desc, value))

    if args.device:
        peripheral = Peripheral(args.device, ADDR_TYPE_RANDOM)
        try:
            # Constructing Ota already prints the device's free application space.
            ota_service = Ota(peripheral)
            if args.program:
                ih = IntelHex()
                ih.fromfile(args.program, format='hex')
                ota_service.program(ih)
        finally:
            # Always release the BLE connection, also when the upload fails.
            peripheral.disconnect()
# Run the command-line interface only when executed as a script, not when imported.
if __name__ == "__main__":
    main()