From 71e1ec63ffa0dee0dbfd89b90ebfad2996d7cab7 Mon Sep 17 00:00:00 2001 From: Marek Ventur Date: Sun, 29 Jul 2018 17:33:46 +0100 Subject: [PATCH] * app lib working now * refactored tests * added ospath lib * deleted test app * added metadata reader --- .development/pyboard_util.py | 37 ++-- .development/sync.py | 3 + .development/tilda_tools.py | 20 +- {app_library => badge_store}/main.py | 6 +- launcher/main.py | 2 +- lib/app.py | 99 ++++++++++ lib/apps.py | 179 ----------------- lib/metadata_reader.py | 138 +++++++++++++ lib/ospath.py | 63 ++++++ lib/test_app.py | 44 +++++ lib/test_database.py | 6 +- lib/test_http.py | 6 +- lib/test_metadata_reader.py | 23 +++ lib/test_ospath.py | 29 +++ lib/test_urlencode.py | 6 +- lib/{unittest.py => unittest_2.py} | 0 test/main.py | 46 ----- upip/os/__init__.py | 280 --------------------------- upip/os/path.py | 63 ------ upip/unittest.py | 59 +++--- 20 files changed, 484 insertions(+), 625 deletions(-) rename {app_library => badge_store}/main.py (90%) create mode 100644 lib/app.py delete mode 100644 lib/apps.py create mode 100644 lib/metadata_reader.py create mode 100644 lib/ospath.py create mode 100644 lib/test_app.py create mode 100644 lib/test_metadata_reader.py create mode 100644 lib/test_ospath.py rename lib/{unittest.py => unittest_2.py} (100%) delete mode 100644 test/main.py delete mode 100644 upip/os/__init__.py delete mode 100644 upip/os/path.py diff --git a/.development/pyboard_util.py b/.development/pyboard_util.py index 6c07e3b..783e062 100644 --- a/.development/pyboard_util.py +++ b/.development/pyboard_util.py @@ -44,17 +44,20 @@ def flush_input(pyb): pyb.serial.read(n) n = pyb.serial.inWaiting() -def soft_reset(args): +def soft_reset(args, verbose = True): pyb = get_pyb(args) - print("Soft reboot:", end="") + if verbose: + print("Soft reboot:", end="") write_command(pyb, b'\x04') # ctrl-D: soft reset #print("1") data = pyb.read_until(1, b'soft reboot\r\n') #print("2") if data.endswith(b'soft reboot\r\n'): - print(" DONE") + if verbose: + print(" DONE") else: - print(" FAIL") + if verbose: + print(" FAIL") raise PyboardError('could not soft reboot') def find_tty(): @@ -65,29 +68,31 @@ def find_tty(): print("Couldn't find badge tty - Please make it's plugged in and reset it if necessary") sys.exit(1) -def check_run(args): - if args.command is not None or len(args.paths): - for filename in args.paths: - with open(filename, 'r') as f: - pyfile = f.read() - compile(pyfile + '\n', filename, 'exec') +def check_run(paths): + for filename in paths: + with open(filename, 'r') as f: + pyfile = f.read() + compile(pyfile + '\n', filename, 'exec') -def run(args): +def run(args, paths, verbose=True): pyb = get_pyb(args) - print("Preparing execution:", end="") + if verbose: + print("Preparing execution:", end="") # run any command or file(s) - this is mostly a copy from pyboard.py - if args.command is not None or len(args.paths): + if len(paths): # we must enter raw-REPL mode to execute commands # this will do a soft-reset of the board try: pyb.enter_raw_repl() except PyboardError as er: - print(" FAIL") + if verbose: + print(" FAIL") print(er) pyb.close() sys.exit(1) - print(" DONE") + if verbose: + print(" DONE") def execbuffer(buf): try: @@ -105,7 +110,7 @@ def run(args): sys.exit(1) # run any files - for filename in args.paths: + for filename in paths: with open(filename, 'rb') as f: print("-------- %s --------" % filename) pyfile = f.read() diff --git a/.development/sync.py b/.development/sync.py index 9ea6e2f..9d9dfdf 100644 --- a/.development/sync.py +++ b/.development/sync.py @@ -17,11 +17,13 @@ def sync(storage, patterns, resources, verbose): if not patterns: patterns = ["*"] + synced_resources = [] for pattern in patterns: found = False for key, resource in resources.items(): if fnmatch.fnmatch(key, pattern): found = True + synced_resources.append(key) if verbose: print("Resource %s is going to be synced" % key) for path in resource['files'].keys(): @@ -55,6 +57,7 @@ def sync(storage, patterns, resources, verbose): print("Files copied successfully") else: print(" DONE") + return synced_resources def set_boot_app(storage, app_to_boot): path = os.path.join(storage, 'once.txt') diff --git a/.development/tilda_tools.py b/.development/tilda_tools.py index 693a213..b59aba4 100755 --- a/.development/tilda_tools.py +++ b/.development/tilda_tools.py @@ -67,6 +67,7 @@ def main(): args = cmd_parser.parse_args() command = args.command[0] path = sync.get_root() + run_tests = command == "test" if command == "firmware-update": import pydfu_util # to avoid having a "usb" dependency for other calls @@ -92,11 +93,9 @@ def main(): if command == "test": command = "sync" if len(args.paths) == 0: - args.run = "test/main.py" + args.paths = ["lib/test_*"] else: - if "." not in args.paths[0]: - args.paths[0] = "lib/%s.py" % args.paths[0] - args.run = args.paths[0] + args.paths = ["lib/test_%s.py" % p for p in args.paths] if command in ["reset", "sync"]: @@ -104,7 +103,7 @@ def main(): if command == "sync": paths = args.paths if len(args.paths) else None - sync.sync(get_storage(args), paths, resources, args.verbose) + synced_resources = sync.sync(get_storage(args), paths, resources, args.verbose) if command in ["reset", "sync"]: sync.set_boot_app(get_storage(args), args.boot or "") @@ -114,8 +113,15 @@ def main(): args.paths = [args.run] if command == "run": - pyboard_util.check_run(args) - pyboard_util.run(args) + pyboard_util.check_run(args.paths) + pyboard_util.run(args, args.paths) + + if run_tests: + for resource in synced_resources: + pyboard_util.check_run([resource]) + pyboard_util.run(args, [resource], False) + pyboard_util.soft_reset(args, False) + pyboard_util.close_pyb() diff --git a/app_library/main.py b/badge_store/main.py similarity index 90% rename from app_library/main.py rename to badge_store/main.py index 78c2f20..2a287dc 100644 --- a/app_library/main.py +++ b/badge_store/main.py @@ -1,9 +1,11 @@ -"""switches between app libraries, updates and installs apps. +"""Official TiLDA MK4 Badge Store App + +switches between app libraries, updates and installs apps. To publish apps use https://badge.emfcamp.org""" ___license___ = "MIT" -___name___ = "App Library" +___title___ = "Badge Store" ___dependencies___ = ["wifi", "dialogs"] ___categories___ = ["System"] ___bootstrapped___ = True diff --git a/launcher/main.py b/launcher/main.py index 81d737b..7c59e73 100644 --- a/launcher/main.py +++ b/launcher/main.py @@ -1,7 +1,7 @@ """Launcher for apps currently installed""" ___name___ = "Launcher" -___license___ = "GPL" +___license___ = "MIT" ___categories___ = ["System"] ___launchable___ = False ___bootstrapped___ = True diff --git a/lib/app.py b/lib/app.py new file mode 100644 index 0000000..c8758e5 --- /dev/null +++ b/lib/app.py @@ -0,0 +1,99 @@ +"""Model and Helpers for local apps + +This is useful for the launcher and other apps. +""" + +___license___ = "MIT" +___dependencies___ = ["metadata_reader", "ospath"] + +from ospath import * +from metadata_reader import read_metadata + +class App: + """Models an app and provides some helper functions""" + def __init__(self, name): + self.name = name + self._attributes = None # Load lazily + + @property + def folder_path(self): + return self.name + + @property + def main_path(self): + return self.folder_path + "/main.py" + + @property + def loadable(self): + return isfile(self.main_path) + + @property + def description(self): + return self.get_attribute("doc") + + @property + def title(self): + return self.get_attribute("title", self.name) + + @property + def categories(self): + return self.get_attribute("categories") + + def matches_category(self, target): + return target in self.categories + + @property + def attributes(self): + if self._attributes == None: + try: + with open(self.main_path) as file: + self._attributes = read_metadata(file) + except OSError: + raise Exception("File %s not found in on badge" % self.main_path) + return self._attributes + + def get_attribute(self, attribute, default=None): + if attribute in self.attributes: + return self.attributes[attribute] + return default + + def __str__(self): + return self.title + + def __repr__(self): + return "" % (self.name) + + def __eq__(self, other): + if isinstance(other, App): + return self.name == other.name + return False + + def __hash__(self): + return hash(self.name) + + +_apps = None +def get_apps(category=None): + global _apps + if _apps == None: + _apps = [] + for path in os.listdir(): + if path.startswith(".") or (not isdir(path)) or path in ["lib", "shared", "upip"]: + continue + app = App(path) + if app.loadable: + _apps.append(app) + + if category: + return [app for app in _apps if app.matches_category(category)] + return _apps + +_categories = None +def get_categories(): + global _categories + if _categories == None: + _categories = set() + for app in get_apps(): + _categories.update(app.categories) + return _categories + diff --git a/lib/apps.py b/lib/apps.py deleted file mode 100644 index c6d2b73..0000000 --- a/lib/apps.py +++ /dev/null @@ -1,179 +0,0 @@ -"""Model and Helpers for TiLDA apps and the App Library API""" - -___license___ = "MIT" -___dependencies___ = ["http"] - -import os -import ure -import http_client -import filesystem -import gc - -ATTRIBUTE_MATCHER = ure.compile("^\s*###\s*([^:]*?)\s*:\s*(.*)\s*$") # Yeah, regex! -CATEGORY_ALL = "all" -CATEGORY_NOT_SET = "uncategorised" - -class App: - """Models an app and provides some helper functions""" - def __init__(self, folder_name, api_information = None): - self.folder_name = self.name = folder_name.lower() - self.user = EMF_USER - if USER_NAME_SEPARATOR in folder_name: - [self.user, self.name] = folder_name.split(USER_NAME_SEPARATOR, 1) - self.user = self.user.lower() - self.name = self.name.lower() - - self._attributes = None # Load lazily - self.api_information = api_information - - @property - def folder_path(self): - return "apps/" + self.folder_name - - @property - def main_path(self): - return self.folder_path + "/main.py" - - @property - def loadable(self): - return filesystem.is_file(self.main_path) and os.stat(self.main_path)[6] > 0 - - @property - def description(self): - """either returns a local attribute or uses api_information""" - if self.api_information and "description" in self.api_information: - return self.api_information["description"] - return self.get_attribute("description") or "" - - @property - def files(self): - """returns a list of file dicts or returns False if the information is not available""" - if self.api_information and "files" in self.api_information: - return self.api_information["files"] - return False - - @property - def category(self): - return self.get_attribute("Category", CATEGORY_NOT_SET).lower() - - @property - def title(self): - return self.get_attribute("appname") or self.name - - @property - def user_and_title(self): - if self.user == EMF_USER: - return self.name - else: - return "%s by %s" % (self.title, self.user) - - def matches_category(self, category): - """returns True if provided category matches the category of this app""" - category = category.lower() - return category == CATEGORY_ALL or category == self.category - - @property - def attributes(self): - """Returns all attribues of this app - - The result is cached for the lifetime of this object - """ - if self._attributes == None: - self._attributes = {} - if self.loadable: - with open(self.main_path) as file: - for line in file: - match = ATTRIBUTE_MATCHER.match(line) - if match: - self._attributes[match.group(1).strip().lower()] = match.group(2).strip() - else: - break - return self._attributes - - def get_attribute(self, attribute, default=None): - """Returns the value of an attribute, or a specific default value if attribute is not found""" - attribute = attribute.lower() # attributes are case insensitive - if attribute in self.attributes: - return self.attributes[attribute] - else: - return default - - def fetch_api_information(self): - """Queries the API for information about this app, returns False if app is not publicly listed""" - with http_client.get("http://api.badge.emfcamp.org/api/app/%s/%s" % (self.user, self.name)) as response: - if response.status == 404: - return False - self.api_information = response.raise_for_status().json() - return self.api_information - - def __str__(self): - return self.user_and_title - - def __repr__(self): - return "" % (self.folder_name) - - -def app_by_name_and_user(name, user): - """Returns an user object""" - if user.lower() == EMF_USER: - return App(name) - else: - return App(user + USER_NAME_SEPARATOR + name) - -def app_by_api_response(response): - if response["user"].lower() == EMF_USER: - return App(response["name"], response) - else: - return App(response["user"] + USER_NAME_SEPARATOR + response["name"], response) - -def get_local_apps(category=CATEGORY_ALL): - """Returns a list of apps that can be found in the apps folder""" - apps = [App(folder_name) for folder_name in os.listdir("apps") if filesystem.is_dir("apps/" + folder_name)] - return [app for app in apps if app.matches_category(category)] - -_public_apps_cache = None -def fetch_public_app_api_information(uncached=False): - """Returns a dict category => list of apps - - Uses cached version unless the uncached parameter is set - """ - global _public_apps_cache - if not _public_apps_cache or uncached: - response = {} - for category, apps in http_client.get("http://api.badge.emfcamp.org/api/apps").raise_for_status().json().items(): - response[category] = [app_by_api_response(app) for app in apps] - - _public_apps_cache = response - return _public_apps_cache - -def get_public_app_categories(uncached=False): - """Returns a list of all categories used on the app library""" - return list(fetch_public_app_api_information(uncached).keys()) - -def get_public_apps(category=CATEGORY_ALL, uncached=False): - """Returns a list of all public apps in one category""" - category = category.lower() - api_information = fetch_public_app_api_information(uncached) - return api_information[category] if category in api_information else [] - -_category_cache = None -def get_local_app_categories(uncached=False): - """Returns a list of all app categories the user's apps are currently using - - Uses cached version unless the uncached parameter is set - """ - global _category_cache - if not _category_cache or uncached: - _category_cache = ["all"] - for app in get_local_apps(): - if app.category not in _category_cache: - _category_cache.append(app.category) - - return _category_cache - -def empty_local_app_cache(): - """If you're tight on memory you can clean up the local cache""" - global _public_apps_cache, _category_cache - _public_apps_cache = None - _category_cache = None - gc.collect() diff --git a/lib/metadata_reader.py b/lib/metadata_reader.py new file mode 100644 index 0000000..9b18ba6 --- /dev/null +++ b/lib/metadata_reader.py @@ -0,0 +1,138 @@ +""" Consumes a stream and returns a dict + +However, the dict won't contain "__doc__", "___version___" etc, but +the shortened versions without underscores: "doc", "version". + +Currently not supported: +* Dicts +* Floating points +* ints in list +* Strings in any other format then "x" or 'y' +* Docstrings with any other delimiter other than triple-" or triple=' +* Comments + +Feel free to expand if necessary +""" + +class ParseException(Exception): + """Indicates a parsing exception""" + def __init__(self, message = ""): + super().__init__(message) + +def read_metadata(s): + result = {} + + result["doc"] = _read_docstring(s) + + while True: + key = _read_key(s) + if key: + result[key] = _read_value(s) + else: + break + + return result + +def _read_docstring(s): + delimiter = _read_non_whitespace(s, 3) + if delimiter not in ["'''", '"""']: + raise ParseException("Docstring delimiter expected") + result = _read(s, 3); + while result[-3:] != delimiter: + result += _read(s) + return result[:-3] + +def _read_value(s): + char = _read_non_whitespace(s) + return _read_value_given_first_char(s, char) + + +def _read_value_given_first_char(s, first_char): + if first_char in ["'", '"']: + return _read_string(s, first_char) + if first_char in "0123456789": + return _read_int(s, first_char) + if first_char in "TF": + return _read_bool(s, first_char) + if first_char == "[": + return _read_list(s) + raise ParseException("Invalid character %s found" % first_char) + +def _read_string(s, delimiter): + result = _read(s) + try: + while result[-1:] != delimiter: + result += _read(s) + except ParseException: + raise ParseException("Invalid string or not terminated: %s" % result) + return result[:-1] + +def _read_int(s, char): + result = char + while not char.isspace(): + char = s.read(1) + if not char: + break + result += char + if not char in "0123456789": + raise ParseException("Invalid int: %s" % result) + return int(result) + +def _read_bool(s, char): + if char == "T": + _assert(char + _read(s, 3), "True", "Invalid boolean") + return True + else: + _assert(char + _read(s, 4), "False", "Invalid boolean") + return False + +def _read_list(s): + result = [] + while True: + char = _read_non_whitespace(s) + if char == "]": + break + if result: + if char != ",": + raise ParseException("Expected comma, got '%s'" % char) + result.append(_read_value(s)) + else: + result.append(_read_value_given_first_char(s, char)) + + return result + +def _read_key(s): + delimiter = _read_non_whitespace(s, 3) + if delimiter != "___": + return None + try: + result = _read(s, 3); + while result[-3:] != delimiter: + char = _read(s) + if char in [" ", "="]: + raise ParseException() + result += char + except ParseException: + raise ParseException("Invalid key: ___%s" % result) + _assert(_read_non_whitespace(s), "=", "Expected equals") + return result[:-3] + +def _read(s, l=1): + result = s.read(l) + if len(result) 0)