parent
6e4e87e6c4
commit
32e278d4c9
14 changed files with 943 additions and 10 deletions
@ -0,0 +1,466 @@ |
||||
#!/usr/bin/env python |
||||
# |
||||
# This file is part of the MicroPython project, http://micropython.org/ |
||||
# |
||||
# The MIT License (MIT) |
||||
# |
||||
# Copyright (c) 2014-2016 Damien P. George |
||||
# Copyright (c) 2017 Paul Sokolovsky |
||||
# |
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy |
||||
# of this software and associated documentation files (the "Software"), to deal |
||||
# in the Software without restriction, including without limitation the rights |
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
# copies of the Software, and to permit persons to whom the Software is |
||||
# furnished to do so, subject to the following conditions: |
||||
# |
||||
# The above copyright notice and this permission notice shall be included in |
||||
# all copies or substantial portions of the Software. |
||||
# |
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
||||
# THE SOFTWARE. |
||||
|
||||
""" |
||||
pyboard interface |
||||
|
||||
This module provides the Pyboard class, used to communicate with and |
||||
control a MicroPython device over a communication channel. Both real |
||||
boards and emulated devices (e.g. running in QEMU) are supported. |
||||
Various communication channels are supported, including a serial |
||||
connection, telnet-style network connection, external process |
||||
connection. |
||||
|
||||
Example usage: |
||||
|
||||
import pyboard |
||||
pyb = pyboard.Pyboard('/dev/ttyACM0') |
||||
|
||||
Or: |
||||
|
||||
pyb = pyboard.Pyboard('192.168.1.1') |
||||
|
||||
Then: |
||||
|
||||
pyb.enter_raw_repl() |
||||
pyb.exec('pyb.LED(1).on()') |
||||
pyb.exit_raw_repl() |
||||
|
||||
Note: if using Python2 then pyb.exec must be written as pyb.exec_. |
||||
To run a script from the local machine on the board and print out the results: |
||||
|
||||
import pyboard |
||||
pyboard.execfile('test.py', device='/dev/ttyACM0') |
||||
|
||||
This script can also be run directly. To execute a local script, use: |
||||
|
||||
./pyboard.py test.py |
||||
|
||||
Or: |
||||
|
||||
python pyboard.py test.py |
||||
|
||||
""" |
||||
|
||||
import sys |
||||
import time |
||||
import os |
||||
|
||||
try: |
||||
stdout = sys.stdout.buffer |
||||
except AttributeError: |
||||
# Python2 doesn't have buffer attr |
||||
stdout = sys.stdout |
||||
|
||||
def stdout_write_bytes(b): |
||||
b = b.replace(b"\x04", b"") |
||||
stdout.write(b) |
||||
stdout.flush() |
||||
|
||||
class PyboardError(BaseException): |
||||
pass |
||||
|
||||
class TelnetToSerial: |
||||
def __init__(self, ip, user, password, read_timeout=None): |
||||
import telnetlib |
||||
self.tn = telnetlib.Telnet(ip, timeout=15) |
||||
self.read_timeout = read_timeout |
||||
if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): |
||||
self.tn.write(bytes(user, 'ascii') + b"\r\n") |
||||
|
||||
if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): |
||||
# needed because of internal implementation details of the telnet server |
||||
time.sleep(0.2) |
||||
self.tn.write(bytes(password, 'ascii') + b"\r\n") |
||||
|
||||
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): |
||||
# login successful |
||||
from collections import deque |
||||
self.fifo = deque() |
||||
return |
||||
|
||||
raise PyboardError('Failed to establish a telnet connection with the board') |
||||
|
||||
def __del__(self): |
||||
self.close() |
||||
|
||||
def close(self): |
||||
try: |
||||
self.tn.close() |
||||
except: |
||||
# the telnet object might not exist yet, so ignore this one |
||||
pass |
||||
|
||||
def read(self, size=1): |
||||
while len(self.fifo) < size: |
||||
timeout_count = 0 |
||||
data = self.tn.read_eager() |
||||
if len(data): |
||||
self.fifo.extend(data) |
||||
timeout_count = 0 |
||||
else: |
||||
time.sleep(0.25) |
||||
if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: |
||||
break |
||||
timeout_count += 1 |
||||
|
||||
data = b'' |
||||
while len(data) < size and len(self.fifo) > 0: |
||||
data += bytes([self.fifo.popleft()]) |
||||
return data |
||||
|
||||
def write(self, data): |
||||
self.tn.write(data) |
||||
return len(data) |
||||
|
||||
def inWaiting(self): |
||||
n_waiting = len(self.fifo) |
||||
if not n_waiting: |
||||
data = self.tn.read_eager() |
||||
self.fifo.extend(data) |
||||
return len(data) |
||||
else: |
||||
return n_waiting |
||||
|
||||
|
||||
class ProcessToSerial: |
||||
"Execute a process and emulate serial connection using its stdin/stdout." |
||||
|
||||
def __init__(self, cmd): |
||||
import subprocess |
||||
self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid, |
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
||||
|
||||
# Initially was implemented with selectors, but that adds Python3 |
||||
# dependency. However, there can be race conditions communicating |
||||
# with a particular child process (like QEMU), and selectors may |
||||
# still work better in that case, so left inplace for now. |
||||
# |
||||
#import selectors |
||||
#self.sel = selectors.DefaultSelector() |
||||
#self.sel.register(self.subp.stdout, selectors.EVENT_READ) |
||||
|
||||
import select |
||||
self.poll = select.poll() |
||||
self.poll.register(self.subp.stdout.fileno()) |
||||
|
||||
def close(self): |
||||
import signal |
||||
os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) |
||||
|
||||
def read(self, size=1): |
||||
data = b"" |
||||
while len(data) < size: |
||||
data += self.subp.stdout.read(size - len(data)) |
||||
return data |
||||
|
||||
def write(self, data): |
||||
self.subp.stdin.write(data) |
||||
return len(data) |
||||
|
||||
def inWaiting(self): |
||||
#res = self.sel.select(0) |
||||
res = self.poll.poll(0) |
||||
if res: |
||||
return 1 |
||||
return 0 |
||||
|
||||
|
||||
class ProcessPtyToTerminal: |
||||
"""Execute a process which creates a PTY and prints slave PTY as |
||||
first line of its output, and emulate serial connection using |
||||
this PTY.""" |
||||
|
||||
def __init__(self, cmd): |
||||
import subprocess |
||||
import re |
||||
import serial |
||||
self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=False, preexec_fn=os.setsid, |
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
||||
pty_line = self.subp.stderr.readline().decode("utf-8") |
||||
m = re.search(r"/dev/pts/[0-9]+", pty_line) |
||||
if not m: |
||||
print("Error: unable to find PTY device in startup line:", pty_line) |
||||
self.close() |
||||
sys.exit(1) |
||||
pty = m.group() |
||||
# rtscts, dsrdtr params are to workaround pyserial bug: |
||||
# http://stackoverflow.com/questions/34831131/pyserial-does-not-play-well-with-virtual-port |
||||
self.ser = serial.Serial(pty, interCharTimeout=1, rtscts=True, dsrdtr=True) |
||||
|
||||
def close(self): |
||||
import signal |
||||
os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) |
||||
|
||||
def read(self, size=1): |
||||
return self.ser.read(size) |
||||
|
||||
def write(self, data): |
||||
return self.ser.write(data) |
||||
|
||||
def inWaiting(self): |
||||
return self.ser.inWaiting() |
||||
|
||||
|
||||
class Pyboard: |
||||
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0): |
||||
if device.startswith("exec:"): |
||||
self.serial = ProcessToSerial(device[len("exec:"):]) |
||||
elif device.startswith("execpty:"): |
||||
self.serial = ProcessPtyToTerminal(device[len("qemupty:"):]) |
||||
elif device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: |
||||
# device looks like an IP address |
||||
self.serial = TelnetToSerial(device, user, password, read_timeout=10) |
||||
else: |
||||
import serial |
||||
delayed = False |
||||
for attempt in range(wait + 1): |
||||
try: |
||||
self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) |
||||
break |
||||
except (OSError, IOError): # Py2 and Py3 have different errors |
||||
if wait == 0: |
||||
continue |
||||
if attempt == 0: |
||||
sys.stdout.write('Waiting {} seconds for pyboard '.format(wait)) |
||||
delayed = True |
||||
time.sleep(1) |
||||
sys.stdout.write('.') |
||||
sys.stdout.flush() |
||||
else: |
||||
if delayed: |
||||
print('') |
||||
raise PyboardError('failed to access ' + device) |
||||
if delayed: |
||||
print('') |
||||
|
||||
def close(self): |
||||
self.serial.close() |
||||
|
||||
def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): |
||||
data = self.serial.read(min_num_bytes) |
||||
if data_consumer: |
||||
data_consumer(data) |
||||
timeout_count = 0 |
||||
while True: |
||||
if data.endswith(ending): |
||||
break |
||||
elif self.serial.inWaiting() > 0: |
||||
new_data = self.serial.read(1) |
||||
data = data + new_data |
||||
if data_consumer: |
||||
data_consumer(new_data) |
||||
timeout_count = 0 |
||||
else: |
||||
timeout_count += 1 |
||||
if timeout is not None and timeout_count >= 100 * timeout: |
||||
break |
||||
time.sleep(0.01) |
||||
return data |
||||
|
||||
def enter_raw_repl(self): |
||||
self.serial.write(b'\r\x03\x03') # ctrl-C twice: interrupt any running program |
||||
|
||||
# flush input (without relying on serial.flushInput()) |
||||
n = self.serial.inWaiting() |
||||
while n > 0: |
||||
self.serial.read(n) |
||||
n = self.serial.inWaiting() |
||||
|
||||
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL |
||||
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') |
||||
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): |
||||
print(data) |
||||
raise PyboardError('could not enter raw repl') |
||||
|
||||
self.serial.write(b'\x04') # ctrl-D: soft reset |
||||
data = self.read_until(1, b'soft reboot\r\n') |
||||
if not data.endswith(b'soft reboot\r\n'): |
||||
print(data) |
||||
raise PyboardError('could not enter raw repl') |
||||
# By splitting this into 2 reads, it allows boot.py to print stuff, |
||||
# which will show up after the soft reboot and before the raw REPL. |
||||
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') |
||||
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): |
||||
print(data) |
||||
raise PyboardError('could not enter raw repl') |
||||
|
||||
def exit_raw_repl(self): |
||||
self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL |
||||
|
||||
def follow(self, timeout, data_consumer=None): |
||||
# wait for normal output |
||||
data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) |
||||
if not data.endswith(b'\x04'): |
||||
raise PyboardError('timeout waiting for first EOF reception') |
||||
data = data[:-1] |
||||
|
||||
# wait for error output |
||||
data_err = self.read_until(1, b'\x04', timeout=timeout) |
||||
if not data_err.endswith(b'\x04'): |
||||
raise PyboardError('timeout waiting for second EOF reception') |
||||
data_err = data_err[:-1] |
||||
|
||||
# return normal and error output |
||||
return data, data_err |
||||
|
||||
def exec_raw_no_follow(self, command): |
||||
if isinstance(command, bytes): |
||||
command_bytes = command |
||||
else: |
||||
command_bytes = bytes(command, encoding='utf8') |
||||
|
||||
# check we have a prompt |
||||
data = self.read_until(1, b'>') |
||||
if not data.endswith(b'>'): |
||||
raise PyboardError('could not enter raw repl') |
||||
|
||||
# write command |
||||
for i in range(0, len(command_bytes), 256): |
||||
self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) |
||||
time.sleep(0.01) |
||||
self.serial.write(b'\x04') |
||||
|
||||
# check if we could exec command |
||||
data = self.serial.read(2) |
||||
if data != b'OK': |
||||
raise PyboardError('could not exec command (response: %r)' % data) |
||||
|
||||
def exec_raw(self, command, timeout=10, data_consumer=None): |
||||
self.exec_raw_no_follow(command); |
||||
return self.follow(timeout, data_consumer) |
||||
|
||||
def eval(self, expression): |
||||
ret = self.exec_('print({})'.format(expression)) |
||||
ret = ret.strip() |
||||
return ret |
||||
|
||||
def exec_(self, command): |
||||
ret, ret_err = self.exec_raw(command) |
||||
if ret_err: |
||||
raise PyboardError('exception', ret, ret_err) |
||||
return ret |
||||
|
||||
def execfile(self, filename): |
||||
with open(filename, 'rb') as f: |
||||
pyfile = f.read() |
||||
return self.exec_(pyfile) |
||||
|
||||
def get_time(self): |
||||
t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') |
||||
return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) |
||||
|
||||
# in Python2 exec is a keyword so one must use "exec_" |
||||
# but for Python3 we want to provide the nicer version "exec" |
||||
setattr(Pyboard, "exec", Pyboard.exec_) |
||||
|
||||
def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): |
||||
pyb = Pyboard(device, baudrate, user, password) |
||||
pyb.enter_raw_repl() |
||||
output = pyb.execfile(filename) |
||||
stdout_write_bytes(output) |
||||
pyb.exit_raw_repl() |
||||
pyb.close() |
||||
|
||||
def main(): |
||||
import argparse |
||||
cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') |
||||
cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') |
||||
cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') |
||||
cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') |
||||
cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') |
||||
cmd_parser.add_argument('-c', '--command', help='program passed in as string') |
||||
cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') |
||||
cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') |
||||
cmd_parser.add_argument('files', nargs='*', help='input files') |
||||
args = cmd_parser.parse_args() |
||||
|
||||
# open the connection to the pyboard |
||||
try: |
||||
pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) |
||||
except PyboardError as er: |
||||
print(er) |
||||
sys.exit(1) |
||||
|
||||
# run any command or file(s) |
||||
if args.command is not None or len(args.files): |
||||
# we must enter raw-REPL mode to execute commands |
||||
# this will do a soft-reset of the board |
||||
try: |
||||
pyb.enter_raw_repl() |
||||
except PyboardError as er: |
||||
print(er) |
||||
pyb.close() |
||||
sys.exit(1) |
||||
|
||||
def execbuffer(buf): |
||||
try: |
||||
ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) |
||||
except PyboardError as er: |
||||
print(er) |
||||
pyb.close() |
||||
sys.exit(1) |
||||
except KeyboardInterrupt: |
||||
sys.exit(1) |
||||
if ret_err: |
||||
pyb.exit_raw_repl() |
||||
pyb.close() |
||||
stdout_write_bytes(ret_err) |
||||
sys.exit(1) |
||||
|
||||
# run the command, if given |
||||
if args.command is not None: |
||||
execbuffer(args.command.encode('utf-8')) |
||||
|
||||
# run any files |
||||
for filename in args.files: |
||||
with open(filename, 'rb') as f: |
||||
pyfile = f.read() |
||||
execbuffer(pyfile) |
||||
|
||||
# exiting raw-REPL just drops to friendly-REPL mode |
||||
pyb.exit_raw_repl() |
||||
|
||||
# if asked explicitly, or no files given, then follow the output |
||||
if args.follow or (args.command is None and len(args.files) == 0): |
||||
try: |
||||
ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) |
||||
except PyboardError as er: |
||||
print(er) |
||||
sys.exit(1) |
||||
except KeyboardInterrupt: |
||||
sys.exit(1) |
||||
if ret_err: |
||||
pyb.close() |
||||
stdout_write_bytes(ret_err) |
||||
sys.exit(1) |
||||
|
||||
# close the connection to the pyboard |
||||
pyb.close() |
||||
|
||||
if __name__ == "__main__": |
||||
main() |
@ -0,0 +1,100 @@ |
||||
from pyboard import Pyboard, PyboardError |
||||
import glob, sys, pyboard |
||||
|
||||
_pyb = None |
||||
|
||||
def get_pyb(args): |
||||
global _pyb |
||||
if not _pyb: |
||||
if not args.device: |
||||
args.device = find_tty() |
||||
|
||||
# open the connection to the pyboard |
||||
try: |
||||
_pyb = Pyboard(args.device, args.baudrate, None, None, args.wait) |
||||
except PyboardError as er: |
||||
print(er) |
||||
sys.exit(1) |
||||
print("Connected to badge.") |
||||
|
||||
return _pyb |
||||
|
||||
def close_pyb(): |
||||
global _pyb |
||||
if _pyb: |
||||
_pyb.close() |
||||
|
||||
def stop_badge(args): |
||||
pyb = get_pyb(args) |
||||
print("stopping running app") |
||||
write_command(pyb, b'\r\x03\x03') # ctrl-C twice: interrupt any running program |
||||
|
||||
def write_command(pyb, command): |
||||
flush_input(pyb) |
||||
pyb.serial.write(command) |
||||
flush_input(pyb) |
||||
|
||||
def flush_input(pyb): |
||||
n = pyb.serial.inWaiting() |
||||
while n > 0: |
||||
pyb.serial.read(n) |
||||
n = pyb.serial.inWaiting() |
||||
|
||||
def soft_reset(args): |
||||
pyb = get_pyb(args) |
||||
print("trying to soft reboot badge") |
||||
write_command(pyb, b'\x04') # ctrl-D: soft reset |
||||
#print("1") |
||||
data = pyb.read_until(1, b'soft reboot\r\n') |
||||
#print("2") |
||||
if data.endswith(b'soft reboot\r\n'): |
||||
print("Soft reboot was successful.") |
||||
else: |
||||
raise PyboardError('could not soft reboot') |
||||
|
||||
def find_tty(): |
||||
# Todo: find solution for windows, test in linux |
||||
for pattern in ['/dev/ttyACM*', '/dev/tty.usbmodem*']: |
||||
for path in glob.glob(pattern): |
||||
return path |
||||
print("Couldn't find badge tty - Please make it's plugged in and reset it if necessary") |
||||
sys.exit(1) |
||||
|
||||
def run(args): |
||||
pyb = get_pyb(args) |
||||
print("executing %s" % args.paths) |
||||
print("----------------") |
||||
# run any command or file(s) - this is mostly a copy from pyboard.py |
||||
if args.command is not None or len(args.paths): |
||||
# we must enter raw-REPL mode to execute commands |
||||
# this will do a soft-reset of the board |
||||
try: |
||||
pyb.enter_raw_repl() |
||||
except PyboardError as er: |
||||
print(er) |
||||
pyb.close() |
||||
sys.exit(1) |
||||
|
||||
def execbuffer(buf): |
||||
try: |
||||
ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=pyboard.stdout_write_bytes) |
||||
except PyboardError as er: |
||||
print(er) |
||||
pyb.close() |
||||
sys.exit(1) |
||||
except KeyboardInterrupt: |
||||
sys.exit(1) |
||||
if ret_err: |
||||
pyb.exit_raw_repl() |
||||
pyb.close() |
||||
pyboard.stdout_write_bytes(ret_err) |
||||
sys.exit(1) |
||||
|
||||
# run any files |
||||
for filename in args.paths: |
||||
with open(filename, 'rb') as f: |
||||
pyfile = f.read() |
||||
execbuffer(pyfile) |
||||
|
||||
# exiting raw-REPL just drops to friendly-REPL mode |
||||
pyb.exit_raw_repl() |
@ -0,0 +1,57 @@ |
||||
import os, glob, shutil, sys |
||||
|
||||
def sync(storage, patterns): |
||||
root = get_root() |
||||
|
||||
# Add all paths that are already files |
||||
paths = [os.path.join(root, p) for p in (patterns or []) if os.path.isfile(os.path.join(root, p))] |
||||
|
||||
if patterns: |
||||
new_patterns = [] |
||||
patterns = [os.path.join(root, p, "**") for p in patterns] |
||||
else: |
||||
patterns = ["**/**", "boot.py"] |
||||
|
||||
for pattern in patterns: |
||||
for path in glob.glob(pattern): |
||||
paths.append(path) |
||||
|
||||
if len(paths) == 0: |
||||
print("No files to copy found for pattern %s" % patterns) |
||||
sys.exit(1) |
||||
|
||||
for path in paths: |
||||
rel_path = os.path.relpath(path, root) |
||||
if rel_path.startswith("."): |
||||
continue |
||||
print("Copying %s..." % rel_path) |
||||
|
||||
target = os.path.join(storage, rel_path) |
||||
target_dir = os.path.dirname(target) |
||||
if os.path.isfile(target_dir): |
||||
# micropython has the tendency to sometimes corrupt directories into files |
||||
os.remove(target_dir) |
||||
if not os.path.exists(target_dir): |
||||
os.makedirs(target_dir) |
||||
shutil.copy2(path, target) |
||||
|
||||
else: |
||||
print("Files copied successfully") |
||||
|
||||
|
||||
def set_boot_app(storage, app_to_boot): |
||||
path = os.path.join(storage, 'once.txt') |
||||
try: |
||||
os.remove(path) |
||||
except OSError: |
||||
pass |
||||
with open(path, 'w') as f: |
||||
f.write(app_to_boot + "\n") |
||||
print("setting next boot to %s" % app_to_boot) |
||||
|
||||
def get_root(): |
||||
root = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..')) |
||||
if not os.path.isfile(os.path.join(root, "boot.py")): |
||||
print("Path %s doesn't contain a boot.py, aborting. Something is probably wrong with your setup.") |
||||
sys.exit(1) |
||||
return root |
@ -0,0 +1,101 @@ |
||||
#!/usr/bin/env python3 |
||||
|
||||
"""Toolchain for working with the TiLDA Mk4 |
||||
|
||||
Usage (currently more of a wishlist) |
||||
------------------------------------ |
||||
|
||||
Reboot badge |
||||
$ tilda_tools reset |
||||
|
||||
Soft reboot badge and start specific app |
||||
$ tilda_tools reset --boot my_app |
||||
|
||||
Update files on the badge to match the current local version, restarts afterwards |
||||
$ tilda_tools sync |
||||
|
||||
Update files in folder(s) to match current local version |
||||
$ tilda_tools sync my_game shared |
||||
$ tilda_tools sync <folder1> <folder2> ... |
||||
|
||||
Sync (as above), but execute my_app after reboot |
||||
$ tilda_toold.py sync --boot my_app [<other sync parameter>] |
||||
|
||||
Sync (as above), but execute a single file afterwards without copying it to the badge |
||||
$ tilda_toold.py sync --run some_other_file.py |
||||
|
||||
Executes a single file on the badge without copying anything (Using pyboard.py) |
||||
$ tilda_tools run my_app/main.py |
||||
|
||||
Runs local validation against metadata (doesn't require a badge) |
||||
$ tilda_tools validate |
||||
|
||||
Runs local validation and badge-side tests |
||||
$ tilda_tools test |
||||
|
||||
Update firmware on badge (warning, this will delete all settings etc. stored on the badge!) |
||||
$ tilda_tools firmware-update |
||||
|
||||
Common parameters |
||||
----------------- |
||||
|
||||
-d --device : serial interface (default: auto) |
||||
-s --storage : path to flash storage |
||||
|
||||
""" |
||||
|
||||
import sys, glob |
||||
import sync, pyboard_util |
||||
|
||||
def main(): |
||||
import argparse |
||||
cmd_parser = argparse.ArgumentParser(description='Toolchain for working with the TiLDA Mk4') |
||||
cmd_parser.add_argument('command', nargs=1, help='command') |
||||
cmd_parser.add_argument('-d', '--device', help='the serial device of the badge') |
||||
cmd_parser.add_argument('-s', '--storage', help='the usb mass storage path of the badge') |
||||
cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') |
||||
cmd_parser.add_argument('--boot', help='defines which app to boot into after reboot') |
||||
cmd_parser.add_argument('--run', help='like run, but after a sync') |
||||
cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') |
||||
cmd_parser.add_argument('paths', nargs='*', help='input files') |
||||
args = cmd_parser.parse_args() |
||||
command = args.command[0] |
||||
|
||||
if command in ["reset", "sync"]: |
||||
pyboard_util.stop_badge(args) |
||||
|
||||
if command == "sync": |
||||
paths = args.paths if len(args.paths) else None |
||||
sync.sync(get_storage(args), paths) |
||||
|
||||
if command in ["reset", "sync"]: |
||||
sync.set_boot_app(get_storage(args), args.boot or "") |
||||
pyboard_util.soft_reset(args) |
||||
if args.run: |
||||
command = "run" |
||||
args.paths = [args.run] |
||||
|
||||
if command == "run": |
||||
pyboard_util.run(args) |
||||
|
||||
|
||||
pyboard_util.close_pyb() |
||||
|
||||
def find_storage(): |
||||
# todo: find solution for windows and linux |
||||
for pattern in ['/Volumes/PYBFLASH']: |
||||
for path in glob.glob(pattern): |
||||
return path |
||||
print("Couldn't find badge storage - Please make it's plugged in and reset it if necessary") |
||||
sys.exit(1) |
||||
|
||||
def get_storage(args): |
||||
if not args.storage: |
||||
args.storage = find_storage() |
||||
return args.storage |
||||
|
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
main() |
||||
|
@ -1 +1,2 @@ |
||||
.DS_Store |
||||
__pycache__ |
||||
|
@ -0,0 +1,9 @@ |
||||
TiLDA Mk4 App Library |
||||
------------------- |
||||
|
||||
* How to use tilda_tools.py |
||||
* How to transfer this to badge |
||||
* How to fork |
||||
* How to make your own changes |
||||
* How to run tests |
||||
* How to send a PR |
@ -0,0 +1,61 @@ |
||||
"""This app's purpose is to run a series of tests against library code |
||||
|
||||
Once successful it displays and prints 'ok' on the screen. |
||||
|
||||
Please make sure that all tests pass before sending a PR. You can easily |
||||
do this by running "tilda_tools test". Thank you for keeping all the |
||||
tests green! *face-throwing-a-kiss-emoji* |
||||
""" |
||||
|
||||
___license___ = "MIT" |
||||
___dependencies___ = ["unittest", "database"] |
||||
|
||||
import database, unittest |
||||
|
||||
class TestDatabase(unittest.TestCase): |
||||
def setUp(self): |
||||
self.filename = "test/tmp.testdb.json" |
||||
self.database = database.Database(filename = self.filename) |
||||
self._remove_test_db() |
||||
|
||||
def tearDown(self): |
||||
self._remove_test_db(); |
||||
|
||||
def test_convenience_get_default(self): |
||||
self.assertEqual( |
||||
database.get("does_not_exist", "default_value", filename=self.filename), |
||||
"default_value" |
||||
) |
||||
|
||||
def test_convenience_set_and_get(self): |
||||
database.set("foo", "bar", filename=self.filename) |
||||
self.assertEqual(database.get("foo", filename=self.filename), "bar") |
||||
|
||||
def test_convenience_delete(self): |
||||
database.set("foo", "bar", filename=self.filename) |
||||
database.delete("foo", filename=self.filename) |
||||
self.assertEqual(database.get("foo", filename=self.filename), None) |
||||
|
||||
def test_get_default(self): |
||||
self.assertEqual( |
||||
self.database.get("does_not_exist", "default_value"), |
||||
"default_value" |
||||
) |
||||
|
||||
def test_set_and_get(self): |
||||
self.database.set("foo", "bar") |
||||
self.assertEqual(self.database.get("foo"), "bar") |
||||
|
||||
def test_delete(self): |
||||
self.database.set("foo", "bar") |
||||
self.database.delete("foo") |
||||
self.assertEqual(self.database.get("foo"), None) |
||||
|
||||
def _remove_test_db(self): |
||||
try: |
||||
os.remove(self.filename) |
||||
except Exception as e: |
||||
pass |
||||
|
||||
if __name__ == "__main__": |
||||
TestDatabase().run_standalone() |
@ -0,0 +1,22 @@ |
||||
"""This app's purpose is to run a series of tests against library code |
||||
|
||||
Once successful it displays and prints 'ok' on the screen. |
||||
|
||||
Please make sure that all tests pass before sending a PR. You can easily |
||||
do this by running "tilda_tools test". Thank you for keeping all the |
||||
tests green! *face-throwing-a-kiss-emoji* |
||||
""" |
||||
|
||||
___license___ = "MIT" |
||||
___dependencies___ = ["unittest"] |
||||
|
||||
import unittest |
||||
|
||||
class TestHttp(unittest.TestCase): |
||||
|
||||
def test_foo(self): |
||||
pass |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
TestHttp().run_standalone() |
@ -0,0 +1,67 @@ |
||||
import sys |
||||
|
||||
class SkipTest(Exception): |
||||
"""Indicates a test has been skipped""" |
||||
|
||||
class FailTest(Exception): |
||||
"""Inidcates a failing test""" |
||||
def __init__(self, message=None): |
||||
self.message = message |
||||
|
||||
class TestCase(object): |
||||
def run(self): |
||||
test_class = type(self).__name__ |
||||
self.setUp() |
||||
self.count_pass = 0 |
||||
self.count_fail = 0 |
||||
self.count_skip = 0 |
||||
for method in dir(self): |
||||
if not method.startswith("test"): |
||||
continue |
||||
try: |
||||
getattr(self, method)() |
||||
print("%s.%s [PASS]" % (test_class, method)) |
||||
self.count_pass += 1 |
||||
except SkipTest as e: |
||||
print("%s.%s [SKIP]" % (test_class, method)) |
||||
self.count_skip += 1 |
||||
except FailTest as e: |
||||
print("%s.%s [FAIL]" % (test_class, method)) |
||||
print(e.message + "\n") |
||||
self.count_fail += 1 |
||||
except Exception as e: |
||||
print("%s.%s [FATAL]" % (test_class, method)) |
||||
sys.print_exception(e) |
||||
print("") |
||||
self.count_fail += 1 |
||||
self.tearDown() |
||||
return self.count_fail == 0 |
||||
|
||||
def run_standalone(self): |
||||
self.run() |
||||
print_result(self.count_pass, self.count_fail, self.count_skip) |
||||
|
||||
def runSingle(self, function): |
||||
print(self) |
||||
|
||||
def setUp(self): |
||||
pass |
||||
|
||||
def tearDown(self): |
||||
pass |
||||
|
||||
def assertEqual(self, actual, expected): |
||||
if not actual == expected: |
||||
raise FailTest("Expected %s to equal %s" % (actual, expected)) |
||||
|
||||
def assertTrue(self, actual): |
||||
self.assertEqual(actual, True) |
||||
|
||||
def assertFalse(self, actual): |
||||
self.assertEqual(actual, False) |
||||
|
||||
def skip(self): |
||||
raise SkipTest() |
||||
|
||||
def print_result(count_pass, count_fail, count_skip): |
||||
print("PASS: %s FAIL: %s SKIP: %s" % (count_pass, count_fail, count_skip)) |
@ -0,0 +1,45 @@ |
||||
"""This app's purpose is to run a series of tests against library code |
||||
|
||||
Once successful it displays and prints 'ok' on the screen. |
||||
|
||||
Please make sure that all tests pass before sending a PR. You can easily |
||||
do this by running "tilda_tools test". Thank you for keeping all the |
||||
tests green! *face-throwing-a-kiss-emoji* |
||||
""" |
||||
|
||||
___license___ = "MIT" |
||||
___categories___ = ["Development"] |
||||
___name___ = "Integration test app" |
||||
___dependencies___ = ["unittest", "test_database", "test_http"] |
||||
|
||||
# Add all tests that need to be run here: |
||||
import test_database |
||||
import test_http |
||||
|
||||
# run |
||||
import sys, unittest |
||||
|
||||
count_pass = 0 |
||||
count_fail = 0 |
||||
count_skip = 0 |
||||
log = "" |
||||
|
||||
for name, module in sys.modules.items(): |
||||
if not name.startswith("test"): |
||||
continue |
||||
for element_name in dir(module): |
||||
element = getattr(module, element_name) |
||||
if not isinstance(element, type): |
||||
continue |
||||
if not issubclass(element, unittest.TestCase): |
||||
continue |
||||
test_case = element() |
||||
test_case.run() |
||||
count_pass += test_case.count_pass |
||||
count_fail += test_case.count_fail |
||||
count_skip += test_case.count_skip |
||||
|
||||
unittest.print_result(count_pass, count_fail, count_skip) |
||||
|
||||
|
||||
|
@ -0,0 +1 @@ |
||||
.development/tilda_tools.py |
Loading…
Reference in new issue