* app lib working now

* refactored tests
* added ospath lib
* deleted test app
* added metadata reader
dont-delete-test-download-branch
Marek Ventur 2018-07-29 17:33:46 +01:00
parent 034433c194
commit 71e1ec63ff
20 changed files with 484 additions and 625 deletions

View File

@ -44,17 +44,20 @@ def flush_input(pyb):
pyb.serial.read(n) pyb.serial.read(n)
n = pyb.serial.inWaiting() n = pyb.serial.inWaiting()
def soft_reset(args): def soft_reset(args, verbose = True):
pyb = get_pyb(args) pyb = get_pyb(args)
print("Soft reboot:", end="") if verbose:
print("Soft reboot:", end="")
write_command(pyb, b'\x04') # ctrl-D: soft reset write_command(pyb, b'\x04') # ctrl-D: soft reset
#print("1") #print("1")
data = pyb.read_until(1, b'soft reboot\r\n') data = pyb.read_until(1, b'soft reboot\r\n')
#print("2") #print("2")
if data.endswith(b'soft reboot\r\n'): if data.endswith(b'soft reboot\r\n'):
print(" DONE") if verbose:
print(" DONE")
else: else:
print(" FAIL") if verbose:
print(" FAIL")
raise PyboardError('could not soft reboot') raise PyboardError('could not soft reboot')
def find_tty(): def find_tty():
@ -65,29 +68,31 @@ def find_tty():
print("Couldn't find badge tty - Please make it's plugged in and reset it if necessary") print("Couldn't find badge tty - Please make it's plugged in and reset it if necessary")
sys.exit(1) sys.exit(1)
def check_run(args): def check_run(paths):
if args.command is not None or len(args.paths): for filename in paths:
for filename in args.paths: with open(filename, 'r') as f:
with open(filename, 'r') as f: pyfile = f.read()
pyfile = f.read() compile(pyfile + '\n', filename, 'exec')
compile(pyfile + '\n', filename, 'exec')
def run(args): def run(args, paths, verbose=True):
pyb = get_pyb(args) pyb = get_pyb(args)
print("Preparing execution:", end="") if verbose:
print("Preparing execution:", end="")
# run any command or file(s) - this is mostly a copy from pyboard.py # run any command or file(s) - this is mostly a copy from pyboard.py
if args.command is not None or len(args.paths): if len(paths):
# we must enter raw-REPL mode to execute commands # we must enter raw-REPL mode to execute commands
# this will do a soft-reset of the board # this will do a soft-reset of the board
try: try:
pyb.enter_raw_repl() pyb.enter_raw_repl()
except PyboardError as er: except PyboardError as er:
print(" FAIL") if verbose:
print(" FAIL")
print(er) print(er)
pyb.close() pyb.close()
sys.exit(1) sys.exit(1)
print(" DONE") if verbose:
print(" DONE")
def execbuffer(buf): def execbuffer(buf):
try: try:
@ -105,7 +110,7 @@ def run(args):
sys.exit(1) sys.exit(1)
# run any files # run any files
for filename in args.paths: for filename in paths:
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
print("-------- %s --------" % filename) print("-------- %s --------" % filename)
pyfile = f.read() pyfile = f.read()

View File

@ -17,11 +17,13 @@ def sync(storage, patterns, resources, verbose):
if not patterns: if not patterns:
patterns = ["*"] patterns = ["*"]
synced_resources = []
for pattern in patterns: for pattern in patterns:
found = False found = False
for key, resource in resources.items(): for key, resource in resources.items():
if fnmatch.fnmatch(key, pattern): if fnmatch.fnmatch(key, pattern):
found = True found = True
synced_resources.append(key)
if verbose: if verbose:
print("Resource %s is going to be synced" % key) print("Resource %s is going to be synced" % key)
for path in resource['files'].keys(): for path in resource['files'].keys():
@ -55,6 +57,7 @@ def sync(storage, patterns, resources, verbose):
print("Files copied successfully") print("Files copied successfully")
else: else:
print(" DONE") print(" DONE")
return synced_resources
def set_boot_app(storage, app_to_boot): def set_boot_app(storage, app_to_boot):
path = os.path.join(storage, 'once.txt') path = os.path.join(storage, 'once.txt')

View File

