* Add upip dependencies

* Add firmware updater
* Add urlencode lib
* Add tests for http library
* Support form encoding in http post
dont-delete-test-download-branch
Marek Ventur 2018-07-27 23:35:39 +01:00
parent 992d00dc0f
commit 1fd7f5c0db
155 changed files with 27456 additions and 71 deletions

View File

@ -6,6 +6,7 @@ _pyb = None
def get_pyb(args):
global _pyb
if not _pyb:
print("Connected to badge:", end="")
if not args.device:
args.device = find_tty()
@ -13,10 +14,10 @@ def get_pyb(args):
try:
_pyb = Pyboard(args.device, args.baudrate, None, None, args.wait)
except PyboardError as er:
print(" FAIL")
print(er)
sys.exit(1)
print("Connected to badge.")
print(" DONE")
return _pyb
def close_pyb():
@ -24,10 +25,13 @@ def close_pyb():
if _pyb:
_pyb.close()
def stop_badge(args):
def stop_badge(args, verbose):
pyb = get_pyb(args)
print("stopping running app")
if verbose:
print("Stopping running app:", end="")
write_command(pyb, b'\r\x03\x03') # ctrl-C twice: interrupt any running program
if verbose:
print(" DONE")
def write_command(pyb, command):
flush_input(pyb)
@ -42,14 +46,15 @@ def flush_input(pyb):
def soft_reset(args):
pyb = get_pyb(args)
print("trying to soft reboot badge")
print("Soft reboot:", end="")
write_command(pyb, b'\x04') # ctrl-D: soft reset
#print("1")
data = pyb.read_until(1, b'soft reboot\r\n')
#print("2")
if data.endswith(b'soft reboot\r\n'):
print("Soft reboot was successful.")
print(" DONE")
else:
print(" FAIL")
raise PyboardError('could not soft reboot')
def find_tty():
@ -60,10 +65,17 @@ def find_tty():
print("Couldn't find badge tty - Please make it's plugged in and reset it if necessary")
sys.exit(1)
def check_run(args):
if args.command is not None or len(args.paths):
for filename in args.paths:
with open(filename, 'r') as f:
pyfile = f.read()
compile(pyfile + '\n', filename, 'exec')
def run(args):
pyb = get_pyb(args)
print("executing %s" % args.paths)
print("----------------")
print("Preparing execution:", end="")
# run any command or file(s) - this is mostly a copy from pyboard.py
if args.command is not None or len(args.paths):
# we must enter raw-REPL mode to execute commands
@ -71,9 +83,11 @@ def run(args):
try:
pyb.enter_raw_repl()
except PyboardError as er:
print(" FAIL")
print(er)
pyb.close()
sys.exit(1)
print(" DONE")
def execbuffer(buf):
try:
@ -93,6 +107,7 @@ def run(args):
# run any files
for filename in args.paths:
with open(filename, 'rb') as f:
print("-------- %s --------" % filename)
pyfile = f.read()
execbuffer(pyfile)

543
.development/pydfu.py Normal file
View File

