bootstrap.py + switch to badgeserver

philcrump-phil-add-ntp
Marek Ventur 2018-08-25 16:52:30 +01:00
parent 14007f3d9e
commit d640329911
7 changed files with 154 additions and 46 deletions

View File

@ -1,8 +1,146 @@
# minimal one-file ota-bootstrap
"""Bootstraps the badge by downloading the base software"""
import usocket, ujson, os, time, gc, wifi
import ugfx, machine, network, json, time, usocket, os
# todo
HOST = "badgeserver.emfcamp.org"
if __name__ == '__main__':
bootstrap()
# Helpers
def msg(text):
ugfx.clear()
ugfx.text(5, 5, "EMF 2018", ugfx.BLACK)
ugfx.text(5, 30, "TiLDA Mk4", ugfx.BLACK)
lines = text.split("\n")
print(lines[0])
for i, line in enumerate(lines):
ugfx.text(5, 65 + i * 20, line, ugfx.BLACK)
def wifi_details():
if not "wifi.json" in os.listdir():
with open("wifi.json", "w") as f:
f.write('{"ssid":"emfcamp","pw":"emfemf"}')
with open("wifi.json") as f:
return json.loads(f.read())
def connect(wifi):
details = wifi_details()
if 'pw' in details:
wifi.connect(details['ssid'], details['pw'])
else:
wifi.connect(details['ssid'])
wait_until = time.ticks_ms() + 10000
while not wifi.isconnected():
if (time.ticks_ms() > wait_until):
raise OSError("Timeout while trying to\nconnect to wifi.\n\nPlease connect your\nbadge to your computer\nand edit wifi.json with\nyour wifi details");
time.sleep(0.1)
def get(path):
s = usocket.socket()
s.connect(usocket.getaddrinfo(HOST, 80)[0][4])
body = b""
status = None
try:
s.send('GET /2018/%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, HOST))
state = 1
hbuf = b""
clen = 9999999
headers = {}
while len(body) < clen:
buf = s.recv(1024)
if state == 1: # Status
nl = buf.find(b"\n")
if nl > -1:
hbuf += buf[:nl - 1]
status = int(hbuf.split(b' ')[1])
state = 2
hbuf = b"";
buf = buf[nl + 1:]
else:
hbuf += buf
if state == 2: # Headers
hbuf += buf
nl = hbuf.find(b"\n")
while nl > -1:
if nl < 2:
buf = hbuf[2:]
hbuf = None
state = 3
clen = int(headers["content-length"])
break
header = hbuf[:nl - 1].decode("utf8").split(':', 3)
headers[header[0].strip().lower()] = header[1].strip()
hbuf = hbuf[nl + 1:]
nl = hbuf.find(b"\n")
if state == 3: # Content
body += buf
finally:
s.close()
if status != 200:
raise Exception("HTTP %d for %s" % (status, path))
return body
# os.path bits
def split(path):
if path == "":
return ("", "")
r = path.rsplit("/", 1)
if len(r) == 1:
return ("", path)
head = r[0]
if not head:
head = "/"
return (head, r[1])
def dirname(path):
return split(path)[0]
def exists(path):
try:
os.stat(path)[0]
return True
except OSError:
return False
def makedirs(path):
sub_path = split(path)[0]
if sub_path and (not exists(sub_path)):
makedirs(sub_path)
if not exists(path):
os.mkdir(path)
# Steps
def step_wifi():
msg("Connecting to wifi...");
wifi = network.WLAN()
wifi.active(True)
if not wifi.isconnected():
connect(wifi)
def step_download():
msg("Connecting to server...")
files = list(json.loads(get("bootstrap")).keys())
for i, file in enumerate(files):
msg("Downloading - %d%%\n%s" % (100 * i // len(files), file))
makedirs(dirname(file))
with open(file, 'wb') as f:
f.write(get("download?repo=emfcamp/Mk4-Apps&path=%s" % file))
os.sync()
def step_goodbye():
msg("All done!\n\nRestarting badge...")
time.sleep(2)
machine.reset()
ugfx.init()
machine.Pin(machine.Pin.PWM_LCD_BLIGHT).on()
try:
step_wifi()
step_download()
step_goodbye()
except Exception as e:
msg("Error\nSomething went wrong :(\n\n" + str(e))
raise e

View File

@ -1,26 +0,0 @@
"""App that gets backed into the firmware.
It's only purpose is to download the base operating system on first boot.
It's not meant to be executed from the launcher"""
___license___ = "MIT"
___title___ = "Bootstrap"
___categories___ = ["System"]
___dependencies___ = ["badge_store", "dialogs"]
___launchable___ = False
___builtin___ = True
import ugfx, wifi, badge_store, machine, dialogs
ugfx.init()
machine.Pin(machine.Pin.PWM_LCD_BLIGHT).on()
wifi.connect(show_wait_message=True)
with dialogs.WaitingMessage(title="Setting up TiLDA Mk4", text="Please wait...") as message:
installers = badge_store.BadgeStore().bootstrap()
n = len(installers)
for i, installer in enumerate(installers):
message.text = "%s (%s/%s)" % (installer.path, i + 1, n)
installer.download()
machine.reset()

View File

@ -8,7 +8,7 @@ from http import *
import hashlib, binascii
class BadgeStore:
def __init__(self, url = "http://badge.marekventur.com", repo="emfcamp/Mk4-Apps", ref="master"):
def __init__(self, url = "http://badgeserver.emfcamp.org", repo="emfcamp/Mk4-Apps", ref="master"):
self.url = url
self.repo = repo
self.ref = ref

View File

@ -10,7 +10,7 @@ Current known issues:
___license___ = "MIT"
___dependencies___ = ["urlencode", "wifi"]
import usocket, ujson, os, time, gc, wifi
import usocket, ujson, os, time, gc, wifi, ussl
from urlencode import urlencode
"""Usage
@ -144,8 +144,6 @@ def open_http_socket(method, url, json=None, timeout=None, headers=None, data=No
if proto == 'http:':
port = 80
elif proto == 'https:':
#todo make this work
raise OSError("HTTPS is currently not supported")
port = 443
else:
raise OSError('Unsupported protocol: %s' % proto[:-1])
@ -170,11 +168,11 @@ def open_http_socket(method, url, json=None, timeout=None, headers=None, data=No
# ToDo: Handle IPv6 addresses
addr = get_address_info(host, port)
sock = None
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
if proto == 'https:':
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM, usocket.SEC_SOCKET)
else:
sock = usocket.socket()
# todo: fix this
sock = ussl.wrap_socket(sock, ca_certs="DST Root CA X3", cert_reqs=ussl.CERT_OPTIONAL) # ,
if params:
urlpath += "?" + urlencode(params)

View File

@ -13,9 +13,9 @@ class TestHttp(unittest.TestCase):
wifi.connect()
def test_get_with_https(self):
with self.assertRaises(OSError) as context:
get("https://httpbin.org/get")
self.assertIn("HTTPS is currently not supported", str(context.exception))
with get("https://httpbin.org/get") as response:
self.assertEqual(response.status, 200)
print(response.text)
def test_get(self):
with get("http://httpbin.org/get", params={"foo": "bar"}, headers={"accept": "application/json"}) as response:

View File

@ -6,8 +6,6 @@ ___dependencies___ = ["upip:unittest", "speaker", "sleep"]
import unittest, speaker, ugfx_helper
from sleep import *
ugfx_helper.init()
class TestSpeaker(unittest.TestCase):
def tearDown(self):

View File

@ -35,7 +35,7 @@ def mode_buttons():
ugfx.clear()
ugfx.text(5, 5, "Synth", ugfx.BLACK)
ugfx.text(5, 30, "Use the buttons >", ugfx.BLACK)
ugfx.text(5, 80, "Octave: 4", ugfx.BLUE) # Make this adjustable
ugfx.text(5, 80, "Octave: 4", ugfx.BLUE) # Allow the octave to be changed
alive = True
while alive:
@ -49,4 +49,4 @@ def mode_buttons():
else:
speaker.stop()
mode_buttons()
mode_buttons() # Todo: Allow different modes and allow users to switch between them via joystick or something