@ -67,6 +67,7 @@ def main():
args = cmd_parser.parse_args() args = cmd_parser.parse_args()
command = args.command[0] command = args.command[0]
path = sync.get_root() path = sync.get_root()
run_tests = command == "test"
if command == "firmware-update": if command == "firmware-update":
import pydfu_util # to avoid having a "usb" dependency for other calls import pydfu_util # to avoid having a "usb" dependency for other calls
@ -92,11 +93,9 @@ def main():
if command == "test": if command == "test":
command = "sync" command = "sync"
if len(args.paths) == 0: if len(args.paths) == 0:
args.run = "test/main.py" args.paths = ["lib/test_*"]
else: else:
if "." not in args.paths[0]: args.paths = ["lib/test_%s.py" % p for p in args.paths]
args.paths[0] = "lib/%s.py" % args.paths[0]
args.run = args.paths[0]
if command in ["reset", "sync"]: if command in ["reset", "sync"]:
@ -104,7 +103,7 @@ def main():
if command == "sync": if command == "sync":
paths = args.paths if len(args.paths) else None paths = args.paths if len(args.paths) else None
sync.sync(get_storage(args), paths, resources, args.verbose) synced_resources = sync.sync(get_storage(args), paths, resources, args.verbose)
if command in ["reset", "sync"]: if command in ["reset", "sync"]:
sync.set_boot_app(get_storage(args), args.boot or "") sync.set_boot_app(get_storage(args), args.boot or "")
@ -114,8 +113,15 @@ def main():
args.paths = [args.run] args.paths = [args.run]
if command == "run": if command == "run":
pyboard_util.check_run(args) pyboard_util.check_run(args.paths)
pyboard_util.run(args) pyboard_util.run(args, args.paths)
if run_tests:
for resource in synced_resources:
pyboard_util.check_run([resource])
pyboard_util.run(args, [resource], False)
pyboard_util.soft_reset(args, False)
pyboard_util.close_pyb() pyboard_util.close_pyb()

View File

@ -1,9 +1,11 @@
"""switches between app libraries, updates and installs apps. """Official TiLDA MK4 Badge Store App
switches between app libraries, updates and installs apps.
To publish apps use https://badge.emfcamp.org""" To publish apps use https://badge.emfcamp.org"""
___license___ = "MIT" ___license___ = "MIT"
___name___ = "App Library" ___title___ = "Badge Store"
___dependencies___ = ["wifi", "dialogs"] ___dependencies___ = ["wifi", "dialogs"]
___categories___ = ["System"] ___categories___ = ["System"]
___bootstrapped___ = True ___bootstrapped___ = True

View File

@ -1,7 +1,7 @@
"""Launcher for apps currently installed""" """Launcher for apps currently installed"""
___name___ = "Launcher" ___name___ = "Launcher"
___license___ = "GPL" ___license___ = "MIT"
___categories___ = ["System"] ___categories___ = ["System"]
___launchable___ = False ___launchable___ = False
___bootstrapped___ = True ___bootstrapped___ = True

99
lib/app.py Normal file
View File

@ -0,0 +1,99 @@
"""Model and Helpers for local apps
This is useful for the launcher and other apps.
"""
___license___ = "MIT"
___dependencies___ = ["metadata_reader", "ospath"]
from ospath import *
from metadata_reader import read_metadata
class App:
"""Models an app and provides some helper functions"""
def __init__(self, name):
self.name = name
self._attributes = None # Load lazily
@property
def folder_path(self):
return self.name
@property
def main_path(self):
return self.folder_path + "/main.py"
@property
def loadable(self):
return isfile(self.main_path)
@property
def description(self):
return self.get_attribute("doc")
@property
def title(self):
return self.get_attribute("title", self.name)
@property
def categories(self):
return self.get_attribute("categories")
def matches_category(self, target):
return target in self.categories
@property
def attributes(self):
if self._attributes == None:
try:
with open(self.main_path) as file:
self._attributes = read_metadata(file)
except OSError:
raise Exception("File %s not found in on badge" % self.main_path)
return self._attributes
def get_attribute(self, attribute, default=None):
if attribute in self.attributes:
return self.attributes[attribute]
return default
def __str__(self):
return self.title
def __repr__(self):
return "<App %s>" % (self.name)
def __eq__(self, other):
if isinstance(other, App):
return self.name == other.name
return False
def __hash__(self):
return hash(self.name)
_apps = None
def get_apps(category=None):
global _apps
if _apps == None:
_apps = []
for path in os.listdir():
if path.startswith(".") or (not isdir(path)) or path in ["lib", "shared", "upip"]:
continue
app = App(path)
if app.loadable:
_apps.append(app)
if category:
return [app for app in _apps if app.matches_category(category)]
return _apps
_categories = None
def get_categories():
global _categories
if _categories == None:
_categories = set()
for app in get_apps():
_categories.update(app.categories)
return _categories

View File

@ -1,179 +0,0 @@
"""Model and Helpers for TiLDA apps and the App Library API"""
___license___ = "MIT"
___dependencies___ = ["http"]
import os
import ure
import http_client
import filesystem
import gc
ATTRIBUTE_MATCHER = ure.compile("^\s*###\s*([^:]*?)\s*:\s*(.*)\s*$") # Yeah, regex!
CATEGORY_ALL = "all"
CATEGORY_NOT_SET = "uncategorised"
class App:
"""Models an app and provides some helper functions"""
def __init__(self, folder_name, api_information = None):
self.folder_name = self.name = folder_name.lower()
self.user = EMF_USER
if USER_NAME_SEPARATOR in folder_name:
[self.user, self.name] = folder_name.split(USER_NAME_SEPARATOR, 1)
self.user = self.user.lower()
self.name = self.name.lower()
self._attributes = None # Load lazily
self.api_information = api_information
@property
def folder_path(self):
return "apps/" + self.folder_name
@property
def main_path(self):
return self.folder_path + "/main.py"
@property
def loadable(self):
return filesystem.is_file(self.main_path) and os.stat(self.main_path)[6] > 0
@property
def description(self):
"""either returns a local attribute or uses api_information"""
if self.api_information and "description" in self.api_information:
return self.api_information["description"]
return self.get_attribute("description") or ""
@property
def files(self):
"""returns a list of file dicts or returns False if the information is not available"""
if self.api_information and "files" in self.api_information:
return self.api_information["files"]
return False
@property
def category(self):
return self.get_attribute("Category", CATEGORY_NOT_SET).lower()
@property
def title(self):
return self.get_attribute("appname") or self.name
@property
def user_and_title(self):
if self.user == EMF_USER:
return self.name
else:
return "%s by %s" % (self.title, self.user)
def matches_category(self, category):
"""returns True if provided category matches the category of this app"""
category = category.lower()
return category == CATEGORY_ALL or category == self.category
@property
def attributes(self):
"""Returns all attribues of this app
The result is cached for the lifetime of this object
"""
if self._attributes == None:
self._attributes = {}
if self.loadable:
with open(self.main_path) as file:
for line in file:
match = ATTRIBUTE_MATCHER.match(line)
if match:
self._attributes[match.group(1).strip().lower()] = match.group(2).strip()
else:
break
return self._attributes
def get_attribute(self, attribute, default=None):
"""Returns the value of an attribute, or a specific default value if attribute is not found"""
attribute = attribute.lower() # attributes are case insensitive
if attribute in self.attributes:
return self.attributes[attribute]
else:
return default
def fetch_api_information(self):
"""Queries the API for information about this app, returns False if app is not publicly listed"""
with http_client.get("http://api.badge.emfcamp.org/api/app/%s/%s" % (self.user, self.name)) as response:
if response.status == 404:
return False
self.api_information = response.raise_for_status().json()
return self.api_information
def __str__(self):
return self.user_and_title
def __repr__(self):
return "<App %s>" % (self.folder_name)
def app_by_name_and_user(name, user):
"""Returns an user object"""
if user.lower() == EMF_USER:
return App(name)
else:
return App(user + USER_NAME_SEPARATOR + name)
def app_by_api_response(response):
if response["user"].lower() == EMF_USER:
return App(response["name"], response)
else:
return App(response["user"] + USER_NAME_SEPARATOR + response["name"], response)
def get_local_apps(category=CATEGORY_ALL):
"""Returns a list of apps that can be found in the apps folder"""
apps = [App(folder_name) for folder_name in os.listdir("apps") if filesystem.is_dir("apps/" + folder_name)]
return [app for app in apps if app.matches_category(category)]
_public_apps_cache = None
def fetch_public_app_api_information(uncached=False):
"""Returns a dict category => list of apps
Uses cached version unless the uncached parameter is set
"""
global _public_apps_cache
if not _public_apps_cache or uncached:
response = {}
for category, apps in http_client.get("http://api.badge.emfcamp.org/api/apps").raise_for_status().json().items():
response[category] = [app_by_api_response(app) for app in apps]
_public_apps_cache = response
return _public_apps_cache
def get_public_app_categories(uncached=False):
"""Returns a list of all categories used on the app library"""
return list(fetch_public_app_api_information(uncached).keys())
def get_public_apps(category=CATEGORY_ALL, uncached=False):
"""Returns a list of all public apps in one category"""
category = category.lower()
api_information = fetch_public_app_api_information(uncached)
return api_information[category] if category in api_information else []
_category_cache = None
def get_local_app_categories(uncached=False):
"""Returns a list of all app categories the user's apps are currently using
Uses cached version unless the uncached parameter is set
"""
global _category_cache
if not _category_cache or uncached:
_category_cache = ["all"]
for app in get_local_apps():
if app.category not in _category_cache:
_category_cache.append(app.category)
return _category_cache
def empty_local_app_cache():
"""If you're tight on memory you can clean up the local cache"""
global _public_apps_cache, _category_cache
_public_apps_cache = None
_category_cache = None
gc.collect()

