diff --git a/.development/pyboard_util.py b/.development/pyboard_util.py
index ebee605..6c07e3b 100644
--- a/.development/pyboard_util.py
+++ b/.development/pyboard_util.py
@@ -6,6 +6,7 @@ _pyb = None
 def get_pyb(args):
     global _pyb
     if not _pyb:
+        print("Connected to badge:", end="")
 
         if not args.device:
             args.device = find_tty()
@@ -13,10 +14,10 @@ def get_pyb(args):
         try:
             _pyb = Pyboard(args.device, args.baudrate, None, None, args.wait)
         except PyboardError as er:
+            print(" FAIL")
             print(er)
             sys.exit(1)
-        print("Connected to badge.")
-
+        print(" DONE")
     return _pyb
 
 def close_pyb():
@@ -24,10 +25,13 @@ def close_pyb():
     if _pyb:
         _pyb.close()
 
-def stop_badge(args):
+def stop_badge(args, verbose):
     pyb = get_pyb(args)
-    print("stopping running app")
+    if verbose:
+        print("Stopping running app:", end="")
     write_command(pyb, b'\r\x03\x03') # ctrl-C twice: interrupt any running program
+    if verbose:
+        print(" DONE")
 
 def write_command(pyb, command):
     flush_input(pyb)
@@ -42,14 +46,15 @@ def flush_input(pyb):
 
 def soft_reset(args):
     pyb = get_pyb(args)
-    print("trying to soft reboot badge")
+    print("Soft reboot:", end="")
     write_command(pyb, b'\x04') # ctrl-D: soft reset
     #print("1")
     data = pyb.read_until(1, b'soft reboot\r\n')
     #print("2")
     if data.endswith(b'soft reboot\r\n'):
-        print("Soft reboot was successful.")
+        print(" DONE")
     else:
+        print(" FAIL")
         raise PyboardError('could not soft reboot')
 
 def find_tty():
@@ -60,10 +65,17 @@
     print("Couldn't find badge tty - Please make it's plugged in and reset it if necessary")
     sys.exit(1)
 
+def check_run(args):
+    if args.command is not None or len(args.paths):
+        for filename in args.paths:
+            with open(filename, 'r') as f:
+                pyfile = f.read()
+            compile(pyfile + '\n', filename, 'exec')
+
 def run(args):
     pyb = get_pyb(args)
-    print("executing %s" % args.paths)
-    print("----------------")
+
+    print("Preparing execution:", end="")
     # run any command or file(s) - this is mostly a copy from pyboard.py
     if args.command is not None or len(args.paths):
         # we must enter raw-REPL mode to execute commands
         try:
             pyb.enter_raw_repl()
         except PyboardError as er:
+            print(" FAIL")
             print(er)
             pyb.close()
             sys.exit(1)
+        print(" DONE")
 
     def execbuffer(buf):
         try:
@@ -93,6 +107,7 @@ def run(args):
         # run any files
         for filename in args.paths:
             with open(filename, 'rb') as f:
+                print("-------- %s --------" % filename)
                 pyfile = f.read()
                 execbuffer(pyfile)
 
diff --git a/.development/pydfu.py b/.development/pydfu.py
new file mode 100644
index 0000000..e08f41a
--- /dev/null
+++ b/.development/pydfu.py
@@ -0,0 +1,543 @@
+#!/usr/bin/env python
+# This file is part of the OpenMV project.
+# Copyright (c) 2013/2014 Ibrahim Abdelkader
+# This work is licensed under the MIT license, see the file LICENSE for
+# details.
+
+"""This module implements enough functionality to program the STM32F4xx over
+DFU, without requiring dfu-util.
+See app note AN3156 for a description of the DFU protocol.
+See document UM0391 for a description of the DFuse file. 
+""" + +from __future__ import print_function + +import argparse +import re +import struct +import sys +import usb.core +import usb.util +import zlib + +# VID/PID +__VID = 0x0483 +__PID = 0xdf11 + +# USB request __TIMEOUT +__TIMEOUT = 4000 + +# DFU commands +__DFU_DETACH = 0 +__DFU_DNLOAD = 1 +__DFU_UPLOAD = 2 +__DFU_GETSTATUS = 3 +__DFU_CLRSTATUS = 4 +__DFU_GETSTATE = 5 +__DFU_ABORT = 6 + +# DFU status +__DFU_STATE_APP_IDLE = 0x00 +__DFU_STATE_APP_DETACH = 0x01 +__DFU_STATE_DFU_IDLE = 0x02 +__DFU_STATE_DFU_DOWNLOAD_SYNC = 0x03 +__DFU_STATE_DFU_DOWNLOAD_BUSY = 0x04 +__DFU_STATE_DFU_DOWNLOAD_IDLE = 0x05 +__DFU_STATE_DFU_MANIFEST_SYNC = 0x06 +__DFU_STATE_DFU_MANIFEST = 0x07 +__DFU_STATE_DFU_MANIFEST_WAIT_RESET = 0x08 +__DFU_STATE_DFU_UPLOAD_IDLE = 0x09 +__DFU_STATE_DFU_ERROR = 0x0a + +_DFU_DESCRIPTOR_TYPE = 0x21 + + +# USB device handle +__dev = None + +__verbose = None + +# USB DFU interface +__DFU_INTERFACE = 0 + +import inspect +if 'length' in inspect.getfullargspec(usb.util.get_string).args: + # PyUSB 1.0.0.b1 has the length argument + def get_string(dev, index): + return usb.util.get_string(dev, 255, index) +else: + # PyUSB 1.0.0.b2 dropped the length argument + def get_string(dev, index): + return usb.util.get_string(dev, index) + + +def init(): + """Initializes the found DFU device so that we can program it.""" + global __dev + devices = get_dfu_devices(idVendor=__VID, idProduct=__PID) + if not devices: + raise ValueError('No DFU device found') + if len(devices) > 1: + raise ValueError("Multiple DFU devices found") + __dev = devices[0] + __dev.set_configuration() + + # Claim DFU interface + usb.util.claim_interface(__dev, __DFU_INTERFACE) + + # Clear status + clr_status() + + +def clr_status(): + """Clears any error status (perhaps left over from a previous session).""" + __dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, + None, __TIMEOUT) + + +def get_status(): + """Get the status of the last operation.""" + stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, + 6, 20000) + # print (__DFU_STAT[stat[4]], stat) + return stat[4] + + +def mass_erase(): + """Performs a MASS erase (i.e. erases the entire device.""" + # Send DNLOAD with first byte=0x41 + __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, + "\x41", __TIMEOUT) + + # Execute last command + if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY: + raise Exception("DFU: erase failed") + + # Check command state + if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE: + raise Exception("DFU: erase failed") + + +def page_erase(addr): + """Erases a single page.""" + if __verbose: + print("Erasing page: 0x%x..." 
% (addr))
+
+    # Send DNLOAD with first byte=0x41 and page address
+    buf = struct.pack("<BI", 0x41, addr)
+    __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE,
+                        buf, __TIMEOUT)
+
+    # Execute last command
+    if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY:
+        raise Exception("DFU: erase failed")
+
+    # Check command state
+    if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE:
+        raise Exception("DFU: erase failed")
+
+
+def write_elements(elements, mass_erase_used, progress=None):
+    """Writes the indicated elements into the target memory,
+    erasing as needed.
+    """
+
+    mem_layout = get_memory_layout(__dev)
+    for elem in elements:
+        addr = elem['addr']
+        size = elem['size']
+        data = elem['data']
+        elem_size = size
+        elem_addr = addr
+        if progress:
+            progress(elem_addr, 0, elem_size)
+        while size > 0:
+            write_size = size
+            if not mass_erase_used:
+                for segment in mem_layout:
+                    if addr >= segment['addr'] and \
+                       addr <= segment['last_addr']:
+                        # We found the page containing the address we want to
+                        # write, erase it
+                        page_size = segment['page_size']
+                        page_addr = addr & ~(page_size - 1)
+                        if addr + write_size > page_addr + page_size:
+                            write_size = page_addr + page_size - addr
+                        page_erase(page_addr)
+                        break
+            write_memory(addr, data[:write_size], progress,
+                         elem_addr, elem_size)
+            data = data[write_size:]
+            addr += write_size
+            size -= write_size
+            if progress:
+                progress(elem_addr, addr - elem_addr, elem_size)
+
+
+def cli_progress(addr, offset, size):
+    """Prints a progress report suitable for use on the command line."""
+    width = 25
+    done = offset * width // size
+    print("\r0x{:08x} {:7d} [{}{}] {:3d}% "
+          .format(addr, size, '=' * done, ' ' * (width - done),
+                  offset * 100 // size), end="")
+    sys.stdout.flush()
+    if offset == size:
+        print("")
+
+
+def main():
+    """Test program for verifying this file's functionality."""
+    global __verbose
+    # Parse CMD args
+    parser = argparse.ArgumentParser(description='DFU Python Util')
+    #parser.add_argument("path", help="file path")
+    parser.add_argument(
+        "-l", "--list",
+        help="list available DFU devices",
+        action="store_true",
+        default=False
+    )
+    parser.add_argument(
+        "-m", "--mass-erase",
+        help="mass erase device",
+        action="store_true",
+        default=False
+    )
+    parser.add_argument(
+        "-u", "--upload",
+        help="read file from DFU device",
+        dest="path",
+        default=False
+    )
+    parser.add_argument(
+        "-v", "--verbose",
+        help="increase output verbosity",
+        action="store_true",
+        default=False
+    )
+    args = parser.parse_args()
+
+    __verbose = args.verbose
+
+    if args.list:
+        list_dfu_devices(idVendor=__VID, idProduct=__PID)
+        return
+
+    init()
+
+    if args.mass_erase:
+        print("Mass erase...")
+        mass_erase()
+
+    if args.path:
+        elements = read_dfu_file(args.path)
+        if not elements:
+            return
+        print("Writing memory...")
+        write_elements(elements, args.mass_erase, progress=cli_progress)
+
+        print("Exiting DFU...")
+        exit_dfu()
+        return
+
+    print("No command specified")
+
+if __name__ == '__main__':
+    main()
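pydfu_util.py (next) drives this module end to end, and the same flow can be exercised from a Python prompt. A minimal sketch, assuming a badge already in DFU mode and a local firmware.dfu; read_dfu_file, exit_dfu and the other helpers come from the parts of pydfu.py not shown above, and error handling is omitted:

    import pydfu

    pydfu.init()                                    # find and claim the DFU device
    elements = pydfu.read_dfu_file("firmware.dfu")  # parse the DFuse container
    pydfu.mass_erase()                              # wipe the whole flash
    pydfu.write_elements(elements, True, progress=pydfu.cli_progress)
    pydfu.exit_dfu()                                # leave DFU mode so the new firmware boots
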
diff --git a/.development/pydfu_util.py b/.development/pydfu_util.py
new file mode 100644
index 0000000..ba775a2
--- /dev/null
+++ b/.development/pydfu_util.py
@@ -0,0 +1,49 @@
+from pydfu import *
+import urllib.request, tempfile, os, shutil, ssl
+
+def firmware_update(verbose):
+    global __verbose
+    __verbose = verbose
+
+    temp_path = tempfile.mktemp("firmware.dfu")
+    url = "https://update.badge.emfcamp.org/firmware.dfu"
+
+    print("Hello - Welcome to the automated TiLDA Mk4 firmware updater")
+    print("Finding badge: ", end="")
+    try:
+        init()
+        print("DONE")
+
+        print("Downloading newest firmware: ", end="")
+        context = ssl._create_unverified_context()
+        with urllib.request.urlopen(url, context=context) as response:
+            with open(temp_path, 'wb') as tmp_file:
+                shutil.copyfileobj(response, tmp_file)
+        print("DONE")
+
+        elements = read_dfu_file(temp_path)
+        if not elements:
+            return
+
+        print("Resetting Badge: ", end="")
+        mass_erase()
+        print("DONE")
+
+        print("Updating...")
+        write_elements(elements, True, progress=cli_progress)
+        exit_dfu()
+
+        print("")
+        print("You can now restart your badge by pressing the reset button on the back. Please follow the instructions on the screen to finish the setup.")
+        print("Have a nice day!")
+
+    except ValueError as e:
+        print("FAIL")
+        print("")
+        print("We couldn't find your badge. You need to make sure it's plugged in and in DFU mode.")
+        print("To put your badge into DFU mode you need to press the joystick in the middle while pressing the reset button at the back.")
+        print("After that, please try this script again.")
+        print()
+        print("Error: %s" % (e))
+    finally:
+        if os.path.isfile(temp_path): os.remove(temp_path)
diff --git a/.development/resources.py b/.development/resources.py
index f06a9a6..8ce1db1 100644
--- a/.development/resources.py
+++ b/.development/resources.py
@@ -30,8 +30,9 @@ This module has the following operations:
 resources = get_resources(path)  # Gets resources for a given path
 add_hashes(path, resources)      # Adds hashes to the file dict - not needed for testing
 add_metadata(path, resources)    # Adds metadata
-resolve_dependencies(resources)  # Merges all dependencies into each resource's file dict
 validate(resources)              # Runs basic validation
+resolve_dependencies(resources)  # Merges all dependencies into each resource's file dict
+remove_upip(resources)           # Removes upip resources from the dict again
 
 This module encapsulates all the main operations the app library is expect to perform on
 a given checkout. It's intentionally kept in one file to make it easier
@@ -72,14 +73,30 @@ def get_resources(path):
         if sub_path.startswith(".") or sub_path == "__pycache__":
             continue
         full_path = os.path.join(path, sub_path)
+        if os.path.islink(full_path):
+            continue
         if os.path.isfile(full_path):
             result[sub_path] = {"type": "root", "files": {sub_path: None}}
             continue
-        files = _scan_files(full_path, sub_path)
         if sub_path in ["lib", "shared"]:
+            files = _scan_files(full_path, sub_path)
             for rel_path in files:
                 result[rel_path] = {"type": sub_path, "files": {rel_path: None}}
+        elif sub_path == "upip":
+            for upip_lib in os.listdir(full_path):
+                if upip_lib.startswith(".") or upip_lib == "__pycache__":
+                    continue
+                full_lib_path = os.path.join(full_path, upip_lib)
+                files = {}
+                if os.path.isfile(full_lib_path):
+                    files = {full_lib_path: None}
+                    upip_lib = upip_lib.rsplit('.', 1)[0]
+                else:
+                    for rel_path in _scan_files(full_lib_path, os.path.join(sub_path, upip_lib)):
+                        files[rel_path] = None
+                result["upip:%s" % upip_lib] = {"type": sub_path, "files": files}
         else:
+            files = _scan_files(full_path, sub_path)
             result[sub_path] = {"type": "app", "files": {}}
             for rel_path in files:
                 result[sub_path]["files"][rel_path] = None
@@ -131,16 +148,10 @@ def add_metadata(path, resources):
 
 def _normalize_metadata(metadata):
     metadata['description'] = metadata.pop('doc')
     if 'dependencies' in metadata:
-        metadata['dependencies'] = [_normalize_lib(l) for l in metadata.pop('dependencies')]
+        metadata['dependencies'] = [normalize_dependency(l) for l in metadata.pop('dependencies')]
     return metadata
 
-def _normalize_lib(lib):
-    """lib dependencies can be shortened to just their module name"""
-    if "." in lib or "/" in lib:
-        return lib
-    return "lib/%s.py" % lib
-
 """
 resolve_dependencies(resources)
 
@@ -195,6 +206,21 @@ def _validate_resource(path, resource):
     if 'categories' not in resource or (not isinstance(resource['categories'], list)) or len(resource['categories']) == 0:
         resource.setdefault("errors", []).append("___categories___ list is required in main.py but not found")
 
+
+"""
+remove_upip(resources)
+
+upip adds over 100 resources to the list. 
Some of them have broken validation as well, so it's +useful to remove them after resolving dependencies. +""" +def remove_upip(resources): + to_delete = [] + for key, resource in resources.items(): + if resource['type'] == "upip": + to_delete.append(key) + for key in to_delete: + del resources[key] + """ helpers """ @@ -209,3 +235,12 @@ def get_error_summary(resources): summary += "\n" return summary.strip() +def pretty_print_resources(resources): + import json + return json.dumps(resources, indent=4) + +def normalize_dependency(dependency): + """lib dependencies can be shortened to just their module name""" + if "." in dependency or "/" in dependency or "upip:" in dependency: + return dependency + return "lib/%s.py" % dependency diff --git a/.development/sync.py b/.development/sync.py index 97e32c6..9ea6e2f 100644 --- a/.development/sync.py +++ b/.development/sync.py @@ -1,30 +1,46 @@ -import os, glob, shutil, sys +import os, shutil, sys, fnmatch -def sync(storage, patterns): +def sync(storage, patterns, resources, verbose): root = get_root() # Add all paths that are already files - paths = [os.path.join(root, p) for p in (patterns or []) if os.path.isfile(os.path.join(root, p))] + paths = set([p for p in (patterns or []) if os.path.isfile(os.path.join(root, p))]) - if patterns: - new_patterns = [] - patterns = [os.path.join(root, p, "**") for p in patterns] - else: - patterns = ["**/**", "boot.py"] + # Always copy boot.py + paths.add("boot.py") + + # wifi.json + wifi_path = os.path.join(root, "wifi.json") + if os.path.isfile(wifi_path): + paths.add(wifi_path) + + if not patterns: + patterns = ["*"] for pattern in patterns: - for path in glob.glob(pattern): - paths.append(path) - - if len(paths) == 0: - print("No files to copy found for pattern %s" % patterns) - sys.exit(1) + found = False + for key, resource in resources.items(): + if fnmatch.fnmatch(key, pattern): + found = True + if verbose: + print("Resource %s is going to be synced" % key) + for path in resource['files'].keys(): + paths.add(path) + if not found: + print("WARN: No resources to copy found for pattern %s" % patterns) + if not verbose: + print("Copying %s files: " % len(paths), end="") for path in paths: - rel_path = os.path.relpath(path, root) - if rel_path.startswith("."): + if not path: continue - print("Copying %s..." % rel_path) + rel_path = os.path.relpath(path, root) + if rel_path.startswith(".") or os.path.isdir(path) or os.path.islink(path): + continue + if verbose: + print("Copying %s..." % rel_path) + else: + print(".", end="") target = os.path.join(storage, rel_path) target_dir = os.path.dirname(target) @@ -35,9 +51,10 @@ def sync(storage, patterns): os.makedirs(target_dir) shutil.copy2(path, target) - else: + if verbose: print("Files copied successfully") - + else: + print(" DONE") def set_boot_app(storage, app_to_boot): path = os.path.join(storage, 'once.txt') @@ -47,7 +64,8 @@ def set_boot_app(storage, app_to_boot): pass with open(path, 'w') as f: f.write(app_to_boot + "\n") - print("setting next boot to %s" % app_to_boot) + if app_to_boot: + print("setting next boot to %s" % app_to_boot) def get_root(): root = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..')) diff --git a/.development/tilda_tools.py b/.development/tilda_tools.py index 0cf56a1..6ab3f8d 100755 --- a/.development/tilda_tools.py +++ b/.development/tilda_tools.py @@ -16,7 +16,7 @@ $ tilda_tools sync Update files in folder(s) to match current local version $ tilda_tools sync my_game shared -$ tilda_tools sync ... 
+$ tilda_tools sync ... Sync (as above), but execute my_app after reboot $ tilda_toold.py sync --boot my_app [] @@ -36,6 +36,9 @@ $ tilda_tools test Update firmware on badge (warning, this will delete all settings etc. stored on the badge!) $ tilda_tools firmware-update +Setup wifi.json to be copied to the badge on every sync +$ tilda_tools wifi + Common parameters ----------------- @@ -45,16 +48,18 @@ Common parameters """ import sys, glob -import sync, pyboard_util +import sync, pyboard_util, wifi, pydfu_util from resources import * def main(): import argparse cmd_parser = argparse.ArgumentParser(description='Toolchain for working with the TiLDA Mk4') - cmd_parser.add_argument('command', nargs=1, help='command [test|reset|sync|run]') + cmd_parser.add_argument('command', nargs=1, help='command [test|reset|sync|run|validate|wifi|firmware-update]', choices=['test', 'reset', 'sync', 'validate', 'run', 'wifi', 'firmware-update']) cmd_parser.add_argument('-d', '--device', help='the serial device of the badge') cmd_parser.add_argument('-s', '--storage', help='the usb mass storage path of the badge') cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') + cmd_parser.add_argument('-v', '--verbose', action='store_true', help='adds more output') + cmd_parser.add_argument('--print_resources', action='store_true', help='prints resources in json') cmd_parser.add_argument('--boot', help='defines which app to boot into after reboot') cmd_parser.add_argument('--run', help='like run, but after a sync') cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') @@ -63,11 +68,20 @@ def main(): command = args.command[0] path = sync.get_root() - if command in ["test", "validate"]: + if command == "firmware-update": + pydfu_util.firmware_update(args.verbose) + + if command == "wifi": + wifi.select_wifi() + + if command in ["test", "validate", "sync"]: resources = get_resources(path) add_metadata(path, resources) - resolve_dependencies(resources) validate(path, resources) + resolve_dependencies(resources) + remove_upip(resources) + if args.print_resources: + print(pretty_print_resources(resources)) errors = get_error_summary(resources) if errors: print("Problems found:\n") @@ -76,15 +90,20 @@ def main(): print("Local Test: PASS") if command == "test": command = "sync" - args.path = [] - args.run = "test/main.py" + if len(args.paths) == 0: + args.run = "test/main.py" + else: + if "." 
not in args.paths[0]:
+            args.paths[0] = "lib/%s.py" % args.paths[0]
+        args.run = args.paths[0]
+
 
     if command in ["reset", "sync"]:
-        pyboard_util.stop_badge(args)
+        pyboard_util.stop_badge(args, args.verbose)
 
     if command == "sync":
         paths = args.paths if len(args.paths) else None
-        sync.sync(get_storage(args), paths)
+        sync.sync(get_storage(args), paths, resources, args.verbose)
 
     if command in ["reset", "sync"]:
         sync.set_boot_app(get_storage(args), args.boot or "")
@@ -94,6 +113,7 @@ def main():
         args.paths = [args.run]
 
     if command == "run":
+        pyboard_util.check_run(args)
         pyboard_util.run(args)
 
 
@@ -101,7 +121,7 @@ def main():
 
 def find_storage():
     # todo: find solution for windows and linux
-    for pattern in ['/Volumes/PYBFLASH']:
+    for pattern in ['/Volumes/PYBFLASH', '/Volumes/NO NAME']:
         for path in glob.glob(pattern):
             return path
     print("Couldn't find badge storage - Please make it's plugged in and reset it if necessary")
diff --git a/.development/update_upip.sh b/.development/update_upip.sh
new file mode 100755
index 0000000..92f5542
--- /dev/null
+++ b/.development/update_upip.sh
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+
+TARGET=$(dirname `pwd`)"/upip"
+rm -rf "/tmp/upip.zip"
+curl -L "https://github.com/micropython/micropython-lib/archive/master.zip" -o "/tmp/upip.zip"
+rm -rf "/tmp/upip"
+unzip -q -a "/tmp/upip.zip" -d "/tmp/upip"
+cd "/tmp/upip/micropython-lib-master"
+rm -rf "$TARGET"/*
+for d in `find . -maxdepth 1 -type d ! -name ".*"`; do
+    echo $d;
+    find "$d" -maxdepth 1 -mindepth 1 \( -name '*.py' -not -name 'test_*' -not -name 'example_*' -not -name 'setup.py' -size +10c \) -or \( -type d -not -name 'dist' -not -name '*.egg-info' -not -name '__pycache__' \) | xargs -I{} bash -c -- 'ditto {} "'"$TARGET"'/"`echo "{}" | sed -e "s/\.\/[^\/]*\///"`';
+done
+
diff --git a/.development/wifi.py b/.development/wifi.py
new file mode 100644
index 0000000..87e6a02
--- /dev/null
+++ b/.development/wifi.py
@@ -0,0 +1,13 @@
+import os, sync, json
+
+def select_wifi():
+    ssid = input('Enter wifi name (SSID): ')
+    pw = input('Enter wifi password, leave empty for open network: ')
+    with open(os.path.join(sync.get_root(), "wifi.json"), "wt") as file:
+        if pw:
+            conn_details = {"ssid": ssid, "pw": pw}
+        else:
+            conn_details = {"ssid": ssid}
+
+        file.write(json.dumps(conn_details))
+    print("wifi.json created - It will be transferred to the badge on the next sync")
diff --git a/.gitignore b/.gitignore
index 6a4dae1..ce8cba1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 .DS_Store
 __pycache__
+wifi.json
diff --git a/boot.py b/boot.py
index 4a83ee0..bafc524 100644
--- a/boot.py
+++ b/boot.py
@@ -1,7 +1,9 @@
-import pyb, os, micropython
+import pyb, os, micropython, sys
 
 micropython.alloc_emergency_exception_buf(100)
 
+sys.path.append('/flash/upip')
+
 os.sync()
 
 root = os.listdir()
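boot.py's new sys.path line is what lets the vendored upip modules resolve on the badge, and wifi.json (written by `tilda_tools wifi` above and copied on every sync) is just as simple to consume from badge code. A minimal sketch of a badge-side reader - the helper is hypothetical, the actual consumer is not part of this diff:

    import json

    def load_wifi_details(path="wifi.json"):
        # Written by `tilda_tools wifi` as {"ssid": ...}, with an
        # optional "pw" key for protected networks
        try:
            with open(path) as f:
                details = json.load(f)
        except (OSError, ValueError):
            return None  # missing or corrupt file
        return details.get("ssid"), details.get("pw")
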
diff --git a/lib/http.py b/lib/http.py
index 378f89a..87844ee 100644
--- a/lib/http.py
+++ b/lib/http.py
@@ -1,3 +1,262 @@
-"""HTTP library specially tied to TiLDAs functionality"""
+"""HTTP library specially tied to TiLDA's functionality
+
+Somewhat inspired by "requests".
+
+Current known issues:
+* HTTPS is not supported
+"""
 
 ___license___ = "MIT"
+___dependencies___ = ["urlencode"]
+
+import usocket, ujson, os, time, gc, wifi
+from urlencode import urlencode
+
+"""Usage
+from http import *
+print(get("http://example.com").raise_for_status().content)
+post("http://mydomain.co.uk/api/post", data="SOMETHING").raise_for_status().close() # If the response is not consumed you need to close manually
+# Or, if you prefer the with syntax:
+with post("http://mydomain.co.uk/api/post", data="SOMETHING") as response:
+    response.raise_for_status() # No manual close needed
+"""
+
+SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout')
+CONTENT_TYPE_JSON = 'application/json'
+BUFFER_SIZE = 1024
+
+class Response(object):
+    def __init__(self):
+        self.encoding = 'utf-8'
+        self.headers = {}
+        self.status = None
+        self.socket = None
+        self._content = None
+
+    # Hands the responsibility for a socket over to this response. This needs to happen
+    # before any content can be inspected
+    def add_socket(self, socket, content_so_far):
+        self.content_so_far = content_so_far
+        self.socket = socket
+
+    @property
+    def content(self, timeout=90):
+        start_time = time.time()
+        if not self._content:
+            if not self.socket:
+                raise OSError("Invalid response socket state. Has the content been downloaded instead?")
+            try:
+                if "Content-Length" in self.headers:
+                    content_length = int(self.headers["Content-Length"])
+                elif "content-length" in self.headers:
+                    content_length = int(self.headers["content-length"])
+                else:
+                    raise Exception("No Content-Length")
+                self._content = self.content_so_far
+                del self.content_so_far
+                while len(self._content) < content_length:
+                    buf = self.socket.recv(BUFFER_SIZE)
+                    self._content += buf
+                    if (time.time() - start_time) > timeout:
+                        raise Exception("HTTP request timeout")
+
+            finally:
+                self.close()
+        return self._content
+
+    @property
+    def text(self):
+        return str(self.content, self.encoding) if self.content else ''
+
+    # If you don't use the content of a Response at all you need to manually close it
+    def close(self):
+        if self.socket is not None:
+            self.socket.close()
+            self.socket = None
+
+    def json(self):
+        return ujson.loads(self.text)
+
+    # Writes content into a file. This function will write while receiving, which avoids
+    # having to load all content into memory
+    def download_to(self, target, timeout=90):
+        start_time = time.time()
+        if not self.socket:
+            raise OSError("Invalid response socket state. Has the content already been consumed?")
+        try:
+            if "Content-Length" in self.headers:
+                remaining = int(self.headers["Content-Length"])
+            elif "content-length" in self.headers:
+                remaining = int(self.headers["content-length"])
+            else:
+                raise Exception("No Content-Length")
+
+            with open(target, 'wb') as f:
+                f.write(self.content_so_far)
+                remaining -= len(self.content_so_far)
+                del self.content_so_far
+                while remaining > 0:
+                    buf = self.socket.recv(BUFFER_SIZE)
+                    f.write(buf)
+                    remaining -= len(buf)
+
+                    if (time.time() - start_time) > timeout:
+                        raise Exception("HTTP request timeout")
+
+                f.flush()
+                os.sync()
+
+        finally:
+            self.close()
+
+    def raise_for_status(self):
+        if 400 <= self.status < 500:
+            raise OSError('Client error: %s' % self.status)
+        if 500 <= self.status < 600:
+            raise OSError('Server error: %s' % self.status)
+        return self
+
+    # In case you want to use "with"
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
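Besides the buffered content property, download_to streams the body straight to storage, which matters on a board with little RAM. A short usage sketch, with a hypothetical URL and target path:

    # Stream a large asset to flash without holding it all in memory
    response = get("http://example.com/assets/sprites.bin").raise_for_status()
    response.download_to("apps/my_game/sprites.bin")  # closes the socket when done
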
+
+def open_http_socket(method, url, json=None, timeout=None, headers=None, data=None, params=None):
+    # This will immediately return if we're already connected, otherwise
+    # it'll attempt to connect or prompt for a new network. Proceeding
+    # without an active network connection will cause the getaddrinfo to
+    # fail.
+    wifi.connect(
+        wait=True,
+        show_wait_message=False,
+        prompt_on_fail=True,
+        dialog_title='TiLDA Wifi'
+    )
+
+    urlparts = url.split('/', 3)
+    proto = urlparts[0]
+    host = urlparts[2]
+    urlpath = '' if len(urlparts) < 4 else urlparts[3]
+
+    if proto == 'http:':
+        port = 80
+    elif proto == 'https:':
+        raise OSError("HTTPS is currently not supported")
+        port = 443
+    else:
+        raise OSError('Unsupported protocol: %s' % proto[:-1])
+
+    if ':' in host:
+        host, port = host.split(':')
+        port = int(port)
+
+    if data is not None:
+        if isinstance(data, str):
+            content = data
+            content_type = "text/plain; charset=UTF-8"
+        else:
+            content = urlencode(data)
+            content_type = "application/x-www-form-urlencoded"
+    elif json is not None:
+        content = ujson.dumps(json)
+        content_type = CONTENT_TYPE_JSON
+    else:
+        content = None
+
+    # ToDo: Handle IPv6 addresses
+    if is_ipv4_address(host):
+        addr = (host, port)
+    else:
+        ai = usocket.getaddrinfo(host, port)
+        addr = ai[0][4]
+
+    sock = None
+    if proto == 'https:':
+        sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM, usocket.SEC_SOCKET)
+    else:
+        sock = usocket.socket()
+
+    if params:
+        urlpath += "?" 
+ urlencode(params) + + sock.connect(addr) + if proto == 'https:': + sock.settimeout(0) # Actually make timeouts working properly with ssl + + sock.send('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host)) + + if headers is not None: + for header in headers.items(): + sock.send('%s: %s\r\n' % header) + + if content is not None: + sock.send('content-length: %s\r\n' % len(content)) + sock.send('content-type: %s\r\n' % content_type) + sock.send('\r\n') + sock.send(content) + else: + sock.send('\r\n') + + return sock + +# Adapted from upip +def request(method, url, json=None, timeout=None, headers=None, data=None, params=None): + sock = open_http_socket(method, url, json, timeout, headers, data, params) + try: + response = Response() + state = 1 + hbuf = b"" + while True: + buf = sock.recv(BUFFER_SIZE) + if state == 1: # Status + nl = buf.find(b"\n") + if nl > -1: + hbuf += buf[:nl - 1] + response.status = int(hbuf.split(b' ')[1]) + state = 2 + hbuf = b""; + buf = buf[nl + 1:] + else: + hbuf += buf + + if state == 2: # Headers + hbuf += buf + nl = hbuf.find(b"\n") + while nl > -1: + if nl < 2: + buf = hbuf[2:] + hbuf = None + state = 3 + break + + header = hbuf[:nl - 1].decode("utf8").split(':', 3) + response.headers[header[0].strip()] = header[1].strip() + hbuf = hbuf[nl + 1:] + nl = hbuf.find(b"\n") + + if state == 3: # Content + response.add_socket(sock, buf) + sock = None # It's not our responsibility to close the socket anymore + return response + finally: + if sock: sock.close() + gc.collect() + +def get(url, **kwargs): + return request('GET', url, **kwargs) + +def post(url, **kwargs): + return request('POST', url, **kwargs) + +def is_ipv4_address(address): + octets = address.split('.') + try: + valid_octets = [x for x in octets if 0 <= int(x) and int(x) <= 255] + return len(valid_octets) == 4 + except Exception: + return False + + diff --git a/lib/test_database.py b/lib/test_database.py index b9ea3b7..9933389 100644 --- a/lib/test_database.py +++ b/lib/test_database.py @@ -1,11 +1,4 @@ -"""This app's purpose is to run a series of tests against library code - -Once successful it displays and prints 'ok' on the screen. - -Please make sure that all tests pass before sending a PR. You can easily -do this by running "tilda_tools test". Thank you for keeping all the -tests green! *face-throwing-a-kiss-emoji* -""" +"""Tests for database""" ___license___ = "MIT" ___dependencies___ = ["unittest", "database"] diff --git a/lib/test_http.py b/lib/test_http.py index c0a3f7e..e2e8c65 100644 --- a/lib/test_http.py +++ b/lib/test_http.py @@ -1,21 +1,47 @@ -"""This app's purpose is to run a series of tests against library code - -Once successful it displays and prints 'ok' on the screen. - -Please make sure that all tests pass before sending a PR. You can easily -do this by running "tilda_tools test". Thank you for keeping all the -tests green! 
*face-throwing-a-kiss-emoji*
-"""
+"""Tests for http"""
 
 ___license___ = "MIT"
-___dependencies___ = ["unittest"]
+___dependencies___ = ["unittest", "http", "wifi"]
 
 import unittest
+from http import *
+import wifi
 
 class TestHttp(unittest.TestCase):
 
-    def test_foo(self):
-        pass
+    def setUpClass(self):
+        wifi.connect()
+
+    def test_get_with_https(self):
+        with self.assertRaises(OSError) as context:
+            get("https://httpbin.org/get")
+        self.assertIn("HTTPS is currently not supported", str(context.exception))
+
+    def test_get(self):
+        with get("http://httpbin.org/get", params={"foo": "bar"}, headers={"accept": "application/json"}) as response:
+            self.assertEqual(response.headers["Content-Type"], "application/json")
+            self.assertEqual(response.status, 200)
+            content = response.json()
+            self.assertEqual(content["headers"]["Accept"], "application/json")
+            self.assertEqual(content["args"], {"foo":"bar"})
+
+    def test_post_form(self):
+        with post("http://httpbin.org/post", data={"foo": "bar"}).raise_for_status() as response:
+            content = response.json()
+            self.assertEqual(content["headers"]["Content-Type"], "application/x-www-form-urlencoded")
+            self.assertEqual(content["form"], {"foo":"bar"})
+
+    def test_post_string(self):
+        with post("http://httpbin.org/post", data="foobar").raise_for_status() as response:
+            content = response.json()
+            self.assertEqual(content["headers"]["Content-Type"], "text/plain; charset=UTF-8")
+            self.assertEqual(content["data"], "foobar")
+
+    def test_post_json(self):
+        with post("http://httpbin.org/post", json={"foo":"bar"}).raise_for_status() as response:
+            content = response.json()
+            self.assertEqual(content["headers"]["Content-Type"], "application/json")
+            self.assertEqual(content["json"], {"foo":"bar"})
 
 if __name__ == "__main__":
diff --git a/lib/test_urlencode.py b/lib/test_urlencode.py
new file mode 100644
index 0000000..ae4e26b
--- /dev/null
+++ b/lib/test_urlencode.py
@@ -0,0 +1,19 @@
+"""Tests for urlencode"""
+
+___license___ = "MIT"
+___dependencies___ = ["unittest", "urlencode"]
+
+import unittest
+from urlencode import *
+
+class TestUrlencode(unittest.TestCase):
+
+    def test_urlencode(self):
+        self.assertEqual(
+            urlencode({"täst":"!£$%(*&^%()", "l": "😃"}),
+            "l=%F0%9F%98%83&t%C3%A4st=%21%C2%A3%24%25%28%2A%26%5E%25%28%29"
+        )
+
+
+if __name__ == "__main__":
+    TestUrlencode().run_standalone()
diff --git a/lib/unittest.py b/lib/unittest.py
index cf3f32a..0d857bb 100644
--- a/lib/unittest.py
+++ b/lib/unittest.py
@@ -1,8 +1,12 @@
-"""Base libarary for test cases"""
+"""Base library for test cases
+
+See https://github.com/python/cpython/blob/master/Lib/unittest/case.py for
+some of the code copied here
+"""
 
 ___license___ = "MIT"
 
-import sys
+import sys, ugfx
 
 class SkipTest(Exception):
     """Indicates a test has been skipped"""
@@ -44,6 +48,7 @@ class TestCase(object):
         return self.count_fail == 0
 
     def run_standalone(self):
+        ugfx.clear(0xFFFFFF)
         self.run()
         print_result(self.count_pass, self.count_fail, self.count_skip)
@@ -72,8 +77,116 @@ class TestCase(object):
     def assertFalse(self, actual):
         self.assertEqual(actual, False)
 
+    def assertRaises(self, expected_exception, *args, **kwargs):
+        context = _AssertRaisesContext(expected_exception, self)
+        return context.handle('assertRaises', args, kwargs)
+
+    def assertIn(self, sub, actual):
+        if not sub in actual:
+            raise FailTest("Expected %s to be in %s" % (sub, actual))
+
     def skip(self):
         raise SkipTest()
 
 def print_result(count_pass, count_fail, count_skip):
     print("PASS: %s FAIL: %s SKIP: %s" % (count_pass, count_fail, count_skip))
+
+###########################################
+#### Bits copied straight from cpython ####
+###########################################
+
+class _BaseTestCaseContext:
+
+    def __init__(self, test_case):
+        self.test_case = test_case
+
+    def _raiseFailure(self, standardMsg):
+        # The minimal TestCase above has no _formatMessage/failureException,
+        # so fail through FailTest directly
+        msg = standardMsg if self.msg is None else "%s : %s" % (standardMsg, self.msg)
+        raise FailTest(msg)
+
+class _AssertRaisesBaseContext(_BaseTestCaseContext):
+
+    def __init__(self, expected, test_case, expected_regex=None):
+        _BaseTestCaseContext.__init__(self, test_case)
+        self.expected = expected
+        self.test_case = test_case
+        if expected_regex is not None:
+            expected_regex = re.compile(expected_regex)
+        self.expected_regex = expected_regex
+        self.obj_name = None
+        self.msg = None
+
+    def handle(self, name, args, kwargs):
+        """
+        If args is empty, assertRaises/Warns is being used as a
+        context manager, so check for a 'msg' kwarg and return self.
+        If args is not empty, call a callable passing positional and keyword
+        arguments.
+        """
+        try:
+            if not _is_subtype(self.expected, self._base_type):
+                raise TypeError('%s() arg 1 must be %s' %
+                                (name, self._base_type_str))
+            if args and args[0] is None:
+                warnings.warn("callable is None",
+                              DeprecationWarning, 3)
+                args = ()
+            if not args:
+                self.msg = kwargs.pop('msg', None)
+                if kwargs:
+                    warnings.warn('%r is an invalid keyword argument for '
+                                  'this function' % next(iter(kwargs)),
+                                  DeprecationWarning, 3)
+                return self
+
+            callable_obj, *args = args
+            try:
+                self.obj_name = callable_obj.__name__
+            except AttributeError:
+                self.obj_name = str(callable_obj)
+            with self:
+                callable_obj(*args, **kwargs)
+        finally:
+            # bpo-23890: manually break a reference cycle
+            self = None
+
+
+
+class _AssertRaisesContext(_AssertRaisesBaseContext):
+    """A context manager used to implement TestCase.assertRaises* methods."""
+
+    _base_type = BaseException
+    _base_type_str = 'an exception type or tuple of exception types'
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        if exc_type is None:
+            try:
+                exc_name = self.expected.__name__
+            except AttributeError:
+                exc_name = str(self.expected)
+            if self.obj_name:
+                self._raiseFailure("{} not raised by {}".format(exc_name,
+                                                                self.obj_name))
+            else:
+                self._raiseFailure("{} not raised".format(exc_name))
+        if not issubclass(exc_type, self.expected):
+            # let unexpected exceptions pass through
+            return False
+        # store exception
+        self.exception = exc_value
+        if self.expected_regex is None:
+            return True
+
+        expected_regex = self.expected_regex
+        if not expected_regex.search(str(exc_value)):
+            self._raiseFailure('"{}" does not match "{}"'.format(
+                expected_regex.pattern, str(exc_value)))
+        return True
+
+def _is_subtype(expected, basetype):
+    if isinstance(expected, tuple):
+        return all(_is_subtype(e, basetype) for e in expected)
+    return isinstance(expected, type) and issubclass(expected, basetype)
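The backported context manager gives this small unittest both of cpython's assertRaises calling conventions. A hypothetical test sketching the two forms (illustrative only, not part of the diff):

    import unittest

    class TestAssertRaises(unittest.TestCase):
        def test_callable_form(self):
            self.assertRaises(ZeroDivisionError, lambda: 1 / 0)

        def test_context_manager_form(self):
            with self.assertRaises(OSError) as context:
                raise OSError("simulated failure")
            self.assertIn("simulated failure", str(context.exception))

    if __name__ == "__main__":
        TestAssertRaises().run_standalone()
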
diff --git a/lib/urlencode.py b/lib/urlencode.py
new file mode 100644
index 0000000..356f541
--- /dev/null
+++ b/lib/urlencode.py
@@ -0,0 +1,133 @@
+"""URL encoding helper
+
+Mostly taken from urllib.parse (which is sadly too large to be imported directly)
+
+I've removed most of the comments to make it easier on micropython
+"""
+___license___ = "Python"
+___dependencies___ = ["upip:collections"]
+
+from collections.defaultdict import defaultdict
+
+_ALWAYS_SAFE = frozenset(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+                         b'abcdefghijklmnopqrstuvwxyz'
+                         b'0123456789'
+                         b'_.-')
+_ALWAYS_SAFE_BYTES = 
bytes(_ALWAYS_SAFE)
+
+_safe_quoters = {}
+
+class Quoter(defaultdict):
+    def __init__(self, safe):
+        """safe: bytes object."""
+        self.safe = _ALWAYS_SAFE.union(safe)
+
+    def __repr__(self):
+        # Without this, will just display as a defaultdict
+        return "<Quoter %r>" % dict(self)
+
+    def __missing__(self, b):
+        # Handle a cache miss. Store quoted string in cache and return.
+        res = chr(b) if b in self.safe else '%{:02X}'.format(b)
+        self[b] = res
+        return res
+
+def quote(string, safe='/', encoding=None, errors=None):
+    if isinstance(string, str):
+        if not string:
+            return string
+        if encoding is None:
+            encoding = 'utf-8'
+        if errors is None:
+            errors = 'strict'
+        string = string.encode(encoding, errors)
+    else:
+        if encoding is not None:
+            raise TypeError("quote() doesn't support 'encoding' for bytes")
+        if errors is not None:
+            raise TypeError("quote() doesn't support 'errors' for bytes")
+    return quote_from_bytes(string, safe)
+
+def quote_plus(string, safe='', encoding=None, errors=None):
+    if ((isinstance(string, str) and ' ' not in string) or
+        (isinstance(string, bytes) and b' ' not in string)):
+        return quote(string, safe, encoding, errors)
+    if isinstance(safe, str):
+        space = ' '
+    else:
+        space = b' '
+    string = quote(string, safe + space, encoding, errors)
+    return string.replace(' ', '+')
+
+def quote_from_bytes(bs, safe='/'):
+    if not isinstance(bs, (bytes, bytearray)):
+        raise TypeError("quote_from_bytes() expected bytes")
+    if not bs:
+        return ''
+    if isinstance(safe, str):
+        safe = safe.encode('ascii', 'ignore')
+    else:
+        safe = bytes([c for c in safe if c < 128])
+    if not bs.rstrip(_ALWAYS_SAFE_BYTES + safe):
+        return bs.decode()
+    try:
+        quoter = _safe_quoters[safe]
+    except KeyError:
+        _safe_quoters[safe] = quoter = Quoter(safe).__getitem__
+    return ''.join([quoter(char) for char in bs])
+
+def urlencode(query, doseq=False, safe='', encoding=None, errors=None):
+    if hasattr(query, "items"):
+        query = query.items()
+    else:
+        try:
+            if len(query) and not isinstance(query[0], tuple):
+                raise TypeError
+        except TypeError:
+            raise TypeError("not a valid non-string sequence "
+                            "or mapping object")#.with_traceback(tb)
+
+    l = []
+    if not doseq:
+        for k, v in query:
+            if isinstance(k, bytes):
+                k = quote_plus(k, safe)
+            else:
+                k = quote_plus(str(k), safe, encoding, errors)
+
+            if isinstance(v, bytes):
+                v = quote_plus(v, safe)
+            else:
+                v = quote_plus(str(v), safe, encoding, errors)
+            l.append(k + '=' + v)
+    else:
+        for k, v in query:
+            if isinstance(k, bytes):
+                k = quote_plus(k, safe)
+            else:
+                k = quote_plus(str(k), safe, encoding, errors)
+
+            if isinstance(v, bytes):
+                v = quote_plus(v, safe)
+                l.append(k + '=' + v)
+            elif isinstance(v, str):
+                v = quote_plus(v, safe, encoding, errors)
+                l.append(k + '=' + v)
+            else:
+                try:
+                    # Is this a sufficient test for sequence-ness?
+                    x = len(v)
+                except TypeError:
+                    # not a sequence
+                    v = quote_plus(str(v), safe, encoding, errors)
+                    l.append(k + '=' + v)
+                else:
+                    # loop over the sequence
+                    for elt in v:
+                        if isinstance(elt, bytes):
+                            elt = quote_plus(elt, safe)
+                        else:
+                            elt = quote_plus(str(elt), safe, encoding, errors)
+                        l.append(k + '=' + elt)
+    return '&'.join(l)
+
diff --git a/test/main.py b/test/main.py
index 9c08cd5..404d889 100644
--- a/test/main.py
+++ b/test/main.py
@@ -10,11 +10,12 @@ tests green! 
*face-throwing-a-kiss-emoji* ___license___ = "MIT" ___categories___ = ["Development"] ___name___ = "Integration test app" -___dependencies___ = ["unittest", "test_database", "test_http"] +___dependencies___ = ["unittest", "test_database", "test_http", "test_urlencode"] # Add all tests that need to be run here: import test_database import test_http +import test_urlencode # run import sys, unittest diff --git a/upip/__future__.py b/upip/__future__.py new file mode 100644 index 0000000..45b935e --- /dev/null +++ b/upip/__future__.py @@ -0,0 +1,7 @@ +nested_scopes = True +generators = True +division = True +absolute_import = True +with_statement = True +print_function = True +unicode_literals = True diff --git a/upip/_libc.py b/upip/_libc.py new file mode 100644 index 0000000..a930cbf --- /dev/null +++ b/upip/_libc.py @@ -0,0 +1,34 @@ +import ffi +import sys + + +_h = None + +names = ('libc.so', 'libc.so.0', 'libc.so.6', 'libc.dylib') + +def get(): + global _h + if _h: + return _h + err = None + for n in names: + try: + _h = ffi.open(n) + return _h + except OSError as e: + err = e + raise err + + +def set_names(n): + global names + names = n + +# Find out bitness of the platform, even if long ints are not supported +# TODO: All bitness differences should be removed from micropython-lib, and +# this snippet too. +bitness = 1 +v = sys.maxsize +while v: + bitness += 1 + v >>= 1 diff --git a/upip/_markupbase.py b/upip/_markupbase.py new file mode 100644 index 0000000..2af5f1c --- /dev/null +++ b/upip/_markupbase.py @@ -0,0 +1,395 @@ +"""Shared support for scanning document type declarations in HTML and XHTML. + +This module is used as a foundation for the html.parser module. It has no +documented public API and should not be used directly. + +""" + +import re + +_declname_match = re.compile(r'[a-zA-Z][-_.a-zA-Z0-9]*\s*').match +_declstringlit_match = re.compile(r'(\'[^\']*\'|"[^"]*")\s*').match +_commentclose = re.compile(r'--\s*>') +_markedsectionclose = re.compile(r']\s*]\s*>') + +# An analysis of the MS-Word extensions is available at +# http://www.planetpublish.com/xmlarena/xap/Thursday/WordtoXML.pdf + +_msmarkedsectionclose = re.compile(r']\s*>') + +del re + + +class ParserBase: + """Parser base class which provides some common support methods used + by the SGML/HTML and XHTML parsers.""" + + def __init__(self): + if self.__class__ is ParserBase: + raise RuntimeError( + "_markupbase.ParserBase must be subclassed") + + def error(self, message): + raise NotImplementedError( + "subclasses of ParserBase must override error()") + + def reset(self): + self.lineno = 1 + self.offset = 0 + + def getpos(self): + """Return current line number and offset.""" + return self.lineno, self.offset + + # Internal -- update line number and offset. This should be + # called for each piece of data exactly once, in order -- in other + # words the concatenation of all the input strings to this + # function should be exactly the entire input. + def updatepos(self, i, j): + if i >= j: + return j + rawdata = self.rawdata + nlines = rawdata.count("\n", i, j) + if nlines: + self.lineno = self.lineno + nlines + pos = rawdata.rindex("\n", i, j) # Should not fail + self.offset = j-(pos+1) + else: + self.offset = self.offset + j-i + return j + + _decl_otherchars = '' + + # Internal -- parse declaration (for use by subclasses). + def parse_declaration(self, i): + # This is some sort of declaration; in "HTML as + # deployed," this should only be the document type + # declaration (""). 
+        # ISO 8879:1986, however, has more complex
+        # declaration syntax for elements in <!...>, including:
+        # --comment--
+        # [marked section]
+        # name in the following list: ENTITY, DOCTYPE, ELEMENT,
+        #   ATTLIST, NOTATION, SHORTREF, USEMAP,
+        #   LINKTYPE, LINK, IDLINK, USELINK, SYSTEM
+        rawdata = self.rawdata
+        j = i + 2
+        assert rawdata[i:j] == "<!", "unexpected call to parse_declaration"
+        if rawdata[j:j+1] == ">":
+            # the empty comment <!>
+            return j + 1
+        if rawdata[j:j+1] in ("-", ""):
+            # Start of comment followed by buffer boundary,
+            # or just a buffer boundary.
+            return -1
+        # A simple, practical version could look like: ((name|stringlit) S*) + '>'
+        n = len(rawdata)
+        if rawdata[j:j+2] == '--': #comment
+            # Locate --.*-- as the body of the comment
+            return self.parse_comment(i)
+        elif rawdata[j] == '[': #marked section
+            # Locate [statusWord [...arbitrary SGML...]] as the body of the marked section
+            # Where statusWord is one of TEMP, CDATA, IGNORE, INCLUDE, RCDATA
+            # Note that this is extended by Microsoft Office "Save as Web" function
+            # to include [if...] and [endif].
+            return self.parse_marked_section(i)
+        else: #all other declaration elements
+            decltype, j = self._scan_name(j, i)
+            if j < 0:
+                return j
+            if decltype == "doctype":
+                self._decl_otherchars = ''
+            while j < n:
+                c = rawdata[j]
+                if c == ">":
+                    # end of declaration syntax
+                    data = rawdata[i+2:j]
+                    if decltype == "doctype":
+                        self.handle_decl(data)
+                    else:
+                        # According to the HTML5 specs sections "8.2.4.44 Bogus
+                        # comment state" and "8.2.4.45 Markup declaration open
+                        # state", a comment token should be emitted.
+                        # Calling unknown_decl provides more flexibility though.
+                        self.unknown_decl(data)
+                    return j + 1
+                if c in "\"'":
+                    m = _declstringlit_match(rawdata, j)
+                    if not m:
+                        return -1 # incomplete
+                    j = m.end()
+                elif c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ":
+                    name, j = self._scan_name(j, i)
+                elif c in self._decl_otherchars:
+                    j = j + 1
+                elif c == "[":
+                    # this could be handled in a separate doctype parser
+                    if decltype == "doctype":
+                        j = self._parse_doctype_subset(j + 1, i)
+                    elif decltype in {"attlist", "linktype", "link", "element"}:
+                        # must tolerate []'d groups in a content model in an element declaration
+                        # also in data attribute specifications of attlist declaration
+                        # also link type declaration subsets in linktype declarations
+                        # also link attribute specification lists in link declarations
+                        self.error("unsupported '[' char in %s declaration" % decltype)
+                    else:
+                        self.error("unexpected '[' char in declaration")
+                else:
+                    self.error(
+                        "unexpected %r char in declaration" % rawdata[j])
+                if j < 0:
+                    return j
+            return -1 # incomplete
+
+    # Internal -- parse a marked section
+    # Override this to handle MS-word extension syntax content
+    def parse_marked_section(self, i, report=1):
+        rawdata= self.rawdata
+        assert rawdata[i:i+3] == '<![', "unexpected call to parse_marked_section()"
+        sectName, j = self._scan_name(i+3, i)
+        if j < 0:
+            return j
+        if sectName in {"temp", "cdata", "ignore", "include", "rcdata"}:
+            # look for standard ]]> ending
+            match= _markedsectionclose.search(rawdata, i+3)
+        elif sectName in {"if", "else", "endif"}:
+            # look for MS Office ]> ending
+            match= _msmarkedsectionclose.search(rawdata, i+3)
+        else:
+            self.error('unknown status keyword %r in marked section' % rawdata[i+3:j])
+        if not match:
+            return -1
+        if report:
+            j = match.start(0)
+            self.unknown_decl(rawdata[i+3: j])
+        return match.end(0)
+
+    # Internal -- parse comment, return length or -1 if not terminated
+    def parse_comment(self, i, report=1):
+        rawdata = self.rawdata
+        if rawdata[i:i+4] != '<!--':
+            self.error('unexpected call to parse_comment()')
+        match = _commentclose.search(rawdata, i+4)
+        if not match:
+            return -1
+        if report:
+            j = match.start(0)
+            self.handle_comment(rawdata[i+4: j])
+        return match.end(0)
+# unwrap('<URL:type://host/path>') --> 'type://host/path'
+# splittype('type:opaquestring') --> 'type', 'opaquestring'
+# splithost('//host[:port]/path') --> 'host[:port]', '/path'
+# splituser('user[:passwd]@host[:port]') 
--> 'user[:passwd]', 'host[:port]'
+# splitpasswd('user:passwd') -> 'user', 'passwd'
+# splitport('host:port') --> 'host', 'port'
+# splitquery('/path?query') --> '/path', 'query'
+# splittag('/path#tag') --> '/path', 'tag'
+# splitattr('/path;attr1=value1;attr2=value2;...') ->
+#   '/path', ['attr1=value1', 'attr2=value2', ...]
+# splitvalue('attr=value') --> 'attr', 'value'
+# urllib.parse.unquote('abc%20def') -> 'abc def'
+# quote('abc def') -> 'abc%20def')
+
+def to_bytes(url):
+    """to_bytes(u"URL") --> 'URL'."""
+    # Most URL schemes require ASCII. If that changes, the conversion
+    # can be relaxed.
+    # XXX get rid of to_bytes()
+    if isinstance(url, str):
+        try:
+            url = url.encode("ASCII").decode()
+        except UnicodeError:
+            raise UnicodeError("URL " + repr(url) +
+                               " contains non-ASCII characters")
+    return url
+
+def unwrap(url):
+    """unwrap('<URL:type://host/path>') --> 'type://host/path'."""
+    url = str(url).strip()
+    if url[:1] == '<' and url[-1:] == '>':
+        url = url[1:-1].strip()
+    if url[:4] == 'URL:': url = url[4:].strip()
+    return url
+
+_typeprog = None
+def splittype(url):
+    """splittype('type:opaquestring') --> 'type', 'opaquestring'."""
+    global _typeprog
+    if _typeprog is None:
+        import re
+        _typeprog = re.compile('^([^/:]+):')
+
+    match = _typeprog.match(url)
+    if match:
+        scheme = match.group(1)
+        return scheme.lower(), url[len(scheme) + 1:]
+    return None, url
+
+_hostprog = None
+def splithost(url):
+    """splithost('//host[:port]/path') --> 'host[:port]', '/path'."""
+    global _hostprog
+    if _hostprog is None:
+        import re
+        _hostprog = re.compile('^//([^/?]*)(.*)$')
+
+    match = _hostprog.match(url)
+    if match:
+        host_port = match.group(1)
+        path = match.group(2)
+        if path and not path.startswith('/'):
+            path = '/' + path
+        return host_port, path
+    return None, url
+
+_userprog = None
+def splituser(host):
+    """splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'."""
+    global _userprog
+    if _userprog is None:
+        import re
+        _userprog = re.compile('^(.*)@(.*)$')
+
+    match = _userprog.match(host)
+    if match: return match.group(1, 2)
+    return None, host
+
+_passwdprog = None
+def splitpasswd(user):
+    """splitpasswd('user:passwd') -> 'user', 'passwd'."""
+    global _passwdprog
+    if _passwdprog is None:
+        import re
+        _passwdprog = re.compile('^([^:]*):(.*)$',re.S)
+
+    match = _passwdprog.match(user)
+    if match: return match.group(1, 2)
+    return user, None
+
+# splittag('/path#tag') --> '/path', 'tag'
+_portprog = None
+def splitport(host):
+    """splitport('host:port') --> 'host', 'port'."""
+    global _portprog
+    if _portprog is None:
+        import re
+        _portprog = re.compile('^(.*):([0-9]+)$')
+
+    match = _portprog.match(host)
+    if match: return match.group(1, 2)
+    return host, None
+
+_nportprog = None
+def splitnport(host, defport=-1):
+    """Split host and port, returning numeric port.
+    Return given default port if no ':' found; defaults to -1.
+    Return numerical port if a valid number is found after ':'. 
+ Return None if ':' but not a valid number.""" + global _nportprog + if _nportprog is None: + import re + _nportprog = re.compile('^(.*):(.*)$') + + match = _nportprog.match(host) + if match: + host, port = match.group(1, 2) + try: + if not port: raise ValueError("no digits") + nport = int(port) + except ValueError: + nport = None + return host, nport + return host, defport + +_queryprog = None +def splitquery(url): + """splitquery('/path?query') --> '/path', 'query'.""" + global _queryprog + if _queryprog is None: + import re + _queryprog = re.compile('^(.*)\?([^?]*)$') + + match = _queryprog.match(url) + if match: return match.group(1, 2) + return url, None + +_tagprog = None +def splittag(url): + """splittag('/path#tag') --> '/path', 'tag'.""" + global _tagprog + if _tagprog is None: + import re + _tagprog = re.compile('^(.*)#([^#]*)$') + + match = _tagprog.match(url) + if match: return match.group(1, 2) + return url, None + +def splitattr(url): + """splitattr('/path;attr1=value1;attr2=value2;...') -> + '/path', ['attr1=value1', 'attr2=value2', ...].""" + words = url.split(';') + return words[0], words[1:] + +_valueprog = None +def splitvalue(attr): + """splitvalue('attr=value') --> 'attr', 'value'.""" + global _valueprog + if _valueprog is None: + import re + _valueprog = re.compile('^([^=]*)=(.*)$') + + match = _valueprog.match(attr) + if match: return match.group(1, 2) + return attr, None diff --git a/upip/urllib/urequest.py b/upip/urllib/urequest.py new file mode 100644 index 0000000..fd52721 --- /dev/null +++ b/upip/urllib/urequest.py @@ -0,0 +1,65 @@ +import usocket + +def urlopen(url, data=None, method="GET"): + if data is not None and method == "GET": + method = "POST" + try: + proto, dummy, host, path = url.split("/", 3) + except ValueError: + proto, dummy, host = url.split("/", 2) + path = "" + if proto == "http:": + port = 80 + elif proto == "https:": + import ussl + port = 443 + else: + raise ValueError("Unsupported protocol: " + proto) + + if ":" in host: + host, port = host.split(":", 1) + port = int(port) + + ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM) + ai = ai[0] + + s = usocket.socket(ai[0], ai[1], ai[2]) + try: + s.connect(ai[-1]) + if proto == "https:": + s = ussl.wrap_socket(s, server_hostname=host) + + s.write(method) + s.write(b" /") + s.write(path) + s.write(b" HTTP/1.0\r\nHost: ") + s.write(host) + s.write(b"\r\n") + + if data: + s.write(b"Content-Length: ") + s.write(str(len(data))) + s.write(b"\r\n") + s.write(b"\r\n") + if data: + s.write(data) + + l = s.readline() + l = l.split(None, 2) + #print(l) + status = int(l[1]) + while True: + l = s.readline() + if not l or l == b"\r\n": + break + #print(l) + if l.startswith(b"Transfer-Encoding:"): + if b"chunked" in l: + raise ValueError("Unsupported " + l) + elif l.startswith(b"Location:"): + raise NotImplementedError("Redirects not yet supported") + except OSError: + s.close() + raise + + return s diff --git a/upip/utarfile.py b/upip/utarfile.py new file mode 100644 index 0000000..460ca2c --- /dev/null +++ b/upip/utarfile.py @@ -0,0 +1,94 @@ +import uctypes + +# http://www.gnu.org/software/tar/manual/html_node/Standard.html +TAR_HEADER = { + "name": (uctypes.ARRAY | 0, uctypes.UINT8 | 100), + "size": (uctypes.ARRAY | 124, uctypes.UINT8 | 11), +} + +DIRTYPE = "dir" +REGTYPE = "file" + +def roundup(val, align): + return (val + align - 1) & ~(align - 1) + +class FileSection: + + def __init__(self, f, content_len, aligned_len): + self.f = f + self.content_len = content_len + self.align = aligned_len 
- content_len + + def read(self, sz=65536): + if self.content_len == 0: + return b"" + if sz > self.content_len: + sz = self.content_len + data = self.f.read(sz) + sz = len(data) + self.content_len -= sz + return data + + def readinto(self, buf): + if self.content_len == 0: + return 0 + if len(buf) > self.content_len: + buf = memoryview(buf)[:self.content_len] + sz = self.f.readinto(buf) + self.content_len -= sz + return sz + + def skip(self): + sz = self.content_len + self.align + if sz: + buf = bytearray(16) + while sz: + s = min(sz, 16) + self.f.readinto(buf, s) + sz -= s + +class TarInfo: + + def __str__(self): + return "TarInfo(%r, %s, %d)" % (self.name, self.type, self.size) + +class TarFile: + + def __init__(self, name=None, fileobj=None): + if fileobj: + self.f = fileobj + else: + self.f = open(name, "rb") + self.subf = None + + def next(self): + if self.subf: + self.subf.skip() + buf = self.f.read(512) + if not buf: + return None + + h = uctypes.struct(uctypes.addressof(buf), TAR_HEADER, uctypes.LITTLE_ENDIAN) + + # Empty block means end of archive + if h.name[0] == 0: + return None + + d = TarInfo() + d.name = str(h.name, "utf-8").rstrip("\0") + d.size = int(bytes(h.size), 8) + d.type = [REGTYPE, DIRTYPE][d.name[-1] == "/"] + self.subf = d.subf = FileSection(self.f, d.size, roundup(d.size, 512)) + return d + + def __iter__(self): + return self + + def __next__(self): + v = self.next() + if v is None: + raise StopIteration + return v + + def extractfile(self, tarinfo): + return tarinfo.subf diff --git a/upip/uu.py b/upip/uu.py new file mode 100644 index 0000000..d68d293 --- /dev/null +++ b/upip/uu.py @@ -0,0 +1,199 @@ +#! /usr/bin/env python3 + +# Copyright 1994 by Lance Ellinghouse +# Cathedral City, California Republic, United States of America. +# All Rights Reserved +# Permission to use, copy, modify, and distribute this software and its +# documentation for any purpose and without fee is hereby granted, +# provided that the above copyright notice appear in all copies and that +# both that copyright notice and this permission notice appear in +# supporting documentation, and that the name of Lance Ellinghouse +# not be used in advertising or publicity pertaining to distribution +# of the software without specific, written prior permission. +# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND +# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE +# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT +# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# +# Modified by Jack Jansen, CWI, July 1995: +# - Use binascii module to do the actual line-by-line conversion +# between ascii and binary. This results in a 1000-fold speedup. The C +# version is still 5 times faster, though. +# - Arguments more compliant with python standard + +"""Implementation of the UUencode and UUdecode functions. 
+ +encode(in_file, out_file [,name, mode]) +decode(in_file [, out_file, mode]) +""" + +import binascii +import os +import sys + +__all__ = ["Error", "encode", "decode"] + +class Error(Exception): + pass + +def encode(in_file, out_file, name=None, mode=None): + """Uuencode file""" + # + # If in_file is a pathname open it and change defaults + # + opened_files = [] + try: + if in_file == '-': + in_file = sys.stdin.buffer + elif isinstance(in_file, str): + if name is None: + name = os.path.basename(in_file) + if mode is None: + try: + mode = os.stat(in_file).st_mode + except AttributeError: + pass + in_file = open(in_file, 'rb') + opened_files.append(in_file) + # + # Open out_file if it is a pathname + # + if out_file == '-': + out_file = sys.stdout.buffer + elif isinstance(out_file, str): + out_file = open(out_file, 'wb') + opened_files.append(out_file) + # + # Set defaults for name and mode + # + if name is None: + name = '-' + if mode is None: + mode = 0o666 + # + # Write the data + # + out_file.write(('begin %o %s\n' % ((mode & 0o777), name)).encode("ascii")) + data = in_file.read(45) + while len(data) > 0: + out_file.write(binascii.b2a_uu(data)) + data = in_file.read(45) + out_file.write(b' \nend\n') + finally: + for f in opened_files: + f.close() + + +def decode(in_file, out_file=None, mode=None, quiet=False): + """Decode uuencoded file""" + # + # Open the input file, if needed. + # + opened_files = [] + if in_file == '-': + in_file = sys.stdin.buffer + elif isinstance(in_file, str): + in_file = open(in_file, 'rb') + opened_files.append(in_file) + + try: + # + # Read until a begin is encountered or we've exhausted the file + # + while True: + hdr = in_file.readline() + if not hdr: + raise Error('No valid begin line found in input file') + if not hdr.startswith(b'begin'): + continue + hdrfields = hdr.split(b' ', 2) + if len(hdrfields) == 3 and hdrfields[0] == b'begin': + try: + int(hdrfields[1], 8) + break + except ValueError: + pass + if out_file is None: + # If the filename isn't ASCII, what's up with that?!? 
diff --git a/upip/warnings.py b/upip/warnings.py
new file mode 100644
index 0000000..1cb31b5
--- /dev/null
+++ b/upip/warnings.py
@@ -0,0 +1,2 @@
+def warn(msg, cat=None, stacklevel=1):
+    print("%s: %s" % ("Warning" if cat is None else cat.__name__, msg))
diff --git a/upip/weakref.py b/upip/weakref.py
new file mode 100644
index 0000000..76aabfa
--- /dev/null
+++ b/upip/weakref.py
@@ -0,0 +1,7 @@
+#
+# This is a completely dummy implementation, which does not provide
+# real weak references, and thus will hoard memory!
+#
+
+def proxy(obj, cb=None):
+    return obj
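Both of these files are stand-ins so that code written against the CPython stdlib imports cleanly under MicroPython; a quick sketch of their deliberately limited behavior:

# Sketch, not part of the patch: what the two shims actually do.
import warnings
import weakref

warnings.warn("low flash")   # prints "Warning: low flash"
obj = {"x": 1}
p = weakref.proxy(obj)       # just returns obj itself
assert p is obj              # so the referent is never collected early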
diff --git a/upip/xmltok.py b/upip/xmltok.py
new file mode 100644
index 0000000..c46f2bd
--- /dev/null
+++ b/upip/xmltok.py
@@ -0,0 +1,142 @@
+TEXT = "TEXT"
+START_TAG = "START_TAG"
+#START_TAG_DONE = "START_TAG_DONE"
+END_TAG = "END_TAG"
+PI = "PI"
+#PI_DONE = "PI_DONE"
+ATTR = "ATTR"
+#ATTR_VAL = "ATTR_VAL"
+
+class XMLSyntaxError(Exception):
+    pass
+
+class XMLTokenizer:
+
+    def __init__(self, f):
+        self.f = f
+        self.nextch()
+
+    def curch(self):
+        return self.c
+
+    def getch(self):
+        c = self.c
+        self.nextch()
+        return c
+
+    def eof(self):
+        return self.c == ""
+
+    def nextch(self):
+        self.c = self.f.read(1)
+        if not self.c:
+            raise StopIteration
+        return self.c
+
+    def skip_ws(self):
+        while self.curch().isspace():
+            self.nextch()
+
+    def isident(self):
+        self.skip_ws()
+        return self.curch().isalpha()
+
+    def getident(self):
+        self.skip_ws()
+        ident = ""
+        while True:
+            c = self.curch()
+            if not(c.isalpha() or c.isdigit() or c in "_-."):
+                break
+            ident += self.getch()
+        return ident
+
+    def getnsident(self):
+        ns = ""
+        ident = self.getident()
+        if self.curch() == ":":
+            self.nextch()
+            ns = ident
+            ident = self.getident()
+        return (ns, ident)
+
+    def match(self, c):
+        self.skip_ws()
+        if self.curch() == c:
+            self.nextch()
+            return True
+        return False
+
+    def expect(self, c):
+        if not self.match(c):
+            raise XMLSyntaxError
+
+    def lex_attrs_till(self):
+        while self.isident():
+            attr = self.getnsident()
+            #yield (ATTR, attr)
+            self.expect("=")
+            self.expect('"')
+            val = ""
+            while self.curch() != '"':
+                val += self.getch()
+            #yield (ATTR_VAL, val)
+            self.expect('"')
+            yield (ATTR, attr, val)
+
+    def tokenize(self):
+        while not self.eof():
+            if self.match("<"):
+                if self.match("/"):
+                    yield (END_TAG, self.getnsident())
+                    self.expect(">")
+                elif self.match("?"):
+                    yield (PI, self.getident())
+                    yield from self.lex_attrs_till()
+                    self.expect("?")
+                    self.expect(">")
+                elif self.match("!"):
+                    self.expect("-")
+                    self.expect("-")
+                    last3 = ''
+                    while True:
+                        last3 = last3[-2:] + self.getch()
+                        if last3 == "-->":
+                            break
+                else:
+                    tag = self.getnsident()
+                    yield (START_TAG, tag)
+                    yield from self.lex_attrs_till()
+                    if self.match("/"):
+                        yield (END_TAG, tag)
+                    self.expect(">")
+            else:
+                text = ""
+                while self.curch() != "<":
+                    text += self.getch()
+                if text:
+                    yield (TEXT, text)
+
+
+def gfind(gen, pred):
+    for i in gen:
+        if pred(i):
+            return i
+
+def text_of(gen, tag):
+    # Return text content of a leaf tag
+    def match_tag(t):
+        if t[0] != START_TAG:
+            return False
+        if isinstance(tag, tuple):
+            return t[1] == tag
+        return t[1][1] == tag
+
+    gfind(gen, match_tag)
+    # Assumes no attributes
+    t, val = next(gen)
+    assert t == TEXT
+    return val
+
+def tokenize(file):
+    return XMLTokenizer(file).tokenize()
diff --git a/upip/zlib.py b/upip/zlib.py
new file mode 100644
index 0000000..e803341
--- /dev/null
+++ b/upip/zlib.py
@@ -0,0 +1 @@
+from uzlib import *
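To illustrate the token stream the new xmltok tokenizer produces, here is a sketch over an in-memory document; the XML snippet is made up. Note the tokenizer ends its stream by letting StopIteration escape from the generator, which works under MicroPython's generator semantics but would surface as RuntimeError on CPython 3.7+ (PEP 479).

# Sketch, not part of the patch: tokenizing a small in-memory document.
import io                       # uio on the badge
import xmltok

doc = io.StringIO('<app name="demo"><title>Hello</title></app>')
for tok in xmltok.tokenize(doc):
    print(tok)
# (START_TAG, ('', 'app'))
# (ATTR, ('', 'name'), 'demo')
# (START_TAG, ('', 'title'))
# (TEXT, 'Hello')
# (END_TAG, ('', 'title'))
# (END_TAG, ('', 'app'))

Tag names arrive as (namespace, name) pairs, which is why text_of() compares t[1][1] when given a bare string.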