@ -0,0 +1,543 @@
#!/usr/bin/env python
# This file is part of the OpenMV project.
# Copyright (c) 2013/2014 Ibrahim Abdelkader <i.abdalkader@gmail.com>
# This work is licensed under the MIT license, see the file LICENSE for
# details.
"""This module implements enough functionality to program the STM32F4xx over
DFU, without requiring dfu-util.
See app note AN3156 for a description of the DFU protocol.
See document UM0391 for a dscription of the DFuse file.
"""
from __future__ import print_function
import argparse
import re
import struct
import sys
import usb.core
import usb.util
import zlib
# VID/PID
__VID = 0x0483
__PID = 0xdf11
# USB request __TIMEOUT
__TIMEOUT = 4000
# DFU commands
__DFU_DETACH = 0
__DFU_DNLOAD = 1
__DFU_UPLOAD = 2
__DFU_GETSTATUS = 3
__DFU_CLRSTATUS = 4
__DFU_GETSTATE = 5
__DFU_ABORT = 6
# DFU status
__DFU_STATE_APP_IDLE = 0x00
__DFU_STATE_APP_DETACH = 0x01
__DFU_STATE_DFU_IDLE = 0x02
__DFU_STATE_DFU_DOWNLOAD_SYNC = 0x03
__DFU_STATE_DFU_DOWNLOAD_BUSY = 0x04
__DFU_STATE_DFU_DOWNLOAD_IDLE = 0x05
__DFU_STATE_DFU_MANIFEST_SYNC = 0x06
__DFU_STATE_DFU_MANIFEST = 0x07
__DFU_STATE_DFU_MANIFEST_WAIT_RESET = 0x08
__DFU_STATE_DFU_UPLOAD_IDLE = 0x09
__DFU_STATE_DFU_ERROR = 0x0a
_DFU_DESCRIPTOR_TYPE = 0x21
# USB device handle
__dev = None
__verbose = None
# USB DFU interface
__DFU_INTERFACE = 0
import inspect
if 'length' in inspect.getfullargspec(usb.util.get_string).args:
# PyUSB 1.0.0.b1 has the length argument
def get_string(dev, index):
return usb.util.get_string(dev, 255, index)
else:
# PyUSB 1.0.0.b2 dropped the length argument
def get_string(dev, index):
return usb.util.get_string(dev, index)
def init():
"""Initializes the found DFU device so that we can program it."""
global __dev
devices = get_dfu_devices(idVendor=__VID, idProduct=__PID)
if not devices:
raise ValueError('No DFU device found')
if len(devices) > 1:
raise ValueError("Multiple DFU devices found")
__dev = devices[0]
__dev.set_configuration()
# Claim DFU interface
usb.util.claim_interface(__dev, __DFU_INTERFACE)
# Clear status
clr_status()
def clr_status():
"""Clears any error status (perhaps left over from a previous session)."""
__dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE,
None, __TIMEOUT)
def get_status():
"""Get the status of the last operation."""
stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE,
6, 20000)
# print (__DFU_STAT[stat[4]], stat)
return stat[4]
def mass_erase():
"""Performs a MASS erase (i.e. erases the entire device."""
# Send DNLOAD with first byte=0x41
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE,
"\x41", __TIMEOUT)
# Execute last command
if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY:
raise Exception("DFU: erase failed")
# Check command state
if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE:
raise Exception("DFU: erase failed")
def page_erase(addr):
"""Erases a single page."""
if __verbose:
print("Erasing page: 0x%x..." % (addr))
# Send DNLOAD with first byte=0x41 and page address
buf = struct.pack("<BI", 0x41, addr)
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
# Execute last command
if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY:
raise Exception("DFU: erase failed")
# Check command state
if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE:
raise Exception("DFU: erase failed")
def set_address(addr):
"""Sets the address for the next operation."""
# Send DNLOAD with first byte=0x21 and page address
buf = struct.pack("<BI", 0x21, addr)
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
# Execute last command
if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY:
raise Exception("DFU: set address failed")
# Check command state
if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE:
raise Exception("DFU: set address failed")
def write_memory(addr, buf, progress=None, progress_addr=0, progress_size=0):
"""Writes a buffer into memory. This routine assumes that memory has
already been erased.
"""
xfer_count = 0
xfer_bytes = 0
xfer_total = len(buf)
xfer_base = addr
while xfer_bytes < xfer_total:
if __verbose and xfer_count % 512 == 0:
print ("Addr 0x%x %dKBs/%dKBs..." % (xfer_base + xfer_bytes,
xfer_bytes // 1024,
xfer_total // 1024))
if progress and xfer_count % 2 == 0:
progress(progress_addr, xfer_base + xfer_bytes - progress_addr,
progress_size)
# Set mem write address
set_address(xfer_base+xfer_bytes)
# Send DNLOAD with fw data
# the "2048" is the DFU transfer size supported by the ST DFU bootloader
# TODO: this number should be extracted from the USB config descriptor
chunk = min(2048, xfer_total-xfer_bytes)
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE,
buf[xfer_bytes:xfer_bytes + chunk], __TIMEOUT)
# Execute last command
if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY:
raise Exception("DFU: write memory failed")
# Check command state
if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE:
raise Exception("DFU: write memory failed")
xfer_count += 1
xfer_bytes += chunk
def write_page(buf, xfer_offset):
"""Writes a single page. This routine assumes that memory has already
been erased.
"""
xfer_base = 0x08000000
# Set mem write address
set_address(xfer_base+xfer_offset)
# Send DNLOAD with fw data
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf, __TIMEOUT)
# Execute last command
if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY:
raise Exception("DFU: write memory failed")
# Check command state
if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE:
raise Exception("DFU: write memory failed")
if __verbose:
print ("Write: 0x%x " % (xfer_base + xfer_offset))
def exit_dfu():
"""Exit DFU mode, and start running the program."""
# set jump address
set_address(0x08000000)
# Send DNLOAD with 0 length to exit DFU
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE,
None, __TIMEOUT)
try:
# Execute last command
if get_status() != __DFU_STATE_DFU_MANIFEST:
print("Failed to reset device")
# Release device
usb.util.dispose_resources(__dev)
except:
pass
def named(values, names):
"""Creates a dict with `names` as fields, and `values` as values."""
return dict(zip(names.split(), values))
def consume(fmt, data, names):
"""Parses the struct defined by `fmt` from `data`, stores the parsed fields
into a named tuple using `names`. Returns the named tuple, and the data
with the struct stripped off."""
size = struct.calcsize(fmt)
return named(struct.unpack(fmt, data[:size]), names), data[size:]
def cstring(string):
"""Extracts a null-terminated string from a byte array."""
return string.decode('utf-8').split('\0', 1)[0]
def compute_crc(data):
"""Computes the CRC32 value for the data passed in."""
return 0xFFFFFFFF & -zlib.crc32(data) - 1
def read_dfu_file(filename):
"""Reads a DFU file, and parses the individual elements from the file.
Returns an array of elements. Each element is a dictionary with the
following keys:
num - The element index
address - The address that the element data should be written to.
size - The size of the element ddata.
data - The element data.
If an error occurs while parsing the file, then None is returned.
"""
print("File: {}".format(filename))
with open(filename, 'rb') as fin:
data = fin.read()
crc = compute_crc(data[:-4])
elements = []
# Decode the DFU Prefix
#
# <5sBIB
# < little endian
# 5s char[5] signature "DfuSe"
# B uint8_t version 1
# I uint32_t size Size of the DFU file (not including suffix)
# B uint8_t targets Number of targets
dfu_prefix, data = consume('<5sBIB', data,
'signature version size targets')
print (" %(signature)s v%(version)d, image size: %(size)d, "
"targets: %(targets)d" % dfu_prefix)
for target_idx in range(dfu_prefix['targets']):
# Decode the Image Prefix
#
# <6sBI255s2I
# < little endian
# 6s char[6] signature "Target"
# B uint8_t altsetting
# I uint32_t named bool indicating if a name was used
# 255s char[255] name name of the target
# I uint32_t size size of image (not incl prefix)
# I uint32_t elements Number of elements in the image
img_prefix, data = consume('<6sBI255s2I', data,
'signature altsetting named name '
'size elements')
img_prefix['num'] = target_idx
if img_prefix['named']:
img_prefix['name'] = cstring(img_prefix['name'])
else:
img_prefix['name'] = ''
print(' %(signature)s %(num)d, alt setting: %(altsetting)s, '
'name: "%(name)s", size: %(size)d, elements: %(elements)d'
% img_prefix)
target_size = img_prefix['size']
target_data, data = data[:target_size], data[target_size:]
for elem_idx in range(img_prefix['elements']):
# Decode target prefix
# < little endian
# I uint32_t element address
# I uint32_t element size
elem_prefix, target_data = consume('<2I', target_data, 'addr size')
elem_prefix['num'] = elem_idx
print(' %(num)d, address: 0x%(addr)08x, size: %(size)d'
% elem_prefix)
elem_size = elem_prefix['size']
elem_data = target_data[:elem_size]
target_data = target_data[elem_size:]
elem_prefix['data'] = elem_data
elements.append(elem_prefix)
if len(target_data):
print("target %d PARSE ERROR" % target_idx)
# Decode DFU Suffix
# < little endian
# H uint16_t device Firmware version
# H uint16_t product
# H uint16_t vendor
# H uint16_t dfu 0x11a (DFU file format version)
# 3s char[3] ufd 'UFD'
# B uint8_t len 16
# I uint32_t crc32
dfu_suffix = named(struct.unpack('<4H3sBI', data[:16]),
'device product vendor dfu ufd len crc')
print (' usb: %(vendor)04x:%(product)04x, device: 0x%(device)04x, '
'dfu: 0x%(dfu)04x, %(ufd)s, %(len)d, 0x%(crc)08x' % dfu_suffix)
if crc != dfu_suffix['crc']:
print("CRC ERROR: computed crc32 is 0x%08x" % crc)
return
data = data[16:]
if data:
print("PARSE ERROR")
return
return elements
class FilterDFU(object):
"""Class for filtering USB devices to identify devices which are in DFU
mode.
"""
def __call__(self, device):
for cfg in device:
for intf in cfg:
return (intf.bInterfaceClass == 0xFE and
intf.bInterfaceSubClass == 1)
def get_dfu_devices(*args, **kwargs):
"""Returns a list of USB device which are currently in DFU mode.
Additional filters (like idProduct and idVendor) can be passed in to
refine the search.
"""
# convert to list for compatibility with newer pyusb
return list(usb.core.find(*args, find_all=True,
custom_match=FilterDFU(), **kwargs))
def get_memory_layout(device):
"""Returns an array which identifies the memory layout. Each entry
of the array will contain a dictionary with the following keys:
addr - Address of this memory segment
last_addr - Last address contained within the memory segment.
size - size of the segment, in bytes
num_pages - number of pages in the segment
page_size - size of each page, in bytes
"""
cfg = device[0]
intf = cfg[(0, 0)]
mem_layout_str = get_string(device, intf.iInterface)
mem_layout = mem_layout_str.split('/')
result = []
for mem_layout_index in range(1, len(mem_layout), 2):
addr = int(mem_layout[mem_layout_index], 0)
segments = mem_layout[mem_layout_index + 1].split(',')
seg_re = re.compile(r'(\d+)\*(\d+)(.)(.)')
for segment in segments:
seg_match = seg_re.match(segment)
num_pages = int(seg_match.groups()[0], 10)
page_size = int(seg_match.groups()[1], 10)
multiplier = seg_match.groups()[2]
if multiplier == 'K':
page_size *= 1024
if multiplier == 'M':
page_size *= 1024 * 1024
size = num_pages * page_size
last_addr = addr + size - 1
result.append(named((addr, last_addr, size, num_pages, page_size),
"addr last_addr size num_pages page_size"))
addr += size
return result
def list_dfu_devices(*args, **kwargs):
"""Prints a lits of devices detected in DFU mode."""
devices = get_dfu_devices(*args, **kwargs)
if not devices:
print("No DFU capable devices found")
return
for device in devices:
print("Bus {} Device {:03d}: ID {:04x}:{:04x}"
.format(device.bus, device.address,
device.idVendor, device.idProduct))
layout = get_memory_layout(device)
print("Memory Layout")
for entry in layout:
print(" 0x{:x} {:2d} pages of {:3d}K bytes"
.format(entry['addr'], entry['num_pages'],
entry['page_size'] // 1024))
def write_elements(elements, mass_erase_used, progress=None):
"""Writes the indicated elements into the target memory,
erasing as needed.
"""
mem_layout = get_memory_layout(__dev)
for elem in elements:
addr = elem['addr']
size = elem['size']
data = elem['data']
elem_size = size
elem_addr = addr
if progress:
progress(elem_addr, 0, elem_size)
while size > 0:
write_size = size
if not mass_erase_used:
for segment in mem_layout:
if addr >= segment['addr'] and \
addr <= segment['last_addr']:
# We found the page containing the address we want to
# write, erase it
page_size = segment['page_size']
page_addr = addr & ~(page_size - 1)
if addr + write_size > page_addr + page_size:
write_size = page_addr + page_size - addr
page_erase(page_addr)
break
write_memory(addr, data[:write_size], progress,
elem_addr, elem_size)
data = data[write_size:]
addr += write_size
size -= write_size
if progress:
progress(elem_addr, addr - elem_addr, elem_size)
def cli_progress(addr, offset, size):
"""Prints a progress report suitable for use on the command line."""
width = 25
done = offset * width // size
print("\r0x{:08x} {:7d} [{}{}] {:3d}% "
.format(addr, size, '=' * done, ' ' * (width - done),
offset * 100 // size), end="")
sys.stdout.flush()
if offset == size:
print("")
def main():
"""Test program for verifying this files functionality."""
global __verbose
# Parse CMD args
parser = argparse.ArgumentParser(description='DFU Python Util')
#parser.add_argument("path", help="file path")
parser.add_argument(
"-l", "--list",
help="list available DFU devices",
action="store_true",
default=False
)
parser.add_argument(
"-m", "--mass-erase",
help="mass erase device",
action="store_true",
default=False
)
parser.add_argument(
"-u", "--upload",
help="read file from DFU device",
dest="path",
default=False
)
parser.add_argument(
"-v", "--verbose",
help="increase output verbosity",
action="store_true",
default=False
)
args = parser.parse_args()
__verbose = args.verbose
if args.list:
list_dfu_devices(idVendor=__VID, idProduct=__PID)
return
init()
if args.mass_erase:
print ("Mass erase...")
mass_erase()
if args.path:
elements = read_dfu_file(args.path)
if not elements:
return
print("Writing memory...")
write_elements(elements, args.mass_erase, progress=cli_progress)
print("Exiting DFU...")
exit_dfu()
return
print("No command specified")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,49 @@
from pydfu import *
import urllib.request, tempfile, os, shutil, ssl
def firmware_update(verbose):
global __verbose
__verbose = verbose
temp_path = tempfile.mktemp("firmware.dfu")
url = "https://update.badge.emfcamp.org/firmware.dfu"
print("Hello - Welcome to the automated TiLDA Mk4 firmware updater")
print("Finding badge: ", end="")
try:
init()
print("DONE")
print("Downloading newest firmware: ", end="")
context = ssl._create_unverified_context()
with urllib.request.urlopen(url, context=context) as response:
with open(temp_path, 'wb') as tmp_file:
shutil.copyfileobj(response, tmp_file)
print("DONE")
elements = read_dfu_file(temp_path)
if not elements:
return
print("Resetting Badge: ", end="")
mass_erase()
print("DONE")
print("Updating...")
write_elements(elements, True, progress=cli_progress)
exit_dfu()
print("")
print("You can now restart your badge by pressing the reset button on the back. Please follow the instructions on the screen to finish the setup")
print("Have a nice day!")
except ValueError as e:
print("FAIL")
print("")
print("We couldn't find your badge. You need to make sure it's plugged in and in DFU mode.")
print("To put your badge into DFU mode you need to press the joystick in the middle while pressing the reset button at the back.")
print("After that, please try this script again.")
print()
print("Error: %s" %(e))
finally:
if os.path.isfile(temp_path): os.remove(temp_path)

View File

@ -30,8 +30,9 @@ This module has the following operations:
resources = get_resources(path) # Gets resources for a given path
add_hashes(path, resources) # Adds hashes to the file dict - not needed for testing
add_metadata(path, resources) # Adds metadata
resolve_dependencies(resources) # Merges all dependencies into each resource's file dict
validate(resources) # Runs basic validation
resolve_dependencies(resources) # Merges all dependencies into each resource's file dict
remove_upip(resources) # Remove upip resources from dict again
This module encapsulates all the main operations the app library is expect to
perform on a given checkout. It's intentionally kept in one file to make it easier
@ -72,14 +73,30 @@ def get_resources(path):
if sub_path.startswith(".") or sub_path == "__pycache__":
continue
full_path = os.path.join(path, sub_path)
if os.path.islink(full_path):
continue
if os.path.isfile(full_path):
result[sub_path] = {"type": "root", "files": {sub_path: None}}
continue
files = _scan_files(full_path, sub_path)
if sub_path in ["lib", "shared"]:
files = _scan_files(full_path, sub_path)
for rel_path in files:
result[rel_path] = {"type": sub_path, "files": {rel_path: None}}
elif sub_path == "upip":
for upip_lib in os.listdir(full_path):
if upip_lib.startswith(".") or upip_lib == "__pycache__":
continue
full_lib_path = os.path.join(full_path, upip_lib)
files = {}
if os.path.isfile(full_lib_path):
files = {full_lib_path: None}
upip_lib = upip_lib.rsplit('.', 1)[0]
else:
for rel_path in _scan_files(full_lib_path, os.path.join(sub_path, upip_lib)):
files[rel_path] = None
result["upip:%s" % upip_lib] = {"type": sub_path, "files": files}
else:
files = _scan_files(full_path, sub_path)
result[sub_path] = {"type": "app", "files": {}}
for rel_path in files:
result[sub_path]["files"][rel_path] = None
@ -131,16 +148,10 @@ def add_metadata(path, resources):
def _normalize_metadata(metadata):
metadata['description'] = metadata.pop('doc')
if 'dependencies' in metadata:
metadata['dependencies'] = [_normalize_lib(l) for l in metadata.pop('dependencies')]
metadata['dependencies'] = [normalize_dependency(l) for l in metadata.pop('dependencies')]
return metadata
def _normalize_lib(lib):
"""lib dependencies can be shortened to just their module name"""
if "." in lib or "/" in lib:
return lib
return "lib/%s.py" % lib
"""
resolve_dependencies(resources)
@ -195,6 +206,21 @@ def _validate_resource(path, resource):
if 'categories' not in resource or (not isinstance(resource['categories'], list)) or len(resource['categories']) == 0:
resource.setdefault("errors", []).append("___categories___ list is required in main.py but not found")
"""
remove_upip(resources)
upip adds over a 100 resources to the list. Some of them have broken validation as well, so it's
useful to remove them after resolving dependencies.
"""
def remove_upip(resources):
to_delete = []
for key, resource in resources.items():
if resource['type'] == "upip":
to_delete.append(key)
for key in to_delete:
del resources[key]
"""
helpers
"""
@ -209,3 +235,12 @@ def get_error_summary(resources):
summary += "\n"
return summary.strip()
def pretty_print_resources(resources):
import json
return json.dumps(resources, indent=4)
def normalize_dependency(dependency):
"""lib dependencies can be shortened to just their module name"""
if "." in dependency or "/" in dependency or "upip:" in dependency:
return dependency
return "lib/%s.py" % dependency

View File

@ -1,30 +1,46 @@
import os, glob, shutil, sys
import os, shutil, sys, fnmatch
def sync(storage, patterns):
def sync(storage, patterns, resources, verbose):
root = get_root()
# Add all paths that are already files
paths = [os.path.join(root, p) for p in (patterns or []) if os.path.isfile(os.path.join(root, p))]
paths = set([p for p in (patterns or []) if os.path.isfile(os.path.join(root, p))])
if patterns:
new_patterns = []
patterns = [os.path.join(root, p, "**") for p in patterns]
else:
patterns = ["**/**", "boot.py"]
# Always copy boot.py
paths.add("boot.py")
# wifi.json
wifi_path = os.path.join(root, "wifi.json")
if os.path.isfile(wifi_path):
paths.add(wifi_path)
if not patterns:
patterns = ["*"]
for pattern in patterns:
for path in glob.glob(pattern):
paths.append(path)
if len(paths) == 0:
print("No files to copy found for pattern %s" % patterns)
sys.exit(1)
found = False
for key, resource in resources.items():
if fnmatch.fnmatch(key, pattern):
found = True
if verbose:
print("Resource %s is going to be synced" % key)
for path in resource['files'].keys():
paths.add(path)
if not found:
print("WARN: No resources to copy found for pattern %s" % patterns)
if not verbose:
print("Copying %s files: " % len(paths), end="")
for path in paths:
rel_path = os.path.relpath(path, root)
if rel_path.startswith("."):
if not path:
continue
print("Copying %s..." % rel_path)
rel_path = os.path.relpath(path, root)
if rel_path.startswith(".") or os.path.isdir(path) or os.path.islink(path):
continue
if verbose:
print("Copying %s..." % rel_path)
else:
print(".", end="")
target = os.path.join(storage, rel_path)
target_dir = os.path.dirname(target)
@ -35,9 +51,10 @@ def sync(storage, patterns):
os.makedirs(target_dir)
shutil.copy2(path, target)
else:
if verbose:
print("Files copied successfully")
else:
print(" DONE")
def set_boot_app(storage, app_to_boot):
path = os.path.join(storage, 'once.txt')
@ -47,7 +64,8 @@ def set_boot_app(storage, app_to_boot):
pass
with open(path, 'w') as f:
f.write(app_to_boot + "\n")
print("setting next boot to %s" % app_to_boot)
if app_to_boot:
print("setting next boot to %s" % app_to_boot)
def get_root():
root = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..'))

View File

@ -16,7 +16,7 @@ $ tilda_tools sync
Update files in folder(s) to match current local version
$ tilda_tools sync my_game shared
$ tilda_tools sync <folder1> <folder2> ...
$ tilda_tools sync <pattern1> <pattern2> ...
Sync (as above), but execute my_app after reboot
$ tilda_toold.py sync --boot my_app [<other sync parameter>]
@ -36,6 +36,9 @@ $ tilda_tools test
Update firmware on badge (warning, this will delete all settings etc. stored on the badge!)
$ tilda_tools firmware-update
Setup wifi.json to be copied to the badge on every sync
$ tilda_tools wifi
Common parameters
-----------------
@ -45,16 +48,18 @@ Common parameters
"""
import sys, glob
import sync, pyboard_util
import sync, pyboard_util, wifi, pydfu_util
from resources import *
def main():
import argparse
cmd_parser = argparse.ArgumentParser(description='Toolchain for working with the TiLDA Mk4')
cmd_parser.add_argument('command', nargs=1, help='command [test|reset|sync|run]')
cmd_parser.add_argument('command', nargs=1, help='command [test|reset|sync|run|validate|wifi|firmware-update]', choices=['test', 'reset', 'sync', 'validate', 'run', 'wifi', 'firmware-update'])
cmd_parser.add_argument('-d', '--device', help='the serial device of the badge')
cmd_parser.add_argument('-s', '--storage', help='the usb mass storage path of the badge')
cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
cmd_parser.add_argument('-v', '--verbose', action='store_true', help='adds more output')
cmd_parser.add_argument('--print_resources', action='store_true', help='prints resources in json')
cmd_parser.add_argument('--boot', help='defines which app to boot into after reboot')
cmd_parser.add_argument('--run', help='like run, but after a sync')
cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
@ -63,11 +68,20 @@ def main():
command = args.command[0]
path = sync.get_root()
if command in ["test", "validate"]:
if command == "firmware-update":
pydfu_util.firmware_update(args.verbose)
if command == "wifi":
wifi.select_wifi()
if command in ["test", "validate", "sync"]:
resources = get_resources(path)
add_metadata(path, resources)
resolve_dependencies(resources)
validate(path, resources)
resolve_dependencies(resources)
remove_upip(resources)
if args.print_resources:
print(pretty_print_resources(resources))
errors = get_error_summary(resources)
if errors:
print("Problems found:\n")
@ -76,15 +90,20 @@ def main():
print("Local Test: PASS")
if command == "test":
command = "sync"
args.path = []
args.run = "test/main.py"
if len(args.paths) == 0:
args.run = "test/main.py"
else:
if "." not in args.paths[0]:
args.paths[0] = "lib/%s.py" % args.paths[0]
args.run = args.paths[0]
if command in ["reset", "sync"]:
pyboard_util.stop_badge(args)
pyboard_util.stop_badge(args, args.verbose)
if command == "sync":
paths = args.paths if len(args.paths) else None
sync.sync(get_storage(args), paths)
sync.sync(get_storage(args), paths, resources, args.verbose)
if command in ["reset", "sync"]:
sync.set_boot_app(get_storage(args), args.boot or "")
@ -94,6 +113,7 @@ def main():
args.paths = [args.run]
if command == "run":
pyboard_util.check_run(args)
pyboard_util.run(args)
@ -101,7 +121,7 @@ def main():
def find_storage():
# todo: find solution for windows and linux
for pattern in ['/Volumes/PYBFLASH']:
for pattern in ['/Volumes/PYBFLASH', '/Volumes/NO NAME']:
for path in glob.glob(pattern):
return path
print("Couldn't find badge storage - Please make it's plugged in and reset it if necessary")

14
.development/update_upip.sh Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
TARGET=$(dirname `pwd`)"/upip"
rm -rf "/tmp/upip.zip"
curl -L "https://github.com/micropython/micropython-lib/archive/master.zip" -o "/tmp/upip.zip"
rm -rf "/tmp/upip"
unzip -q -a "/tmp/upip.zip" -d "/tmp/upip"
cd "/tmp/upip/micropython-lib-master"
rm -rf "$TARGET/*"
for d in `find . -maxdepth 1 -type d ! -name ".*"`; do
echo $d;
find "$d" -maxdepth 1 -mindepth 1 \( -name '*.py' -not -name 'test_*' -not -name 'example_*' -not -name 'setup.py' -size +10c \) -or \( -type d -not -name 'dist' -not -name '*.egg-info' -not -name '__pycache__' \) | xargs -I{} bash -c -- 'ditto {} "'"$TARGET"'/"`echo "{}" | sed -e "s/\.\/[^\/]*\///"`';
done

13
.development/wifi.py Normal file
View File

@ -0,0 +1,13 @@
import os, sync, json
def select_wifi():
ssid = input('Enter wifi name (SSID): ')
pw = input('Enter wifi password, leave empty for open network: ')
with open(os.path.join(sync.get_root(), "wifi.json"), "wt") as file:
if pw:
conn_details = {"ssid": ssid, "pw": pw}
else:
conn_details = {"ssid": ssid}
file.write(json.dumps(conn_details))
print("wifi.json created - It will be transfered to the badge on the next sync")

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
.DS_Store
__pycache__
wifi.json

View File

@ -1,7 +1,9 @@
import pyb, os, micropython
import pyb, os, micropython, sys
micropython.alloc_emergency_exception_buf(100)
sys.path.append('/flash/upip')
os.sync()
root = os.listdir()

View File

@ -1,3 +1,262 @@
"""HTTP library specially tied to TiLDAs functionality"""
"""HTTP library specially tied to TiLDAs functionality
Somewhat inspired by "request".
Current known issues:
* HTTPS is not supported
*
"""
___license___ = "MIT"
___dependencies___ = ["urlencode"]
import usocket, ujson, os, time, gc, wifi
from urlencode import urlencode
"""Usage
from http_client import *
print(get("http://example.com").raise_for_status().content)
post("http://mydomain.co.uk/api/post", data="SOMETHING").raise_for_status().close() # If response is not consumed you need to close manually
# Or, if you prefer the with syntax:
with post("http://mydomain.co.uk/api/post", urlencoded="SOMETHING") as response:
response.raise_for_error() # No manual close needed
"""
SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout')
CONTENT_TYPE_JSON = 'application/json'
BUFFER_SIZE = 1024
class Response(object):
def __init__(self):
self.encoding = 'utf-8'
self.headers = {}
self.status = None
self.socket = None
self._content = None
# Hands the responsibility for a socket over to this reponse. This needs to happen
# before any content can be inspected
def add_socket(self, socket, content_so_far):
self.content_so_far = content_so_far
self.socket = socket
@property
def content(self, timeout=90):
start_time = time.time()
if not self._content:
if not self.socket:
raise OSError("Invalid response socket state. Has the content been downloaded instead?")
try:
if "Content-Length" in self.headers:
content_length = int(self.headers["Content-Length"])
elif "content-length" in self.headers:
content_length = int(self.headers["content-length"])
else:
raise Exception("No Content-Length")
self._content = self.content_so_far
del self.content_so_far
while len(self._content) < content_length:
buf = self.socket.recv(BUFFER_SIZE)
self._content += buf
if (time.time() - start_time) > timeout:
raise Exception("HTTP request timeout")
finally:
self.close()
return self._content;
@property
def text(self):
return str(self.content, self.encoding) if self.content else ''
# If you don't use the content of a Response at all you need to manually close it
def close(self):
if self.socket is not None:
self.socket.close()
self.socket = None
def json(self):
return ujson.loads(self.text)
# Writes content into a file. This function will write while receiving, which avoids
# having to load all content into memory
def download_to(self, target, timeout=90):
start_time = time.time()
if not self.socket:
raise OSError("Invalid response socket state. Has the content already been consumed?")
try:
if "Content-Length" in self.headers:
remaining = int(self.headers["Content-Length"])
elif "content-length" in self.headers:
remaining = int(self.headers["content-length"])
else:
raise Exception("No Content-Length")
with open(target, 'wb') as f:
f.write(self.content_so_far)
remaining -= len(self.content_so_far)
del self.content_so_far
while remaining > 0:
buf = self.socket.recv(BUFFER_SIZE)
f.write(buf)
remaining -= len(buf)
if (time.time() - start_time) > timeout:
raise Exception("HTTP request timeout")
f.flush()
os.sync()
finally:
self.close()
def raise_for_status(self):
if 400 <= self.status < 500:
raise OSError('Client error: %s' % self.status)
if 500 <= self.status < 600:
raise OSError('Server error: %s' % self.status)
return self
# In case you want to use "with"
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def open_http_socket(method, url, json=None, timeout=None, headers=None, data=None, params=None):
# This will immediately return if we're already connected, otherwise
# it'll attempt to connect or prompt for a new network. Proceeding
# without an active network connection will cause the getaddrinfo to
# fail.
wifi.connect(
wait=True,
show_wait_message=False,
prompt_on_fail=True,
dialog_title='TiLDA Wifi'
)
urlparts = url.split('/', 3)
proto = urlparts[0]
host = urlparts[2]
urlpath = '' if len(urlparts) < 4 else urlparts[3]
if proto == 'http:':
port = 80
elif proto == 'https:':
raise OSError("HTTPS is currently not supported")
port = 443
else:
raise OSError('Unsupported protocol: %s' % proto[:-1])
if ':' in host:
host, port = host.split(':')
port = int(port)
if data is not None:
if isinstance(data, str):
content = data
content_type = "text/plain; charset=UTF-8"
else:
content = urlencode(data)
content_type = "application/x-www-form-urlencoded"
elif json is not None:
content = ujson.dumps(json)
content_type = CONTENT_TYPE_JSON
else:
content = None
# ToDo: Handle IPv6 addresses
if is_ipv4_address(host):
addr = (host, port)
else:
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
sock = None
if proto == 'https:':
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM, usocket.SEC_SOCKET)
else:
sock = usocket.socket()
if params:
urlpath += "?" + urlencode(params)
sock.connect(addr)
if proto == 'https:':
sock.settimeout(0) # Actually make timeouts working properly with ssl
sock.send('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))
if headers is not None:
for header in headers.items():
sock.send('%s: %s\r\n' % header)
if content is not None:
sock.send('content-length: %s\r\n' % len(content))
sock.send('content-type: %s\r\n' % content_type)
sock.send('\r\n')
sock.send(content)
else:
sock.send('\r\n')
return sock
# Adapted from upip
def request(method, url, json=None, timeout=None, headers=None, data=None, params=None):
sock = open_http_socket(method, url, json, timeout, headers, data, params)
try:
response = Response()
state = 1
hbuf = b""
while True:
buf = sock.recv(BUFFER_SIZE)
if state == 1: # Status
nl = buf.find(b"\n")
if nl > -1:
hbuf += buf[:nl - 1]
response.status = int(hbuf.split(b' ')[1])
state = 2
hbuf = b"";
buf = buf[nl + 1:]
else:
hbuf += buf
if state == 2: # Headers
hbuf += buf
nl = hbuf.find(b"\n")
while nl > -1:
if nl < 2:
buf = hbuf[2:]
hbuf = None
state = 3
break
header = hbuf[:nl - 1].decode("utf8").split(':', 3)
response.headers[header[0].strip()] = header[1].strip()
hbuf = hbuf[nl + 1:]
nl = hbuf.find(b"\n")
if state == 3: # Content
response.add_socket(sock, buf)
sock = None # It's not our responsibility to close the socket anymore
return response
finally:
if sock: sock.close()
gc.collect()
def get(url, **kwargs):
return request('GET', url, **kwargs)
def post(url, **kwargs):
return request('POST', url, **kwargs)
def is_ipv4_address(address):
octets = address.split('.')
try:
valid_octets = [x for x in octets if 0 <= int(x) and int(x) <= 255]
return len(valid_octets) == 4
except Exception:
return False

View File

@ -1,11 +1,4 @@
"""This app's purpose is to run a series of tests against library code
Once successful it displays and prints 'ok' on the screen.
Please make sure that all tests pass before sending a PR. You can easily
do this by running "tilda_tools test". Thank you for keeping all the
tests green! *face-throwing-a-kiss-emoji*
"""
"""Tests for database"""
___license___ = "MIT"
___dependencies___ = ["unittest", "database"]

View File

@ -1,21 +1,47 @@
"""This app's purpose is to run a series of tests against library code
Once successful it displays and prints 'ok' on the screen.
Please make sure that all tests pass before sending a PR. You can easily
do this by running "tilda_tools test". Thank you for keeping all the
tests green! *face-throwing-a-kiss-emoji*
"""
"""Tests for http"""
___license___ = "MIT"
___dependencies___ = ["unittest"]
___dependencies___ = ["unittest", "http", "wifi"]
import unittest
from http import *
import wifi
class TestHttp(unittest.TestCase):
def test_foo(self):
pass
def setUpClass(self):
wifi.connect()
def test_get_with_https(self):
with self.assertRaises(OSError) as context:
get("https://httpbin.org/get")
self.assertIn("HTTPS is currently not supported", str(context.exception))
def test_get(self):
with get("http://httpbin.org/get", params={"foo": "bar"}, headers={"accept": "application/json"}) as response:
self.assertEqual(response.headers["Content-Type"], "application/json")
self.assertEqual(response.status, 200)
content = response.json()
self.assertEqual(content["headers"]["Accept"], "application/json")
self.assertEqual(content["args"], {"foo":"bar"})
def test_post_form(self):
with post("http://httpbin.org/post", data={"foo": "bar"}).raise_for_status() as response:
content = response.json()
self.assertEqual(content["headers"]["Content-Type"], "application/x-www-form-urlencoded")
self.assertEqual(content["form"], {"foo":"bar"})
def test_post_string(self):
with post("http://httpbin.org/post", data="foobar").raise_for_status() as response:
content = response.json()
self.assertEqual(content["headers"]["Content-Type"], "text/plain; charset=UTF-8")
self.assertEqual(content["data"], "foobar")
def test_post_json(self):
with post("http://httpbin.org/post", json={"foo":"bar"}).raise_for_status() as response:
content = response.json()
self.assertEqual(content["headers"]["Content-Type"], "application/json")
self.assertEqual(content["json"], {"foo":"bar"})
if __name__ == "__main__":

19
lib/test_urlencode.py Normal file
View File

@ -0,0 +1,19 @@
"""Tests for urlencode"""