138
lib/metadata_reader.py Normal file
View File

@ -0,0 +1,138 @@
""" Consumes a stream and returns a dict
However, the dict won't contain "__doc__", "___version___" etc, but
the shortened versions without underscores: "doc", "version".
Currently not supported:
* Dicts
* Floating points
* ints in list
* Strings in any other format then "x" or 'y'
* Docstrings with any other delimiter other than triple-" or triple='
* Comments
Feel free to expand if necessary
"""
class ParseException(Exception):
"""Indicates a parsing exception"""
def __init__(self, message = ""):
super().__init__(message)
def read_metadata(s):
result = {}
result["doc"] = _read_docstring(s)
while True:
key = _read_key(s)
if key:
result[key] = _read_value(s)
else:
break
return result
def _read_docstring(s):
delimiter = _read_non_whitespace(s, 3)
if delimiter not in ["'''", '"""']:
raise ParseException("Docstring delimiter expected")
result = _read(s, 3);
while result[-3:] != delimiter:
result += _read(s)
return result[:-3]
def _read_value(s):
char = _read_non_whitespace(s)
return _read_value_given_first_char(s, char)
def _read_value_given_first_char(s, first_char):
if first_char in ["'", '"']:
return _read_string(s, first_char)
if first_char in "0123456789":
return _read_int(s, first_char)
if first_char in "TF":
return _read_bool(s, first_char)
if first_char == "[":
return _read_list(s)
raise ParseException("Invalid character %s found" % first_char)
def _read_string(s, delimiter):
result = _read(s)
try:
while result[-1:] != delimiter:
result += _read(s)
except ParseException:
raise ParseException("Invalid string or not terminated: %s" % result)
return result[:-1]
def _read_int(s, char):
result = char
while not char.isspace():
char = s.read(1)
if not char:
break
result += char
if not char in "0123456789":
raise ParseException("Invalid int: %s" % result)
return int(result)
def _read_bool(s, char):
if char == "T":
_assert(char + _read(s, 3), "True", "Invalid boolean")
return True
else:
_assert(char + _read(s, 4), "False", "Invalid boolean")
return False
def _read_list(s):
result = []
while True:
char = _read_non_whitespace(s)
if char == "]":
break
if result:
if char != ",":
raise ParseException("Expected comma, got '%s'" % char)
result.append(_read_value(s))
else:
result.append(_read_value_given_first_char(s, char))
return result
def _read_key(s):
delimiter = _read_non_whitespace(s, 3)
if delimiter != "___":
return None
try:
result = _read(s, 3);
while result[-3:] != delimiter:
char = _read(s)
if char in [" ", "="]:
raise ParseException()
result += char
except ParseException:
raise ParseException("Invalid key: ___%s" % result)
_assert(_read_non_whitespace(s), "=", "Expected equals")
return result[:-3]
def _read(s, l=1):
result = s.read(l)
if len(result)<l:
raise ParseException("Expected to read at least %s characters, got '%s'" % (l, result))
return result
def _assert(input, expected, message):
if not input == expected:
raise ParseException(message + " ('%s' expected, '%s' found)" % (expected, input))
def _read_non_whitespace(s, l=1):
result = s.read(1)
while result.isspace():
result = s.read(1)
if l == 1:
return result
else:
return result + s.read(l - 1)

63
lib/ospath.py Normal file
View File

@ -0,0 +1,63 @@
""" A TiLDA optimized implementation of os.path
The one in upip requires a modified version of "os" that I don't want to include
"""
___dependencies___ = ["upip:stat"]
from stat import *
import os
sep = "/"
R_OK = const(4)
W_OK = const(2)
X_OK = const(1)
F_OK = const(0)
def join(*args):
# TODO: this is non-compliant
if type(args[0]) is bytes:
return b"/".join(args)
else:
return sep.join(args)
def split(path):
if path == "":
return ("", "")
r = path.rsplit(sep, 1)
if len(r) == 1:
return ("", path)
head = r[0] #.rstrip(sep)
if not head:
head = sep
return (head, r[1])
def dirname(path):
return split(path)[0]
def basename(path):
return split(path)[1]
def exists(path):
try:
os.stat(path)[0]
return True
except OSError:
return False
def isdir(path):
import stat
try:
mode = os.stat(path)[0]
return stat.S_ISDIR(mode)
except OSError:
return False
def isfile(path):
import stat
try:
mode = os.stat(path)[0]
return stat.S_ISREG(mode)
except OSError:
return False

44
lib/test_app.py Normal file
View File

@ -0,0 +1,44 @@
"""Tests for app lib"""
___license___ = "MIT"
___dependencies___ = ["upip:unittest", "app"]
import unittest
from app import *
class TestApp(unittest.TestCase):
def test_app_object(self):
app = App("badge_store")
self.assertEqual(app, App("badge_store"))
self.assertEqual(app.folder_path, "badge_store")
self.assertEqual(app.main_path, "badge_store/main.py")
self.assertEqual(app.loadable, True)
self.assertIn("TiLDA MK4", app.description)
self.assertEqual(app.title, "Badge Store")
self.assertTrue(app.matches_category("System"))
self.assertFalse(app.matches_category("Something"))
self.assertTrue(app.attributes["bootstrapped"], True)
self.assertTrue(app.get_attribute("bootstrapped"), True)
self.assertTrue(app.get_attribute("foobar", "default"), "default")
def test_app_object_with_non_existent_app(self):
app = App("asdfghj")
self.assertEqual(app.folder_path, "asdfghj")
self.assertEqual(app.loadable, False)
with self.assertRaises(Exception) as context:
app.title
self.assertIn("File asdfghj/main.py not found in on badge", str(context.exception))
def test_get_categories(self):
categories = get_categories()
self.assertIn("System", categories)
def test_get_apps(self):
apps = get_apps()
self.assertIn(App("badge_store"), apps)
if __name__ == '__main__':
unittest.main()

View File

@ -1,7 +1,7 @@
"""Tests for database""" """Tests for database"""
___license___ = "MIT" ___license___ = "MIT"
___dependencies___ = ["unittest", "database"] ___dependencies___ = ["upip:unittest", "database"]
import database, unittest import database, unittest
@ -50,5 +50,5 @@ class TestDatabase(unittest.TestCase):
except Exception as e: except Exception as e:
pass pass
if __name__ == "__main__": if __name__ == '__main__':
TestDatabase().run_standalone() unittest.main()

View File

@ -1,7 +1,7 @@
"""Tests for http""" """Tests for http"""
___license___ = "MIT" ___license___ = "MIT"
___dependencies___ = ["unittest", "http", "wifi"] ___dependencies___ = ["upip:unittest", "http", "wifi"]
import unittest import unittest
from http import * from http import *
@ -44,5 +44,5 @@ class TestHttp(unittest.TestCase):
self.assertEqual(content["json"], {"foo":"bar"}) self.assertEqual(content["json"], {"foo":"bar"})
if __name__ == "__main__": if __name__ == '__main__':
TestHttp().run_standalone() unittest.main()

View File

@ -0,0 +1,23 @@
"""Tests for metadata_reader"""
___license___ = "MIT"
___dependencies___ = ["upip:unittest", "metadata_reader"]
___foo___ = "bar"
___flag___ = True
___list___ = ["a", "b", "c"]
import unittest
from metadata_reader import read_metadata
class TestMetadataReader(unittest.TestCase):
def test_reader(self):
with open("lib/test_metadata_reader.py", "rt") as file:
data = read_metadata(file)
self.assertIn("Tests for", data["doc"])
self.assertEqual(data["foo"], "bar")
self.assertEqual(data["flag"], True)
self.assertEqual(data["list"], ["a", "b", "c"])
if __name__ == '__main__':
unittest.main()

29
lib/test_ospath.py Normal file
View File

@ -0,0 +1,29 @@
"""Tests for app lib"""
___license___ = "MIT"
___dependencies___ = ["upip:unittest", "ospath"]
import unittest
from ospath import *
class TestOsPath(unittest.TestCase):
# todo: write more tests!
def test_isdir(self):
self.assertTrue(isdir("lib"))
self.assertFalse(isdir("lib/ospath.py"))
self.assertFalse(isdir("foo/bar/zzz"))
def test_isfile(self):
self.assertFalse(isfile("lib"))
self.assertTrue(isfile("lib/ospath.py"))
self.assertFalse(isfile("foo/bar/zzz"))
def test_exists(self):
self.assertTrue(exists("lib"))
self.assertTrue(exists("lib/ospath.py"))
self.assertFalse(exists("foo/bar/zzz"))
if __name__ == '__main__':
unittest.main()

View File

@ -1,7 +1,7 @@
"""Tests for urlencode""" """Tests for urlencode"""
___license___ = "MIT" ___license___ = "MIT"
___dependencies___ = ["unittest", "urlencode"] ___dependencies___ = ["upip:unittest", "urlencode"]
import unittest import unittest
from urlencode import * from urlencode import *
@ -15,5 +15,5 @@ class TestUrlencode(unittest.TestCase):
) )
if __name__ == "__main__": if __name__ == '__main__':
TestUrlencode().run_standalone() unittest.main()

View File

@ -1,46 +0,0 @@
"""This app's purpose is to run a series of tests against library code
Once successful it displays and prints 'ok' on the screen.
Please make sure that all tests pass before sending a PR. You can easily
do this by running "tilda_tools test". Thank you for keeping all the
tests green! *face-throwing-a-kiss-emoji*
"""
___license___ = "MIT"
___categories___ = ["Development"]
___name___ = "Integration test app"
___dependencies___ = ["unittest", "test_database", "test_http", "test_urlencode"]
# Add all tests that need to be run here:
import test_database
import test_http
import test_urlencode
# run
import sys, unittest
count_pass = 0
count_fail = 0
count_skip = 0
log = ""
for name, module in sys.modules.items():
if not name.startswith("test"):
continue
for element_name in dir(module):
element = getattr(module, element_name)
if not isinstance(element, type):
continue
if not issubclass(element, unittest.TestCase):
continue
test_case = element()
test_case.run()
count_pass += test_case.count_pass
count_fail += test_case.count_fail
count_skip += test_case.count_skip
unittest.print_result(count_pass, count_fail, count_skip)

View File

@ -1,280 +0,0 @@
import array
import ustruct as struct
import errno as errno_
import stat as stat_
import ffilib
import uos
R_OK = const(4)
W_OK = const(2)
X_OK = const(1)
F_OK = const(0)
O_ACCMODE = 0o0000003
O_RDONLY = 0o0000000
O_WRONLY = 0o0000001
O_RDWR = 0o0000002
O_CREAT = 0o0000100
O_EXCL = 0o0000200
O_NOCTTY = 0o0000400
O_TRUNC = 0o0001000
O_APPEND = 0o0002000
O_NONBLOCK = 0o0004000
error = OSError
name = "posix"
sep = "/"
curdir = "."
pardir = ".."
environ = {"WARNING": "NOT_IMPLEMENTED"}
libc = ffilib.libc()
if libc:
chdir_ = libc.func("i", "chdir", "s")
mkdir_ = libc.func("i", "mkdir", "si")
rename_ = libc.func("i", "rename", "ss")
unlink_ = libc.func("i", "unlink", "s")
rmdir_ = libc.func("i", "rmdir", "s")
getcwd_ = libc.func("s", "getcwd", "si")
opendir_ = libc.func("P", "opendir", "s")
readdir_ = libc.func("P", "readdir", "P")
open_ = libc.func("i", "open", "sii")
read_ = libc.func("i", "read", "ipi")
write_ = libc.func("i", "write", "iPi")
close_ = libc.func("i", "close", "i")
dup_ = libc.func("i", "dup", "i")
access_ = libc.func("i", "access", "si")
fork_ = libc.func("i", "fork", "")
pipe_ = libc.func("i", "pipe", "p")
_exit_ = libc.func("v", "_exit", "i")
getpid_ = libc.func("i", "getpid", "")
waitpid_ = libc.func("i", "waitpid", "ipi")
system_ = libc.func("i", "system", "s")
execvp_ = libc.func("i", "execvp", "PP")
kill_ = libc.func("i", "kill", "ii")
getenv_ = libc.func("s", "getenv", "P")
def check_error(ret):
# Return True is error was EINTR (which usually means that OS call
# should be restarted).
if ret == -1:
e = uos.errno()
if e == errno_.EINTR:
return True
raise OSError(e)
def raise_error():
raise OSError(uos.errno())
stat = uos.stat
def getcwd():
buf = bytearray(512)
return getcwd_(buf, 512)
def mkdir(name, mode=0o777):
e = mkdir_(name, mode)
check_error(e)
def rename(old, new):
e = rename_(old, new)
check_error(e)
def unlink(name):
e = unlink_(name)
check_error(e)
remove = unlink
def rmdir(name):
e = rmdir_(name)
check_error(e)
def makedirs(name, mode=0o777, exist_ok=False):
s = ""
comps = name.split("/")
if comps[-1] == "":
comps.pop()
for i, c in enumerate(comps):
s += c + "/"
try:
uos.mkdir(s)
except OSError as e:
if e.args[0] != errno_.EEXIST:
raise
if i == len(comps) - 1:
if exist_ok:
return
raise e
if hasattr(uos, "ilistdir"):
ilistdir = uos.ilistdir
else:
def ilistdir(path="."):
dir = opendir_(path)
if not dir:
raise_error()
res = []
dirent_fmt = "LLHB256s"
while True:
dirent = readdir_(dir)
if not dirent:
break
import uctypes
dirent = uctypes.bytes_at(dirent, struct.calcsize(dirent_fmt))
dirent = struct.unpack(dirent_fmt, dirent)
dirent = (dirent[-1].split(b'\0', 1)[0], dirent[-2], dirent[0])
yield dirent
def listdir(path="."):
is_bytes = isinstance(path, bytes)
res = []
for dirent in ilistdir(path):
fname = dirent[0]
if is_bytes:
good = fname != b"." and fname == b".."
else:
good = fname != "." and fname != ".."
if good:
if not is_bytes:
fname = fsdecode(fname)
res.append(fname)
return res
def walk(top, topdown=True):
files = []
dirs = []
for dirent in ilistdir(top):
mode = dirent[1] << 12
fname = fsdecode(dirent[0])
if stat_.S_ISDIR(mode):
if fname != "." and fname != "..":
dirs.append(fname)
else:
files.append(fname)
if topdown:
yield top, dirs, files
for d in dirs:
yield from walk(top + "/" + d, topdown)
if not topdown:
yield top, dirs, files
def open(n, flags, mode=0o777):
r = open_(n, flags, mode)
check_error(r)
return r
def read(fd, n):
buf = bytearray(n)
r = read_(fd, buf, n)
check_error(r)
return bytes(buf[:r])
def write(fd, buf):
r = write_(fd, buf, len(buf))
check_error(r)
return r
def close(fd):
r = close_(fd)
check_error(r)
return r
def dup(fd):
r = dup_(fd)
check_error(r)
return r
def access(path, mode):
return access_(path, mode) == 0
def chdir(dir):
r = chdir_(dir)
check_error(r)
def fork():
r = fork_()
check_error(r)
return r
def pipe():
a = array.array('i', [0, 0])
r = pipe_(a)
check_error(r)
return a[0], a[1]
def _exit(n):
_exit_(n)
def execvp(f, args):
import uctypes
args_ = array.array("P", [0] * (len(args) + 1))
i = 0
for a in args:
args_[i] = uctypes.addressof(a)
i += 1
r = execvp_(f, uctypes.addressof(args_))
check_error(r)
def getpid():
return getpid_()
def waitpid(pid, opts):
a = array.array('i', [0])
r = waitpid_(pid, a, opts)
check_error(r)
return (r, a[0])
def kill(pid, sig):
r = kill_(pid, sig)
check_error(r)
def system(command):
r = system_(command)
check_error(r)
return r
def getenv(var, default=None):
var = getenv_(var)
if var is None:
return default
return var
def fsencode(s):
if type(s) is bytes:
return s
return bytes(s, "utf-8")
def fsdecode(s):
if type(s) is str:
return s
return str(s, "utf-8")
def urandom(n):
import builtins
with builtins.open("/dev/urandom", "rb") as f:
return f.read(n)
def popen(cmd, mode="r"):
import builtins
i, o = pipe()
if mode[0] == "w":
i, o = o, i
pid = fork()
if not pid:
if mode[0] == "r":
close(1)
else:
close(0)
close(i)
dup(o)
close(o)
s = system(cmd)
_exit(s)
else:
close(o)
return builtins.open(i, mode)

View File

@ -1,63 +0,0 @@
import os
sep = "/"
def normcase(s):
return s
def normpath(s):
return s
def abspath(s):
if s[0] != "/":
return os.getcwd() + "/" + s
return s
def join(*args):
# TODO: this is non-compliant
if type(args[0]) is bytes:
return b"/".join(args)
else:
return "/".join(args)
def split(path):
if path == "":
return ("", "")
r = path.rsplit("/", 1)
if len(r) == 1:
return ("", path)
head = r[0] #.rstrip("/")
if not head:
head = "/"
return (head, r[1])
def dirname(path):
return split(path)[0]
def basename(path):
return split(path)[1]
def exists(path):
return os.access(path, os.F_OK)
# TODO
lexists = exists
def isdir(path):
import stat
try:
mode = os.stat(path)[0]
return stat.S_ISDIR(mode)
except OSError:
return False
def expanduser(s):
if s == "~" or s.startswith("~/"):
h = os.getenv("HOME")
return h + s[1:]
if s[0] == "~":
# Sorry folks, follow conventions
return "/home/" + s[1:]
return s

View File

@ -7,8 +7,9 @@ class SkipTest(Exception):
class AssertRaisesContext: class AssertRaisesContext:
def __init__(self, exc): def __init__(self, exc, msg = None):
self.expected = exc self.expected = exc
self.expected_msg = msg
def __enter__(self): def __enter__(self):
return self return self
@ -17,6 +18,7 @@ class AssertRaisesContext:
if exc_type is None: if exc_type is None:
assert False, "%r not raised" % self.expected assert False, "%r not raised" % self.expected
if issubclass(exc_type, self.expected): if issubclass(exc_type, self.expected):
self.exception = exc_value
return True return True
return False return False
@ -176,35 +178,48 @@ class TestResult:
self.failuresNum = 0 self.failuresNum = 0
self.skippedNum = 0 self.skippedNum = 0
self.testsRun = 0 self.testsRun = 0
self.failures = []
def wasSuccessful(self): def wasSuccessful(self):
return self.errorsNum == 0 and self.failuresNum == 0 return self.errorsNum == 0 and self.failuresNum == 0
def printFailures(self):
for name, exception in self.failures:
print("-------- %s --------\n" % name)
sys.print_exception(exception)
# TODO: Uncompliant # TODO: Uncompliant
def run_class(c, test_result): def run_class(c, test_result):
o = c() o = c()
set_up = getattr(o, "setUp", lambda: None) set_up = getattr(o, "setUp", lambda: None)
tear_down = getattr(o, "tearDown", lambda: None) tear_down = getattr(o, "tearDown", lambda: None)
for name in dir(o): set_up_class = getattr(o, "setUpClass", lambda: None)
if name.startswith("test"): tear_down_class = getattr(o, "tearDownClass", lambda: None)
print("%s (%s) ..." % (name, c.__qualname__), end="") set_up_class()
m = getattr(o, name) try:
set_up() for name in dir(o):
try: if name.startswith("test"):
test_result.testsRun += 1 full_name = "%s (%s)" % (name, c.__qualname__)
m() print("%s ..." % full_name , end="")
print(" ok") m = getattr(o, name)
except SkipTest as e: set_up()
print(" skipped:", e.args[0]) try:
test_result.skippedNum += 1 test_result.testsRun += 1
except: m()
print(" FAIL") print(" ok")
test_result.failuresNum += 1 except SkipTest as e:
# Uncomment to investigate failure in detail print(" skipped:", e.args[0])
#raise test_result.skippedNum += 1
continue except Exception as e:
finally: print(" FAIL")
tear_down() test_result.failuresNum += 1
test_result.failures.append((full_name, e))
continue
finally:
tear_down()
finally:
tear_down_class()
def main(module="__main__"): def main(module="__main__"):
@ -213,12 +228,12 @@ def main(module="__main__"):
c = getattr(m, tn) c = getattr(m, tn)
if isinstance(c, object) and isinstance(c, type) and issubclass(c, TestCase): if isinstance(c, object) and isinstance(c, type) and issubclass(c, TestCase):
yield c yield c
m = __import__(module) m = __import__(module)
suite = TestSuite() suite = TestSuite()
for c in test_cases(m): for c in test_cases(m):
suite.addTest(c) suite.addTest(c)
runner = TestRunner() runner = TestRunner()
result = runner.run(suite) result = runner.run(suite)
result.printFailures()
# Terminate with non zero return code in case of failures # Terminate with non zero return code in case of failures
sys.exit(result.failuresNum > 0) sys.exit(result.failuresNum > 0)