Here be dragons: Don't use usb storage for syncing anymore, switch to REPL only
parent
6564ac3681
commit
a31d48749e
|
@ -0,0 +1,51 @@
|
|||
# This is meant to run on the badge
|
||||
import hashlib, binascii, os
|
||||
|
||||
def split(path):
|
||||
if path == "":
|
||||
return ("", "")
|
||||
r = path.rsplit("/", 1)
|
||||
if len(r) == 1:
|
||||
return ("", path)
|
||||
head = r[0]
|
||||
if not head:
|
||||
head = "/"
|
||||
return (head, r[1])
|
||||
|
||||
def dirname(path):
|
||||
return split(path)[0]
|
||||
|
||||
def exists(path):
|
||||
try:
|
||||
os.stat(path)[0]
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def makedirs(path):
|
||||
sub_path = split(path)[0]
|
||||
if sub_path and (not exists(sub_path)):
|
||||
makedirs(sub_path)
|
||||
if not exists(path):
|
||||
os.mkdir(path)
|
||||
|
||||
def h(p):
|
||||
try:
|
||||
with open(p, "rb") as f:
|
||||
h = hashlib.sha256()
|
||||
h.update(f.read())
|
||||
print(str(binascii.hexlify(h.digest()), "utf8")[:10])
|
||||
return
|
||||
except:
|
||||
pass
|
||||
print("nooooooooo")
|
||||
|
||||
def w(p, c):
|
||||
try:
|
||||
makedirs(dirname(p))
|
||||
with open(p, "wb") as f:
|
||||
f.write(binascii.a2b_base64(c))
|
||||
os.sync()
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
return None
|
|
@ -1,5 +1,5 @@
|
|||
from pyboard import Pyboard, PyboardError
|
||||
import glob, sys, pyboard
|
||||
import glob, sys, pyboard, json, binascii, os, hashlib
|
||||
|
||||
_pyb = None
|
||||
|
||||
|
@ -70,6 +70,41 @@ def check_run(paths):
|
|||
pyfile = f.read()
|
||||
compile(pyfile + '\n', filename, 'exec')
|
||||
|
||||
def execbuffer(pyb, buf):
|
||||
try:
|
||||
ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=pyboard.stdout_write_bytes)
|
||||
except PyboardError as er:
|
||||
print(er)
|
||||
pyb.close()
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(1)
|
||||
if ret_err:
|
||||
pyb.exit_raw_repl()
|
||||
pyb.close()
|
||||
pyboard.stdout_write_bytes(ret_err)
|
||||
sys.exit(1)
|
||||
|
||||
def returnbuffer(pyb, buf):
|
||||
res = b''
|
||||
def add_to_res(b):
|
||||
nonlocal res
|
||||
res += b
|
||||
try:
|
||||
ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=add_to_res)
|
||||
except PyboardError as er:
|
||||
print(er)
|
||||
pyb.close()
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(1)
|
||||
if ret_err:
|
||||
pyb.exit_raw_repl()
|
||||
pyb.close()
|
||||
pyboard.stdout_write_bytes(ret_err)
|
||||
sys.exit(1)
|
||||
return res.decode('ascii').strip()
|
||||
|
||||
def run(args, paths, verbose=True):
|
||||
pyb = get_pyb(args)
|
||||
|
||||
|
@ -90,28 +125,13 @@ def run(args, paths, verbose=True):
|
|||
if verbose:
|
||||
print(" DONE")
|
||||
|
||||
def execbuffer(buf):
|
||||
try:
|
||||
ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=pyboard.stdout_write_bytes)
|
||||
except PyboardError as er:
|
||||
print(er)
|
||||
pyb.close()
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(1)
|
||||
if ret_err:
|
||||
pyb.exit_raw_repl()
|
||||
pyb.close()
|
||||
pyboard.stdout_write_bytes(ret_err)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
# run any files
|
||||
for filename in paths:
|
||||
with open(filename, 'rb') as f:
|
||||
print("-------- %s --------" % filename)
|
||||
pyfile = f.read()
|
||||
execbuffer(pyfile)
|
||||
execbuffer(pyb, pyfile)
|
||||
|
||||
# exiting raw-REPL just drops to friendly-REPL mode
|
||||
pyb.exit_raw_repl()
|
||||
|
@ -120,3 +140,51 @@ def run(args, paths, verbose=True):
|
|||
print("Connection to badge lost") # This can happen on a hard rest
|
||||
else:
|
||||
raise e
|
||||
|
||||
# Please don't judge me too harshly for this hack, I had lots of problems with the
|
||||
# USB mass storage protocol and at some point it looked simpler to just avoid it
|
||||
# altogether. This _seems_ to work, so maybe it isn't that terrible after all.
|
||||
|
||||
def init_copy_via_repl(args):
|
||||
pyb = get_pyb(args)
|
||||
print("Init copy via repl:", end=" ", flush=True)
|
||||
try:
|
||||
pyb.enter_raw_repl()
|
||||
with open(os.path.join(os.path.dirname(__file__), "copy_via_repl_header.py"), "rt") as f:
|
||||
execbuffer(pyb, f.read())
|
||||
|
||||
except PyboardError as er:
|
||||
print("FAIL")
|
||||
print(er)
|
||||
pyb.close()
|
||||
sys.exit(1)
|
||||
|
||||
print("DONE")
|
||||
|
||||
def copy_via_repl(args, path, rel_path):
|
||||
with open(path, "rb") as f:
|
||||
return write_via_repl(args, f.read(), rel_path)
|
||||
|
||||
def write_via_repl(args, content, rel_path):
|
||||
pyb = get_pyb(args)
|
||||
h = hashlib.sha256()
|
||||
h.update(content)
|
||||
content = binascii.b2a_base64(content).decode('ascii').strip()
|
||||
rel_path_as_string = json.dumps(rel_path) # make sure quotes are escaped
|
||||
cmd = "h(%s)" % rel_path_as_string
|
||||
badge_hash = returnbuffer(pyb,cmd).splitlines()[0]
|
||||
local_hash = str(binascii.hexlify(h.digest()), "utf8")[:10]
|
||||
if badge_hash == local_hash:
|
||||
# we don't need to update those files
|
||||
return False
|
||||
|
||||
cmd = "w(%s, \"%s\")\n" % (rel_path_as_string, content)
|
||||
execbuffer(pyb,cmd)
|
||||
return True
|
||||
|
||||
def end_copy_via_repl(args):
|
||||
# do we need to do anything?
|
||||
pass
|
||||
|
||||
def clean_via_repl(args):
|
||||
raise Exception("not implemented yet")
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import os, shutil, sys, fnmatch, glob
|
||||
import os, shutil, sys, fnmatch, glob, pyboard_util
|
||||
|
||||
def sync(storage, patterns, resources, verbose, skip_wifi):
|
||||
def sync(args, patterns, resources, verbose, skip_wifi):
|
||||
root = get_root(verbose)
|
||||
|
||||
# Add all paths that are already files
|
||||
|
@ -31,6 +31,7 @@ def sync(storage, patterns, resources, verbose, skip_wifi):
|
|||
paths.add(path)
|
||||
if not found and (pattern not in paths):
|
||||
print("WARN: No resources to copy found for pattern %s" % patterns)
|
||||
pyboard_util.init_copy_via_repl(args)
|
||||
if not verbose:
|
||||
print("Copying %s files: " % len(paths), end="", flush=True)
|
||||
for path in paths:
|
||||
|
@ -39,17 +40,16 @@ def sync(storage, patterns, resources, verbose, skip_wifi):
|
|||
rel_path = os.path.relpath(path, root)
|
||||
if rel_path.startswith(".") or os.path.isdir(path) or os.path.islink(path):
|
||||
continue
|
||||
if verbose:
|
||||
print("Copying %s..." % rel_path)
|
||||
else:
|
||||
print(".", end="", flush=True)
|
||||
|
||||
target = os.path.join(storage, rel_path)
|
||||
target_dir = os.path.dirname(target)
|
||||
ensure_dir(target_dir, storage)
|
||||
if not os.path.exists(target_dir):
|
||||
os.makedirs(target_dir)
|
||||
shutil.copy2(path, target)
|
||||
updated = pyboard_util.copy_via_repl(args, path, rel_path)
|
||||
if verbose:
|
||||
print("Copied %s, updated: %s" % (rel_path, updated))
|
||||
else:
|
||||
if updated:
|
||||
print("+", end="", flush=True)
|
||||
else:
|
||||
print("=", end="", flush=True)
|
||||
pyboard_util.end_copy_via_repl(args)
|
||||
|
||||
if verbose:
|
||||
print("Files copied successfully")
|
||||
|
@ -57,46 +57,19 @@ def sync(storage, patterns, resources, verbose, skip_wifi):
|
|||
print(" DONE")
|
||||
return synced_resources
|
||||
|
||||
def clean(storage):
|
||||
def clean(args):
|
||||
print("Cleaning:", end=" ", flush=True)
|
||||
files = glob.glob(os.path.join(storage, "*"))
|
||||
for f in files:
|
||||
try:
|
||||
if os.path.isfile(f):
|
||||
os.remove(f)
|
||||
else:
|
||||
shutil.rmtree(f)
|
||||
except:
|
||||
pass
|
||||
pyboard_util.clean_via_repl(args)
|
||||
print("DONE")
|
||||
|
||||
def ensure_dir(path, storage):
|
||||
# micropython has a tendecy to turn directories into files
|
||||
if not path or path == storage:
|
||||
return
|
||||
if os.path.isfile(path):
|
||||
os.remove(path)
|
||||
ensure_dir(os.path.dirname(path), storage)
|
||||
|
||||
def set_boot_app(storage, app_to_boot):
|
||||
path = os.path.join(storage, 'once.txt')
|
||||
try:
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
with open(path, 'w') as f:
|
||||
f.write(app_to_boot + "\n")
|
||||
def set_boot_app(args, app_to_boot):
|
||||
content = app_to_boot + "\n"
|
||||
pyboard_util.write_via_repl(args, content.encode("utf8"), 'once.txt')
|
||||
if app_to_boot:
|
||||
print("setting next boot to %s" % app_to_boot)
|
||||
|
||||
def set_no_boot(storage):
|
||||
path = os.path.join(storage, 'no_boot')
|
||||
try:
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
with open(path, 'w') as f:
|
||||
f.write("\n")
|
||||
def set_no_boot(args):
|
||||
pyboard_util.write_via_repl(args, b"\n", 'no_boot')
|
||||
|
||||
def get_root(verbose=False):
|
||||
root = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..'))
|
||||
|
|
|
@ -117,22 +117,22 @@ def main():
|
|||
pyboard_util.stop_badge(args, args.verbose)
|
||||
|
||||
if command == "bootstrap":
|
||||
sync.clean(get_storage(args))
|
||||
sync.sync(get_storage(args), ["bootstrap.py"], {}, args.verbose, args.skip_wifi)
|
||||
sync.clean(args)
|
||||
sync.sync(args, ["bootstrap.py"], {}, args.verbose, args.skip_wifi)
|
||||
pyboard_util.soft_reset(args)
|
||||
|
||||
if command == "sync":
|
||||
if args.clean:
|
||||
sync.clean(get_storage(args))
|
||||
sync.clean(args)
|
||||
paths = args.paths if len(args.paths) else None
|
||||
synced_resources = sync.sync(get_storage(args), paths, resources, args.verbose, args.skip_wifi)
|
||||
synced_resources = sync.sync(args, paths, resources, args.verbose, args.skip_wifi)
|
||||
|
||||
if (command in ["reset", "sync"]) or run_tests:
|
||||
sync.set_boot_app(get_storage(args), args.boot or "")
|
||||
sync.set_boot_app(args, args.boot or "")
|
||||
if args.run:
|
||||
command = "run"
|
||||
args.paths = [args.run]
|
||||
sync.set_no_boot(get_storage(args))
|
||||
sync.set_no_boot(args)
|
||||
pyboard_util.soft_reset(args)
|
||||
|
||||
|
||||
|
@ -146,24 +146,8 @@ def main():
|
|||
pyboard_util.run(args, [resource], False)
|
||||
pyboard_util.soft_reset(args, False)
|
||||
|
||||
|
||||
|
||||
pyboard_util.close_pyb()
|
||||
|
||||
def find_storage():
|
||||
# todo: find solution for windows and linux
|
||||
for pattern in ['/Volumes/TILDAMK4', '/Volumes/PYBFLASH', '/Volumes/NO NAME']:
|
||||
for path in glob.glob(pattern):
|
||||
return path
|
||||
print("Couldn't find badge storage - Please make it's plugged in and reset it if necessary")
|
||||
sys.exit(1)
|
||||
|
||||
def get_storage(args):
|
||||
if not args.storage:
|
||||
args.storage = find_storage()
|
||||
return args.storage
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
Loading…
Reference in New Issue