From ae7c18d06283bbe271781f5b960fbeab8c0cfd42 Mon Sep 17 00:00:00 2001 From: Marek Ventur Date: Tue, 21 Aug 2018 20:47:39 +0100 Subject: [PATCH] This is getting too big, so commiting now: More Tilda mk4 changes --- .development/firmware_update.py | 5 ++- .development/pyboard.py | 7 ++-- .development/pyboard_util.py | 12 +++--- .development/sync.py | 4 +- badge_store/main.py | 9 +++-- lib/badge_store.py | 11 +++++- lib/buttons.py | 67 ++++++++++++++------------------- lib/dialogs.py | 60 +++++++++++++++-------------- lib/http.py | 21 ++++++++--- lib/test_dialogs.py | 25 ++++++------ lib/test_hall_effect.py | 3 +- lib/test_wifi.py | 12 +++++- lib/wifi.py | 31 +++++++++------ 13 files changed, 147 insertions(+), 120 deletions(-) diff --git a/.development/firmware_update.py b/.development/firmware_update.py index fd3b4e6..12549cc 100644 --- a/.development/firmware_update.py +++ b/.development/firmware_update.py @@ -20,7 +20,8 @@ def firmware_update(verbose): print("We couldn't find a DFU enabled badge. Please check the following:") print("") print("1) Your badge is plugged into this computer via USB") - print("2) Your badge is in DFU mode. You can tell by a small, red flashing light at the back") + print("2) The switch underneath the screen at the back of the badge is set to 'on'") + print("3) Your badge is in DFU mode. You can tell by a small, red flashing light at the back") print("") print("To put your badge into DFU mode (or if you're unsure whether it really is) you need to") print("press the joystick to the right while pressing the reset button at the back.") @@ -28,7 +29,7 @@ def firmware_update(verbose): print("After that, please try this script again.") return - print("Downloading newest firmware: ", end="") + print("Downloading newest firmware: ", end="", flush=True) with urllib.request.urlopen(url) as response: with open(temp_path, 'wb') as tmp_file: shutil.copyfileobj(response, tmp_file) diff --git a/.development/pyboard.py b/.development/pyboard.py index d8b2697..7309962 100644 --- a/.development/pyboard.py +++ b/.development/pyboard.py @@ -240,7 +240,7 @@ class Pyboard: delayed = False for attempt in range(wait + 1): try: - self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) + self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1, timeout=1, write_timeout=1) break except (OSError, IOError): # Py2 and Py3 have different errors if wait == 0: @@ -282,9 +282,8 @@ class Pyboard: time.sleep(0.01) return data - def enter_raw_repl(self): + def enter_raw_repl(self, retry_count = 2): self.serial.write(b'\r\x03\x03') # ctrl-C twice: interrupt any running program - # flush input (without relying on serial.flushInput()) n = self.serial.inWaiting() while n > 0: @@ -295,6 +294,8 @@ class Pyboard: data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): print(data) + if retry_count: + self.enter_raw_repl(retry_count - 1) raise PyboardError('could not enter raw repl') self.serial.write(b'\x04') # ctrl-D: soft reset diff --git a/.development/pyboard_util.py b/.development/pyboard_util.py index 585789a..260e464 100644 --- a/.development/pyboard_util.py +++ b/.development/pyboard_util.py @@ -6,7 +6,7 @@ _pyb = None def get_pyb(args): global _pyb if not _pyb: - print("Connected to badge:", end="") + print("Connected to badge:", end="", flush=True) if not args.device: args.device = find_tty() @@ -27,11 +27,9 @@ def close_pyb(): def stop_badge(args, verbose): pyb = get_pyb(args) - if verbose: - print("Stopping running app:", end="") + print("Stopping running app:", end="", flush=True) write_command(pyb, b'\r\x03\x03') # ctrl-C twice: interrupt any running program - if verbose: - print(" DONE") + print(" DONE") def write_command(pyb, command): flush_input(pyb) @@ -47,7 +45,7 @@ def flush_input(pyb): def soft_reset(args, verbose = True): pyb = get_pyb(args) if verbose: - print("Soft reboot:", end="") + print("Soft reboot:", end="", flush=True) write_command(pyb, b'\x04') # ctrl-D: soft reset data = pyb.read_until(1, b'soft reboot\r\n') if data.endswith(b'soft reboot\r\n'): @@ -76,7 +74,7 @@ def run(args, paths, verbose=True): pyb = get_pyb(args) if verbose: - print("Preparing execution:", end="") + print("Preparing execution:", end=" ", flush=True) # run any command or file(s) - this is mostly a copy from pyboard.py if len(paths): # we must enter raw-REPL mode to execute commands diff --git a/.development/sync.py b/.development/sync.py index a3add41..043c17d 100644 --- a/.development/sync.py +++ b/.development/sync.py @@ -31,7 +31,7 @@ def sync(storage, patterns, resources, verbose): if not found: print("WARN: No resources to copy found for pattern %s" % patterns) if not verbose: - print("Copying %s files: " % len(paths), end="") + print("Copying %s files: " % len(paths), end="", flush=True) for path in paths: if not path: continue @@ -41,7 +41,7 @@ def sync(storage, patterns, resources, verbose): if verbose: print("Copying %s..." % rel_path) else: - print(".", end="") + print(".", end="", flush=True) target = os.path.join(storage, rel_path) target_dir = os.path.dirname(target) diff --git a/badge_store/main.py b/badge_store/main.py index 13ecdd0..35e75ed 100644 --- a/badge_store/main.py +++ b/badge_store/main.py @@ -6,21 +6,21 @@ To publish apps use https://badge.emfcamp.org""" ___license___ = "MIT" ___title___ = "Badge Store" -___dependencies___ = ["app", "badge_store", "dialogs"] +___dependencies___ = ["app", "badge_store", "dialogs", "ugfx_helper"] ___categories___ = ["System"] ___bootstrapped___ = True -import ugfx +import ugfx_helper import os import wifi from dialogs import * import app from lib.badge_store import BadgeStore -ugfx.init() - ### VIEWS ### +ugfx_helper.init() + store = BadgeStore() title = "TiLDA Badge Store" @@ -89,5 +89,6 @@ def main_menu(): else: app.restart_to_default() +wifi.connect(show_wait_message=True) main_menu() #show_app("launcher") diff --git a/lib/badge_store.py b/lib/badge_store.py index cef6100..ef97c10 100644 --- a/lib/badge_store.py +++ b/lib/badge_store.py @@ -70,12 +70,19 @@ class Installer: count = 0 while get_hash(TEMP_FILE) != self.hash: count += 1 + print(count) if count > 5: - os.remove(TEMP_FILE) + try: + os.remove(TEMP_FILE) + except: + pass raise OSError("Aborting download of %s after 5 unsuccessful attempts" % self.path) try: + print("download ", self.url, self.params) get(self.url, params=self.params).raise_for_status().download_to(TEMP_FILE) - except OSError: + except OSError as e: + if "404" in str(e): + raise e pass try: os.remove(self.path) diff --git a/lib/buttons.py b/lib/buttons.py index 6f4265f..d53b6a6 100644 --- a/lib/buttons.py +++ b/lib/buttons.py @@ -1,41 +1,24 @@ -"""Convenience methods for dealing with the TiLDA buttons""" +"""Convenience methods for dealing with the TiLDA buttons + +Pins are decined in tilda.Buttons.BTN_XYZ: +BTN_0 - BTN_9, BTN_Hash, BTN_Star +BTN_A, BTN_B +BTN_Call, BTN_End +BTN_Menu +JOY_Center, JOY_Down, JOY_Left, JOY_Right, JOY_Up +""" ___license___ = "MIT" -import machine, time +import machine, time, tilda -CONFIG = { - "JOY_UP": [1, machine.Pin.PULL_DOWN], - "JOY_DOWN": [2, machine.Pin.PULL_DOWN], - "JOY_RIGHT": [4, machine.Pin.PULL_DOWN], - "JOY_LEFT": [3, machine.Pin.PULL_DOWN], - "JOY_CENTER": [0, machine.Pin.PULL_DOWN], - "BTN_MENU": [5, machine.Pin.PULL_UP] -} -# todo: port expander +# Convenience +Buttons = tilda.Buttons -_tilda_pins = {} -_tilda_interrupts = {} _tilda_bounce = {} -def _get_pin(button): - if button not in _tilda_pins: - raise ValueError("Please call button.init() first before using any other button functions") - return _tilda_pins[button] - -def init(buttons = CONFIG.keys()): - """Inits all pins used by the TiLDA badge""" - global _tilda_pins - for button in buttons: - _tilda_pins[button] = machine.Pin(CONFIG[button][0], machine.Pin.IN) - _tilda_pins[button].init(machine.Pin.IN, CONFIG[button][1]) - def is_pressed(button): - pin = _get_pin(button) - if pin.pull() == machine.Pin.PULL_DOWN: - return pin.value() > 0 - else: - return pin.value() == 0 + return tilda.Buttons.is_pressed(button) def is_triggered(button, interval = 30): """Use this function if you want buttons as a trigger for something in a loop @@ -52,22 +35,30 @@ def is_triggered(button, interval = 30): return False # The button might have bounced back to high # Wait for a while to avoid bounces to low - machine.sleep_ms(interval) + time.sleep_ms(interval) # Wait until button is released again while is_pressed(button): - machine.sleep_ms(1) + time.sleep_ms(1) _tilda_bounce[button] = time.ticks_ms() + interval return True + + +# The following functions might not work + + def has_interrupt(button): - global _tilda_interrupts - _get_pin(button) - if button in _tilda_interrupts: - return True - else: - return False + return False; + + # todo: re-enable + #global _tilda_interrupts + #_get_pin(button) + #if button in _tilda_interrupts: + # return True + #else: + # return False def enable_interrupt(button, interrupt, on_press = True, on_release = False): diff --git a/lib/dialogs.py b/lib/dialogs.py index d5d343e..f348ae2 100644 --- a/lib/dialogs.py +++ b/lib/dialogs.py @@ -32,7 +32,7 @@ def prompt_boolean(text, title="TiLDA", true_text="Yes", false_text="No", width if style == None: style = default_style_dialog ugfx.set_default_font(FONT_MEDIUM_BOLD) - window = ugfx.Container((ugfx.width() - width) // 2, (ugfx.height() - height) // 2, width, height, style=style) + window = ugfx.Container((ugfx.width() - width) // 2, (ugfx.height() - height) // 2, width, height) window.show() ugfx.set_default_font(font) window.text(5, 10, title, TILDA_COLOR) @@ -49,17 +49,15 @@ def prompt_boolean(text, title="TiLDA", true_text="Yes", false_text="No", width button_no = ugfx.Button(width // 2 + 5, height - 40, width // 2 - 15, 30 , false_text, parent=window) if false_text else None try: - buttons.init() - - button_yes.attach_input(ugfx.BTN_A,0) - if button_no: button_no.attach_input(ugfx.BTN_B,0) + #button_yes.attach_input(ugfx.BTN_A,0) # todo: re-enable once working + #if button_no: button_no.attach_input(ugfx.BTN_B,0) window.show() while True: sleep.wfi() - if buttons.is_triggered("BTN_A"): return True - if buttons.is_triggered("BTN_B"): return False + if buttons.is_triggered(buttons.Buttons.BTN_A): return True + if buttons.is_triggered(buttons.Buttons.BTN_B): return False finally: window.hide() @@ -68,13 +66,13 @@ def prompt_boolean(text, title="TiLDA", true_text="Yes", false_text="No", width if button_no: button_no.destroy() label.destroy() -def prompt_text(description, init_text = "", true_text="OK", false_text="Back", width = 300, height = 200, font=FONT_MEDIUM_BOLD, style=default_style_badge): +def prompt_text(description, init_text = "", true_text="OK", false_text="Back", font=FONT_MEDIUM_BOLD, style=default_style_badge): """Shows a dialog and keyboard that allows the user to input/change a string Returns None if user aborts with button B """ - window = ugfx.Container(int((ugfx.width()-width)/2), int((ugfx.height()-height)/2), width, height, style=style) + window = ugfx.Container(0, 0, ugfx.width(), ugfx.height()) if false_text: true_text = "M: " + true_text @@ -83,29 +81,27 @@ def prompt_text(description, init_text = "", true_text="OK", false_text="Back", if buttons.has_interrupt("BTN_MENU"): buttons.disable_interrupt("BTN_MENU") - ugfx.set_default_font(ugfx.FONT_MEDIUM) - kb = ugfx.Keyboard(0, int(height/2), width, int(height/2), parent=window) - edit = ugfx.Textbox(5, int(height/2)-30, int(width*4/5)-10, 25, text = init_text, parent=window) + ugfx.set_default_font(FONT_MEDIUM_BOLD) + kb = ugfx.Keyboard(0, ugfx.height()//2, ugfx.width(), ugfx.height()//2, parent=window) + edit = ugfx.Textbox(2, ugfx.height()//2-60, ugfx.width()-7, 25, text = init_text, parent=window) ugfx.set_default_font(FONT_SMALL) - button_yes = ugfx.Button(int(width*4/5), int(height/2)-30, int(width*1/5)-3, 25 , true_text, parent=window) - button_no = ugfx.Button(int(width*4/5), int(height/2)-30-30, int(width/5)-3, 25 , false_text, parent=window) if false_text else None + button_yes = ugfx.Button(2, ugfx.height()//2-30, ugfx.width()//2-6, 25 , true_text, parent=window) + button_no = ugfx.Button(ugfx.width()//2+2, ugfx.height()//2-30, ugfx.width()//2-6, 25 , false_text, parent=window) if false_text else None ugfx.set_default_font(font) - label = ugfx.Label(int(width/10), int(height/10), int(width*4/5), int(height*2/5)-60, description, parent=window) + label = ugfx.Label(ugfx.width()//10, ugfx.height()//10, ugfx.width()*4//5, ugfx.height()*2//5-90, description, parent=window) try: - buttons.init() - - button_yes.attach_input(ugfx.BTN_MENU,0) - if button_no: button_no.attach_input(ugfx.BTN_B,0) + #button_yes.attach_input(ugfx.BTN_MENU,0) # todo: re-enable this + #if button_no: button_no.attach_input(ugfx.BTN_B,0) window.show() - edit.set_focus() + # edit.set_focus() todo: do we need this? while True: sleep.wfi() ugfx.poll() - #if buttons.is_triggered("BTN_A"): return edit.text() - if buttons.is_triggered("BTN_B"): return None - if buttons.is_triggered("BTN_MENU"): return edit.text() + #if buttons.is_triggered(buttons.Buttons.BTN_A): return edit.text() + if buttons.is_triggered(buttons.Buttons.BTN_B): return None + if buttons.is_triggered(buttons.Buttons.BTN_Menu): return edit.text() finally: window.hide() @@ -143,7 +139,7 @@ def prompt_option(options, index=0, text = "Please select one of the following:" options_list.add_item(option["title"]) else: options_list.add_item(str(option)) - options_list.selected_index(index) + options_list.set_selected_index(index) select_text = "A: " + select_text if none_text: @@ -153,14 +149,20 @@ def prompt_option(options, index=0, text = "Please select one of the following:" button_none = ugfx.Button(ugfx.width() - 160, ugfx.height() - 50, 140, 30 , none_text, parent=window) if none_text else None try: - buttons.init() - while True: sleep.wfi() ugfx.poll() - if buttons.is_triggered("BTN_A"): return options[options_list.selected_index()] - if button_none and buttons.is_triggered("BTN_B"): return None - if button_none and buttons.is_triggered("BTN_MENU"): return None + # todo: temporary hack + if (buttons.is_triggered(buttons.Buttons.JOY_Up)): + index = max(index - 1, 0) + options_list.set_selected_index(index) + if (buttons.is_triggered(buttons.Buttons.JOY_Down)): + index = min(index + 1, len(options) - 1) + options_list.set_selected_index(index) + + if buttons.is_triggered(buttons.Buttons.BTN_A): return options[options_list.get_selected_index()] + if button_none and buttons.is_triggered(buttons.Buttons.BTN_B): return None + if button_none and buttons.is_triggered(buttons.Buttons.BTN_Menu): return None finally: window.hide() diff --git a/lib/http.py b/lib/http.py index c5f2298..574e1be 100644 --- a/lib/http.py +++ b/lib/http.py @@ -168,11 +168,7 @@ def open_http_socket(method, url, json=None, timeout=None, headers=None, data=No content = None # ToDo: Handle IPv6 addresses - if is_ipv4_address(host): - addr = (host, port) - else: - ai = usocket.getaddrinfo(host, port) - addr = ai[0][4] + addr = get_address_info(host, port) sock = None if proto == 'https:': @@ -203,6 +199,21 @@ def open_http_socket(method, url, json=None, timeout=None, headers=None, data=No return sock +def get_address_info(host, port, retries_left = 20): + try: + if is_ipv4_address(host): + addr = (host, port) + else: + return usocket.getaddrinfo(host, port)[0][4] + except OSError as e: + if ("-15" in str(e)) and retries_left: + # [addrinfo error -15] + # This tends to happen after startup and goes away after a while + time.sleep_ms(200) + return get_address_info(host, port, retries_left - 1) + else: + raise e + # Adapted from upip def request(method, url, json=None, timeout=None, headers=None, data=None, params=None): sock = open_http_socket(method, url, json, timeout, headers, data, params) diff --git a/lib/test_dialogs.py b/lib/test_dialogs.py index 571b41b..a4f1679 100644 --- a/lib/test_dialogs.py +++ b/lib/test_dialogs.py @@ -3,9 +3,9 @@ Very limited at the moment since we can't test the main input dialogs""" ___license___ = "MIT" -___dependencies___ = ["upip:unittest", "dialogs", "sleep"] +___dependencies___ = ["upip:unittest", "dialogs", "sleep", "ugfx_helper"] -import unittest, ugfx +import unittest, ugfx, ugfx_helper from machine import Pin from dialogs import * from sleep import * @@ -13,20 +13,21 @@ from sleep import * class TestDialogs(unittest.TestCase): def setUpClass(self): - ugfx.init() - Pin(Pin.PWM_LCD_BLIGHT).on() + ugfx_helper.init() def tearDownClass(self): - Pin(Pin.PWM_LCD_BLIGHT).off() + ugfx_helper.deinit() - def test_app_object(self): - count_max = 10 - with WaitingMessage("Testing...", "Foo") as c: - for i in range(1, count_max): - sleep_ms(100) - c.text = "%d/%d" % (i, count_max) +# def test_waiting(self): +# count_max = 3 +# with WaitingMessage("Testing...", "Foo") as c: +# for i in range(1, count_max): +# c.text = "%d/%d" % (i, count_max) +# +# print("done") - print("done") + def test_(self): + prompt_text("description") diff --git a/lib/test_hall_effect.py b/lib/test_hall_effect.py index f786186..ac76da2 100644 --- a/lib/test_hall_effect.py +++ b/lib/test_hall_effect.py @@ -3,7 +3,7 @@ ___license___ = "MIT" ___dependencies___ = ["upip:unittest", "hall_effect"] -import unittest, hall_effect +import unittest, hall_effect, speaker class TestHallEffect(unittest.TestCase): @@ -12,6 +12,5 @@ class TestHallEffect(unittest.TestCase): self.assertTrue(flux > 0) self.assertTrue(flux < 4000) - if __name__ == '__main__': unittest.main() diff --git a/lib/test_wifi.py b/lib/test_wifi.py index 673bd5c..880c089 100644 --- a/lib/test_wifi.py +++ b/lib/test_wifi.py @@ -3,13 +3,21 @@ ___license___ = "MIT" ___dependencies___ = ["upip:unittest", "wifi"] -import unittest, wifi +import unittest, wifi, ugfx +from machine import Pin class TestWifi(unittest.TestCase): + def setUpClass(self): + ugfx.init() + Pin(Pin.PWM_LCD_BLIGHT).on() + + def tearDownClass(self): + Pin(Pin.PWM_LCD_BLIGHT).off() + def test_connect(self): wifi.connect() - self.assertTrue(wifi.is_connected()) + self.assertTrue(wifi.is_connected(show_wait_message=True)) if __name__ == '__main__': unittest.main() diff --git a/lib/wifi.py b/lib/wifi.py index c833862..3ef236a 100644 --- a/lib/wifi.py +++ b/lib/wifi.py @@ -31,9 +31,7 @@ def ssid(): return connection_details()["ssid"] def connect(wait=True, timeout=10, show_wait_message=False, prompt_on_fail=True, dialog_title='TiLDA'): - retry_connect = True - - while retry_connect: + while True: if nic().isconnected(): return @@ -60,7 +58,7 @@ def connect(wait=True, timeout=10, show_wait_message=False, prompt_on_fail=True, text="Failed to connect to '%s'" % details['ssid'], title=dialog_title, true_text="Try again", - false_text="Forget it", + false_text="Change it", ) if not retry_connect: os.remove('wifi.json') @@ -76,11 +74,11 @@ def connect_wifi(details, timeout, wait=False): nic().connect(details['ssid']) if wait: - wait_until = time.ticks_ms() + 2000 + wait_until = time.ticks_ms() + timeout * 1000 while not nic().isconnected(): #nic().update() # todo: do we need this? if (time.ticks_ms() > wait_until): - raise Exception("Timeout while trying to connect to wifi") + raise OSError("Timeout while trying to connect to wifi") sleep.sleep_ms(100) @@ -88,6 +86,7 @@ def is_connected(): return nic().isconnected() def get_security_level(ap): + #todo: fix this n = nic() levels = {} try: @@ -106,22 +105,30 @@ def get_security_level(ap): def choose_wifi(dialog_title='TiLDA'): filtered_aps = [] with dialogs.WaitingMessage(text='Scanning for networks...', title=dialog_title): - visible_aps = nic().list_aps() - visible_aps.sort(key=lambda x:x['rssi'], reverse=True) + visible_aps = None + while not visible_aps: + visible_aps = nic().scan() + print(visible_aps) + sleep.sleep_ms(300) + #todo: timeout + print(visible_aps) + visible_aps.sort(key=lambda x:x[3], reverse=True) + print(visible_aps) # We'll get one result for each AP, so filter dupes for ap in visible_aps: - title = ap['ssid'] - security = get_security_level(ap) + title = ap[0] + security = "?" # todo: re-add get_security_level(ap) if security: title = title + ' (%s)' % security ap = { 'title': title, - 'ssid': ap['ssid'], - 'security': security, + 'ssid': ap[0], + 'security': ap[4], } if ap['ssid'] not in [ a['ssid'] for a in filtered_aps ]: filtered_aps.append(ap) del visible_aps + print(filtered_aps) ap = dialogs.prompt_option( filtered_aps,