From 1510dbb4ac27b67ef487ab672001a5fb1b7fd8ca Mon Sep 17 00:00:00 2001 From: Marek Ventur Date: Wed, 1 Aug 2018 21:42:33 +0100 Subject: [PATCH] wip - badge store lib --- lib/badge_store.py | 69 +++++++++++++++ lib/test_badge_store.py | 36 ++++++++ lib/unittest_2.py | 192 ---------------------------------------- shared/test/file.txt | 1 + 4 files changed, 106 insertions(+), 192 deletions(-) create mode 100644 lib/badge_store.py create mode 100644 lib/test_badge_store.py delete mode 100644 lib/unittest_2.py create mode 100644 shared/test/file.txt diff --git a/lib/badge_store.py b/lib/badge_store.py new file mode 100644 index 0000000..8c04fbb --- /dev/null +++ b/lib/badge_store.py @@ -0,0 +1,69 @@ +"""Library to interact with the badge store""" + +___license___ = "MIT" +___dependencies___ = ["http", "ospath"] + +from ospath import * +from http import * +import hashlib, binascii + +class BadgeStore: + def __init__(self, url = "http://badge.marekventur.com", repo="emfcamp/Mk4-Apps", ref="master"): + self.url = url + self.repo = repo + self.ref = ref + self._apps = None + + def get_apps(self): + if not self._apps: + self._apps = self._call("apps") + return self._apps + + def get_categories(self): + return self.get_apps().keys() + + def get_app(self, app): + return self._call("app", {"app": app}) + + def install(self, apps): + files = self._call("install", {"apps": ",".join(apps)}) + installers = [] + url = "%s/download" % (self.url) + for path, hash in files.items(): + if self._is_file_up_to_date(path, hash): + continue + params = {"repo": self.repo, "ref": self.ref, "path": path} + installers.append(Installer(path, url, params)) + return installers + + def _call(self, command, params = {}): + params["repo"] = self.repo + params["ref"] = self.ref + with get("%s/%s" % (self.url, command), params=params).raise_for_status() as response: + return response.json() # todo: error handling + + def _is_file_up_to_date(self, path, hash): + if not isfile(path): + return False + + with open(path, "rb") as file: + sha256 = hashlib.sha256() + buf = file.read(128) + while len(buf) > 0: + sha256.update(buf) + buf = file.read(128) + current = str(binascii.hexlify(sha256.digest()), "utf8")[:10] + return current == hash + +class Installer: + def __init__(self, path, url, params): + self.path = path + self.url = url + self.params = params + + def download(self): + with get(self.url, params=self.params).raise_for_status() as response: + response.download(path) + + + diff --git a/lib/test_badge_store.py b/lib/test_badge_store.py new file mode 100644 index 0000000..9245f27 --- /dev/null +++ b/lib/test_badge_store.py @@ -0,0 +1,36 @@ +"""Tests for badge_store lib""" + +___license___ = "MIT" +___dependencies___ = ["upip:unittest", "badge_store", "shared/test/file.txt"] + +import unittest +from lib.badge_store import * + +class TestBadgeStore(unittest.TestCase): + + def setUpClass(self): + self.store = BadgeStore(url="http://badge.marekventur.com", repo="emfcamp/Mk4-Apps", ref="ee144e8") + + def test_apps(self): + response = self.store.get_apps() + self.assertEqual(response["System"], ['badge_store', 'launcher', 'settings']) + + def test_categories(self): + categories = self.store.get_categories() + self.assertEqual(set(categories), set(["System", "homescreen"])) + + def test_app(self): + response = self.store.get_app("launcher") + self.assertEqual(response["description"], "Launcher for apps currently installed") + + def test_is_file_up_to_date(self): + self.assertFalse(self.store._is_file_up_to_date("shared/test/file.txt", "1234567890")) + self.assertFalse(self.store._is_file_up_to_date("does/not/exist.txt", "1234567890")) + self.assertTrue(self.store._is_file_up_to_date("shared/test/file.txt", "182d04f8ee")) + + def test_install(self): + installers = self.store.install(["launcher", "badge_store"]) + self.assertTrue(len(installers) > 0) + +if __name__ == '__main__': + unittest.main() diff --git a/lib/unittest_2.py b/lib/unittest_2.py deleted file mode 100644 index 0d857bb..0000000 --- a/lib/unittest_2.py +++ /dev/null @@ -1,192 +0,0 @@ -"""Base libarary for test cases - -See https://github.com/python/cpython/blob/master/Lib/unittest/case.py for -some of the code copied here -""" - -___license___ = "MIT" - -import sys, ugfx - -class SkipTest(Exception): - """Indicates a test has been skipped""" - -class FailTest(Exception): - """Inidcates a failing test""" - def __init__(self, message=None): - self.message = message - -class TestCase(object): - def run(self): - test_class = type(self).__name__ - self.setUpClass() - self.count_pass = 0 - self.count_fail = 0 - self.count_skip = 0 - for method in dir(self): - if not method.startswith("test"): - continue - try: - self.setUp() - getattr(self, method)() - self.tearDown() - print("%s.%s [PASS]" % (test_class, method)) - self.count_pass += 1 - except SkipTest as e: - print("%s.%s [SKIP]" % (test_class, method)) - self.count_skip += 1 - except FailTest as e: - print("%s.%s [FAIL]" % (test_class, method)) - print(e.message + "\n") - self.count_fail += 1 - except Exception as e: - print("%s.%s [FATAL]" % (test_class, method)) - sys.print_exception(e) - print("") - self.count_fail += 1 - self.tearDownClass() - return self.count_fail == 0 - - def run_standalone(self): - ugfx.clear(0xFFFFFF) - self.run() - print_result(self.count_pass, self.count_fail, self.count_skip) - - def runSingle(self, function): - print(self) - - def setUp(self): - pass - - def tearDown(self): - pass - - def setUpClass(self): - pass - - def tearDownClass(self): - pass - - def assertEqual(self, actual, expected): - if not actual == expected: - raise FailTest("Expected %s to equal %s" % (actual, expected)) - - def assertTrue(self, actual): - self.assertEqual(actual, True) - - def assertFalse(self, actual): - self.assertEqual(actual, False) - - def assertRaises(self, expected_exception, *args, **kwargs): - context = _AssertRaisesContext(expected_exception, self) - return context.handle('assertRaises', args, kwargs) - - def assertIn(self, sub, actual): - if not sub in actual: - raise FailTest("Expected %s to be in %s" % (sub, actual)) - - def skip(self): - raise SkipTest() - -def print_result(count_pass, count_fail, count_skip): - print("PASS: %s FAIL: %s SKIP: %s" % (count_pass, count_fail, count_skip)) - -########################################### -#### Bits copied straight from cpython #### -########################################### - -class _BaseTestCaseContext: - - def __init__(self, test_case): - self.test_case = test_case - - def _raiseFailure(self, standardMsg): - msg = self.test_case._formatMessage(self.msg, standardMsg) - raise self.test_case.failureException(msg) - -class _AssertRaisesBaseContext(_BaseTestCaseContext): - - def __init__(self, expected, test_case, expected_regex=None): - _BaseTestCaseContext.__init__(self, test_case) - self.expected = expected - self.test_case = test_case - if expected_regex is not None: - expected_regex = re.compile(expected_regex) - self.expected_regex = expected_regex - self.obj_name = None - self.msg = None - - def handle(self, name, args, kwargs): - """ - If args is empty, assertRaises/Warns is being used as a - context manager, so check for a 'msg' kwarg and return self. - If args is not empty, call a callable passing positional and keyword - arguments. - """ - try: - if not _is_subtype(self.expected, self._base_type): - raise TypeError('%s() arg 1 must be %s' % - (name, self._base_type_str)) - if args and args[0] is None: - warnings.warn("callable is None", - DeprecationWarning, 3) - args = () - if not args: - self.msg = kwargs.pop('msg', None) - if kwargs: - warnings.warn('%r is an invalid keyword argument for ' - 'this function' % next(iter(kwargs)), - DeprecationWarning, 3) - return self - - callable_obj, *args = args - try: - self.obj_name = callable_obj.__name__ - except AttributeError: - self.obj_name = str(callable_obj) - with self: - callable_obj(*args, **kwargs) - finally: - # bpo-23890: manually break a reference cycle - self = None - - - -class _AssertRaisesContext(_AssertRaisesBaseContext): - """A context manager used to implement TestCase.assertRaises* methods.""" - - _base_type = BaseException - _base_type_str = 'an exception type or tuple of exception types' - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, tb): - if exc_type is None: - try: - exc_name = self.expected.__name__ - except AttributeError: - exc_name = str(self.expected) - if self.obj_name: - self._raiseFailure("{} not raised by {}".format(exc_name, - self.obj_name)) - else: - self._raiseFailure("{} not raised".format(exc_name)) - if not issubclass(exc_type, self.expected): - # let unexpected exceptions pass through - return False - # store exception - self.exception = exc_value - if self.expected_regex is None: - return True - - expected_regex = self.expected_regex - if not expected_regex.search(str(exc_value)): - self._raiseFailure('"{}" does not match "{}"'.format( - expected_regex.pattern, str(exc_value))) - return True - -def _is_subtype(expected, basetype): - if isinstance(expected, tuple): - return all(_is_subtype(e, basetype) for e in expected) - return isinstance(expected, type) and issubclass(expected, basetype) diff --git a/shared/test/file.txt b/shared/test/file.txt new file mode 100644 index 0000000..80a11b6 --- /dev/null +++ b/shared/test/file.txt @@ -0,0 +1 @@ +Random content for testing file hash