diff --git a/src/seedsigner/controller.py b/src/seedsigner/controller.py index 206ad2c0a..f0a952874 100644 --- a/src/seedsigner/controller.py +++ b/src/seedsigner/controller.py @@ -398,6 +398,15 @@ def start_screensaver(self): # Start the screensaver, but it will block until it can acquire the Renderer.lock. self.screensaver.start() print("Controller: Screensaver started") + + + def reset_screensaver_timeout(self): + """ + Reset the screensaver's timeout starting point to right now (i.e. make it think + that zero time has elapsed since the last user interaction). + """ + from seedsigner.hardware.buttons import HardwareButtons + HardwareButtons.get_instance().update_last_input_time() def activate_toast(self, toast_manager_thread: BaseToastOverlayManagerThread): diff --git a/src/seedsigner/gui/components.py b/src/seedsigner/gui/components.py index e505635ae..2046f4e17 100644 --- a/src/seedsigner/gui/components.py +++ b/src/seedsigner/gui/components.py @@ -29,6 +29,8 @@ class GUIConstants: BITCOIN_ORANGE = "#FF9416" TESTNET_COLOR = "#00F100" REGTEST_COLOR = "#00CAF1" + GREEN_INDICATOR_COLOR = "#00FF00" + INACTIVE_COLOR = "#414141" ICON_FONT_NAME__FONT_AWESOME = "Font_Awesome_6_Free-Solid-900" ICON_FONT_NAME__SEEDSIGNER = "seedsigner-icons" diff --git a/src/seedsigner/gui/screens/scan_screens.py b/src/seedsigner/gui/screens/scan_screens.py index 13d3dcf78..e1af8e157 100644 --- a/src/seedsigner/gui/screens/scan_screens.py +++ b/src/seedsigner/gui/screens/scan_screens.py @@ -7,10 +7,10 @@ from seedsigner.hardware.buttons import HardwareButtonsConstants from seedsigner.hardware.camera import Camera from seedsigner.models.decode_qr import DecodeQR, DecodeQRStatus -from seedsigner.models.threads import BaseThread +from seedsigner.models.threads import BaseThread, ThreadsafeCounter -from .screen import BaseScreen, ButtonListScreen -from ..components import GUIConstants, Fonts, TextArea +from .screen import BaseScreen +from ..components import GUIConstants, Fonts, SeedSignerIconConstants @@ -47,6 +47,9 @@ class ScanScreen(BaseScreen): framerate: int = 6 # TODO: alternate optimization for Pi Zero 2W? render_rect: tuple[int,int,int,int] = None + FRAME__ADDED_PART = 1 + FRAME__REPEATED_PART = 2 + FRAME__MISS = 3 def __post_init__(self): from seedsigner.hardware.camera import Camera @@ -58,17 +61,22 @@ def __post_init__(self): self.camera = Camera.get_instance() self.camera.start_video_stream_mode(resolution=self.resolution, framerate=self.framerate, format="rgb") + self.frames_decode_status = ThreadsafeCounter() + self.frames_decoded_counter = ThreadsafeCounter() + self.threads.append(ScanScreen.LivePreviewThread( camera=self.camera, decoder=self.decoder, renderer=self.renderer, instructions_text=self.instructions_text, render_rect=self.render_rect, + frame_decode_status=self.frames_decode_status, + frames_decoded_counter=self.frames_decoded_counter, )) class LivePreviewThread(BaseThread): - def __init__(self, camera: Camera, decoder: DecodeQR, renderer: renderer.Renderer, instructions_text: str, render_rect: tuple[int,int,int,int]): + def __init__(self, camera: Camera, decoder: DecodeQR, renderer: renderer.Renderer, instructions_text: str, render_rect: tuple[int,int,int,int], frame_decode_status: ThreadsafeCounter, frames_decoded_counter: ThreadsafeCounter): self.camera = camera self.decoder = decoder self.renderer = renderer @@ -77,6 +85,9 @@ def __init__(self, camera: Camera, decoder: DecodeQR, renderer: renderer.Rendere self.render_rect = render_rect else: self.render_rect = (0, 0, self.renderer.canvas_width, self.renderer.canvas_height) + self.frame_decode_status = frame_decode_status + self.frames_decoded_counter = frames_decoded_counter + self.last_frame_decoded_count = self.frames_decoded_counter.cur_count self.render_width = self.render_rect[2] - self.render_rect[0] self.render_height = self.render_rect[3] - self.render_rect[1] self.decoder_fps = "0.0" @@ -85,12 +96,15 @@ def __init__(self, camera: Camera, decoder: DecodeQR, renderer: renderer.Rendere def run(self): - from timeit import default_timer as timer - instructions_font = Fonts.get_font(GUIConstants.BODY_FONT_NAME, GUIConstants.BUTTON_FONT_SIZE) + # pre-calculate how big the animated QR percent display can be + left, _, right, _ = instructions_font.getbbox("100%") + progress_text_width = right - left + start_time = time.time() num_frames = 0 + debug = False show_framerate = False # enable for debugging / testing while self.keep_running: frame = self.camera.read_video_stream(as_image=True) @@ -98,16 +112,22 @@ def run(self): num_frames += 1 cur_time = time.time() cur_fps = num_frames / (cur_time - start_time) - if self.decoder and self.decoder.get_percent_complete() > 0 and self.decoder.is_psbt: - scan_text = str(self.decoder.get_percent_complete()) + "% Complete" - if show_framerate: - scan_text += f" {cur_fps:0.2f} | {self.decoder_fps}" - else: + + scan_text = None + progress_percentage = self.decoder.get_percent_complete() + if progress_percentage == 0: + # We've just started scanning, no results yet if show_framerate: scan_text = f"{cur_fps:0.2f} | {self.decoder_fps}" else: scan_text = self.instructions_text + elif debug: + # Special debugging output for animated QRs + scan_text = f"{self.decoder.get_percent_complete()}% | {self.decoder.get_percent_complete(weight_mixed_frames=True)}% (new)" + if show_framerate: + scan_text += f" {cur_fps:0.2f} | {self.decoder_fps}" + with self.renderer.lock: if frame.width > self.render_width or frame.height > self.render_height: frame = frame.resize( @@ -115,14 +135,13 @@ def run(self): resample=Image.NEAREST # Use nearest neighbor for max speed ) - draw = ImageDraw.Draw(frame) - if scan_text: # Note: shadowed text (adding a 'stroke' outline) can # significantly slow down the rendering. # Temp solution: render a slight 1px shadow behind the text # TODO: Replace the instructions_text with a disappearing # toast/popup (see: QR Brightness UI)? + draw = ImageDraw.Draw(frame) draw.text(xy=( int(self.renderer.canvas_width/2 + 2), self.renderer.canvas_height - GUIConstants.EDGE_PADDING + 2 @@ -142,8 +161,82 @@ def run(self): font=instructions_font, anchor="ms") + else: + # Render the progress bar + rectangle = Image.new('RGBA', (self.renderer.canvas_width - 2*GUIConstants.EDGE_PADDING, GUIConstants.BUTTON_HEIGHT), (0, 0, 0, 0)) + draw = ImageDraw.Draw(rectangle) + + # Start with a background rounded rectangle, same dims as the buttons + overlay_color = (0, 0, 0, 191) # opacity ranges from 0-255 + draw.rounded_rectangle( + ( + (0, 0), + (rectangle.width, rectangle.height) + ), + fill=overlay_color, + radius=8, + outline=overlay_color, + width=2, + ) + + progress_bar_thickness = 4 + progress_bar_width = rectangle.width - 2*GUIConstants.EDGE_PADDING - progress_text_width - int(GUIConstants.EDGE_PADDING/2) + progress_bar_xy = ( + (GUIConstants.EDGE_PADDING, int((rectangle.height - progress_bar_thickness) / 2)), + (GUIConstants.EDGE_PADDING + progress_bar_width, int(rectangle.height + progress_bar_thickness) / 2) + ) + draw.rounded_rectangle( + progress_bar_xy, + fill=GUIConstants.INACTIVE_COLOR, + radius=8 + ) + + progress_percentage = self.decoder.get_percent_complete(weight_mixed_frames=True) + draw.rounded_rectangle( + ( + progress_bar_xy[0], + (GUIConstants.EDGE_PADDING + int(progress_percentage * progress_bar_width / 100.0), progress_bar_xy[1][1]) + ), + fill=GUIConstants.GREEN_INDICATOR_COLOR, + radius=8 + ) + + + draw.text( + xy=(rectangle.width - GUIConstants.EDGE_PADDING, int(rectangle.height / 2)), + text=f"{progress_percentage}%", + # text=f"100%", + fill=GUIConstants.BODY_FONT_COLOR, + font=instructions_font, + anchor="rm", # right-justified, middle + ) + + frame.paste(rectangle, (GUIConstants.EDGE_PADDING, self.renderer.canvas_height - GUIConstants.EDGE_PADDING - rectangle.height), rectangle) + + # Render the dot to indicate successful QR frame read + indicator_size = 10 + self.last_frame_decoded_count = self.frames_decoded_counter.cur_count + status_color_map = { + ScanScreen.FRAME__ADDED_PART: GUIConstants.SUCCESS_COLOR, + ScanScreen.FRAME__REPEATED_PART: GUIConstants.INACTIVE_COLOR, + ScanScreen.FRAME__MISS: None, + } + status_color = status_color_map.get(self.frame_decode_status.cur_count) + if status_color: + # Good! Most recent frame successfully decoded. + # Draw the onscreen indicator dot + draw = ImageDraw.Draw(frame) + draw.ellipse( + ( + (self.renderer.canvas_width - GUIConstants.EDGE_PADDING - indicator_size, self.renderer.canvas_height - GUIConstants.EDGE_PADDING - GUIConstants.BUTTON_HEIGHT - GUIConstants.COMPONENT_PADDING - indicator_size), + (self.renderer.canvas_width - GUIConstants.EDGE_PADDING, self.renderer.canvas_height - GUIConstants.EDGE_PADDING - GUIConstants.BUTTON_HEIGHT - GUIConstants.COMPONENT_PADDING) + ), + fill=status_color, + outline="black", + width=1, + ) + self.renderer.show_image(frame, show_direct=True) - # print(f" {cur_fps:0.2f} | {self.decoder_fps}") if self.camera._video_stream is None: break @@ -169,6 +262,21 @@ def _run(self): if status in (DecodeQRStatus.COMPLETE, DecodeQRStatus.INVALID): self.camera.stop_video_stream_mode() break + + self.frames_decoded_counter.increment() + # Notify the live preview thread how our most recent decode went + if status == DecodeQRStatus.FALSE: + # Did not find anything to decode in the current frame + self.frames_decode_status.set_value(self.FRAME__MISS) + + else: + if status == DecodeQRStatus.PART_COMPLETE: + # We received a valid frame that added new data + self.frames_decode_status.set_value(self.FRAME__ADDED_PART) + + elif status == DecodeQRStatus.PART_EXISTING: + # We received a valid frame, but we've already seen in + self.frames_decode_status.set_value(self.FRAME__REPEATED_PART) if self.hw_inputs.check_for_low(HardwareButtonsConstants.KEY_RIGHT) or self.hw_inputs.check_for_low(HardwareButtonsConstants.KEY_LEFT): self.camera.stop_video_stream_mode() diff --git a/src/seedsigner/hardware/camera.py b/src/seedsigner/hardware/camera.py index 32dc182d8..59ff41016 100644 --- a/src/seedsigner/hardware/camera.py +++ b/src/seedsigner/hardware/camera.py @@ -38,7 +38,7 @@ def read_video_stream(self, as_image=False): return frame else: if frame is not None: - return Image.fromarray(frame.astype('uint8'), 'RGB').rotate(90 + self._camera_rotation) + return Image.fromarray(frame.astype('uint8'), 'RGB').convert('RGBA').rotate(90 + self._camera_rotation) return None diff --git a/src/seedsigner/helpers/ur2/fountain_decoder.py b/src/seedsigner/helpers/ur2/fountain_decoder.py index 69eb64e3c..9e8dcdf34 100644 --- a/src/seedsigner/helpers/ur2/fountain_decoder.py +++ b/src/seedsigner/helpers/ur2/fountain_decoder.py @@ -4,7 +4,7 @@ # Copyright © 2020 Foundation Devices, Inc. # Licensed under the "BSD-2-Clause Plus Patent License" # - +import time from .fountain_utils import choose_fragments, contains, is_strict_subset, set_difference from .utils import join_lists, join_bytes, crc32_int, xor_with, take_first @@ -71,13 +71,53 @@ def result_message(self): def result_error(self): return self.result - def estimated_percent_complete(self): + + def estimated_percent_complete(self, weight_mixed_frames: bool = False): + """ + Weighted mixed frame method: + * counts completed frames + * counts each additional frame that is currently XORed in a mixed frame; its + score is weighted by the number of frames mixed together (1/num frames mixed). + """ if self.is_complete(): return 1 if self.expected_part_indexes == None: return 0 - estimated_input_parts = self.expected_part_count() * 1.75 - return min(0.99, self.processed_parts_count / estimated_input_parts) + + if not weight_mixed_frames: + # Original estimation method + estimated_input_parts = self.expected_part_count() * 1.75 + return min(0.99, self.processed_parts_count / estimated_input_parts) + else: + parts = self.expected_part_count() if self.expected_part_indexes != None else 'None' + mixed = [] + mixed_index_scoring = {} + mixed_set = set() + for indexes, p in self.mixed_parts.items(): + if not indexes: + continue + mixed.append(self.indexes_to_string(indexes)) + mixed_set.update(indexes) + score = 1.0 / float(len(indexes)) + for index in indexes: + if index not in mixed_index_scoring: + mixed_index_scoring[index] = 0.0 + + # sum up partial scores + mixed_index_scoring[index] += score + + mixed_score = 0.0 + for index, score in mixed_index_scoring.items(): + # set a ceiling; don't let an index in a mixed/XOR frame + # achieve equal weight as a fully decoded frame. Also if + # the ceiling is too high, can potentially see your + # reported progress percentage DECREASE during a decode. + mixed_score += min(score, 0.75) + + num_complete = len(self.received_part_indexes) + weighted_estimate = (num_complete + mixed_score) / float(parts) + return weighted_estimate + def receive_part(self, encoder_part): # Don't process the part if we're already done @@ -93,6 +133,9 @@ def receive_part(self, encoder_part): self.last_part_indexes = p.indexes self.enqueue(p) + num_complete = len(self.received_part_indexes) + num_mixed_frames = len(self.mixed_parts) + # Process the queue until we're done or the queue is empty while not self.is_complete() and len(self.queued_parts) != 0: self.process_queue_item() @@ -101,6 +144,12 @@ def receive_part(self, encoder_part): self.processed_parts_count += 1 # self.print_part_end() + # self.print_state() + + if num_complete == len(self.received_part_indexes) and num_mixed_frames == len(self.mixed_parts): + # This part didn't add any new info + # print("No new data") + return False return True @@ -114,6 +163,7 @@ def enqueue(self, p): self.queued_parts.append(p) def process_queue_item(self): + start = time.time() part = self.queued_parts.pop(0) # self.print_part(part) @@ -121,6 +171,8 @@ def process_queue_item(self): self.process_simple_part(part) else: self.process_mixed_part(part) + + # print(f"Queue processing: {int((time.time() - start)*1000.0)}ms") # self.print_state() def reduce_mixed_by(self, p): @@ -141,6 +193,7 @@ def reduce_mixed_by(self, p): new_mixed[reduced_part.indexes] = reduced_part self.mixed_parts = new_mixed + # print(self.mixed_parts.keys()) def reduce_part_by_part(self, a, b): # If the fragments mixed into `b` are a strict (proper) subset of those in `a`... @@ -266,13 +319,23 @@ def print_part_end(self): print("processed: {}, expected: {}, received: {}, percent: {}%".format(self.processed_parts_count, expected, len(self.received_part_indexes), percent)) def print_state(self): - parts = self.expected_part_count() if self.expected_part_indexes != None else 'None' - received = self.indexes_to_string(self.received_part_indexes) + guesstimate = self.estimated_percent_complete(weight_mixed_frames=True) + original_metric = self.estimated_percent_complete() mixed = [] - for indexes, p in self.mixed_parts.items(): - mixed.append(self.indexes_to_string(indexes)) - - mixed_s = "[{}]".format(', '.join(mixed)) - queued = len(self.queued_parts) - res = self.result_description() - print('parts: {}, received: {}, mixed: {}, queued: {}, result: {}'.format(parts, received, mixed_s, queued, res)) + mixed_set = set() + try: + for indexes, p in self.mixed_parts.items(): + if not indexes or len(indexes) == 0: + continue + mixed.append(self.indexes_to_string(indexes)) + mixed_set.update(indexes) + + num_complete = len(self.received_part_indexes) + + mixed_s = "[{}]".format(', '.join(mixed)) + queued = len(self.queued_parts) + print(f"{original_metric*100.0:5.1f}% | {guesstimate*100.0:5.1f}% | done: {num_complete:2d}, mixed: {len(mixed_set):2d}, queued: {queued}, frames: {self.processed_parts_count:2d} | {mixed_s}") + + except Exception as e: + import traceback + traceback.print_exc() diff --git a/src/seedsigner/helpers/ur2/ur_decoder.py b/src/seedsigner/helpers/ur2/ur_decoder.py index c0baf21f8..bb282b09b 100644 --- a/src/seedsigner/helpers/ur2/ur_decoder.py +++ b/src/seedsigner/helpers/ur2/ur_decoder.py @@ -153,8 +153,8 @@ def last_part_indexes(self): def processed_parts_count(self): return self.fountain_decoder.processed_parts_count - def estimated_percent_complete(self): - return self.fountain_decoder.estimated_percent_complete() + def estimated_percent_complete(self, weight_mixed_frames: bool = False): + return self.fountain_decoder.estimated_percent_complete(weight_mixed_frames=weight_mixed_frames) def is_success(self): result = self.result diff --git a/src/seedsigner/models/decode_qr.py b/src/seedsigner/models/decode_qr.py index a020c3127..da5f25269 100644 --- a/src/seedsigner/models/decode_qr.py +++ b/src/seedsigner/models/decode_qr.py @@ -120,11 +120,14 @@ def add_data(self, data): qr_str = data if self.qr_type in [QRType.PSBT__UR2, QRType.OUTPUT__UR, QRType.ACCOUNT__UR, QRType.BYTES__UR]: - self.decoder.receive_part(qr_str) + added_part = self.decoder.receive_part(qr_str) if self.decoder.is_complete(): self.complete = True return DecodeQRStatus.COMPLETE - return DecodeQRStatus.PART_COMPLETE # segment added to ur2 decoder + if added_part: + return DecodeQRStatus.PART_COMPLETE + else: + return DecodeQRStatus.PART_EXISTING else: # All other formats use the same method signature @@ -219,12 +222,12 @@ def get_wallet_descriptor(self): return self.decoder.get_wallet_descriptor() - def get_percent_complete(self) -> int: + def get_percent_complete(self, weight_mixed_frames: bool = False) -> int: if not self.decoder: return 0 if self.qr_type in [QRType.PSBT__UR2, QRType.OUTPUT__UR, QRType.ACCOUNT__UR, QRType.BYTES__UR]: - return int(self.decoder.estimated_percent_complete() * 100) + return int(self.decoder.estimated_percent_complete(weight_mixed_frames=weight_mixed_frames) * 100) elif self.qr_type in [QRType.PSBT__SPECTER]: if self.decoder.total_segments == None: @@ -261,6 +264,7 @@ def is_psbt(self) -> bool: QRType.PSBT__BASE43, ] + @property def is_seed(self): return self.qr_type in [ @@ -305,7 +309,7 @@ def is_settings(self): @staticmethod - def extract_qr_data(image, is_binary:bool = False) -> str: + def extract_qr_data(image, is_binary:bool = False) -> str | None: if image is None: return None @@ -337,10 +341,10 @@ def detect_segment_type(s, wordlist_language_code=None): # PSBT if re.search("^UR:CRYPTO-PSBT/", s, re.IGNORECASE): return QRType.PSBT__UR2 - + elif re.search("^UR:CRYPTO-OUTPUT/", s, re.IGNORECASE): return QRType.OUTPUT__UR - + elif re.search("^UR:CRYPTO-ACCOUNT/", s, re.IGNORECASE): return QRType.ACCOUNT__UR @@ -362,10 +366,10 @@ def detect_segment_type(s, wordlist_language_code=None): elif re.search(r'^\{\"label\".*\"descriptor\"\:.*', desc_str, re.IGNORECASE): # if json starting with label and contains descriptor, assume specter wallet json return QRType.WALLET__SPECTER - + elif "multisig setup file" in s.lower(): return QRType.WALLET__CONFIGFILE - + elif "sortedmulti" in s: return QRType.WALLET__GENERIC @@ -392,7 +396,7 @@ def detect_segment_type(s, wordlist_language_code=None): _4LETTER_WORDLIST = [word[:4].strip() for word in wordlist] except: _4LETTER_WORDLIST = [] - + if all(x in wordlist for x in s.strip().split(" ")): # checks if all words in list are in bip39 word list return QRType.SEED__MNEMONIC @@ -408,7 +412,7 @@ def detect_segment_type(s, wordlist_language_code=None): # Probably this isn't meant to be string data; check if it's valid byte data # below. pass - + # Is it byte data? # 32 bytes for 24-word CompactSeedQR; 16 bytes for 12-word CompactSeedQR if len(s) == 32 or len(s) == 16: diff --git a/src/seedsigner/views/scan_views.py b/src/seedsigner/views/scan_views.py index 17f713337..7bac9a459 100644 --- a/src/seedsigner/views/scan_views.py +++ b/src/seedsigner/views/scan_views.py @@ -47,6 +47,10 @@ def run(self): decoder=self.decoder ) + # A long scan might have exceeded the screensaver timeout; ensure screensaver + # doesn't immediately engage when we leave here. + self.controller.reset_screensaver_timeout() + # Handle the results if self.decoder.is_complete: if not self.is_valid_qr_type: