Badge store lib
parent
1510dbb4ac
commit
b4da6f49f0
|
@ -30,10 +30,10 @@ class BadgeStore:
|
||||||
installers = []
|
installers = []
|
||||||
url = "%s/download" % (self.url)
|
url = "%s/download" % (self.url)
|
||||||
for path, hash in files.items():
|
for path, hash in files.items():
|
||||||
if self._is_file_up_to_date(path, hash):
|
if hash == get_hash(path):
|
||||||
continue
|
continue
|
||||||
params = {"repo": self.repo, "ref": self.ref, "path": path}
|
params = {"repo": self.repo, "ref": self.ref, "path": path}
|
||||||
installers.append(Installer(path, url, params))
|
installers.append(Installer(path, url, params, hash))
|
||||||
return installers
|
return installers
|
||||||
|
|
||||||
def _call(self, command, params = {}):
|
def _call(self, command, params = {}):
|
||||||
|
@ -43,27 +43,44 @@ class BadgeStore:
|
||||||
return response.json() # todo: error handling
|
return response.json() # todo: error handling
|
||||||
|
|
||||||
def _is_file_up_to_date(self, path, hash):
|
def _is_file_up_to_date(self, path, hash):
|
||||||
if not isfile(path):
|
return hash == _get_hash(path)
|
||||||
return False
|
|
||||||
|
|
||||||
with open(path, "rb") as file:
|
|
||||||
sha256 = hashlib.sha256()
|
TEMP_FILE = ".tmp.download"
|
||||||
buf = file.read(128)
|
|
||||||
while len(buf) > 0:
|
|
||||||
sha256.update(buf)
|
|
||||||
buf = file.read(128)
|
|
||||||
current = str(binascii.hexlify(sha256.digest()), "utf8")[:10]
|
|
||||||
return current == hash
|
|
||||||
|
|
||||||
class Installer:
|
class Installer:
|
||||||
def __init__(self, path, url, params):
|
def __init__(self, path, url, params, hash):
|
||||||
self.path = path
|
self.path = path
|
||||||
self.url = url
|
self.url = url
|
||||||
self.params = params
|
self.params = params
|
||||||
|
self.hash = hash
|
||||||
|
|
||||||
def download(self):
|
def download(self):
|
||||||
with get(self.url, params=self.params).raise_for_status() as response:
|
count = 0
|
||||||
response.download(path)
|
while get_hash(TEMP_FILE) != self.hash:
|
||||||
|
count += 1
|
||||||
|
if count > 5:
|
||||||
|
os.remove(TEMP_FILE)
|
||||||
|
raise OSError("Aborting download of %s after 5 unsuccessful attempts" % self.path)
|
||||||
|
try:
|
||||||
|
get(self.url, params=self.params).raise_for_status().download_to(TEMP_FILE)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
os.remove(self.path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
os.rename(TEMP_FILE, self.path)
|
||||||
|
|
||||||
|
def get_hash(path):
|
||||||
|
if not isfile(path):
|
||||||
|
return None
|
||||||
|
|
||||||
|
with open(path, "rb") as file:
|
||||||
|
sha256 = hashlib.sha256()
|
||||||
|
buf = file.read(128)
|
||||||
|
while len(buf) > 0:
|
||||||
|
sha256.update(buf)
|
||||||
|
buf = file.read(128)
|
||||||
|
return str(binascii.hexlify(sha256.digest()), "utf8")[:10]
|
||||||
|
|
||||||
|
|
|
@ -3,13 +3,18 @@
|
||||||
___license___ = "MIT"
|
___license___ = "MIT"
|
||||||
___dependencies___ = ["upip:unittest", "badge_store", "shared/test/file.txt"]
|
___dependencies___ = ["upip:unittest", "badge_store", "shared/test/file.txt"]
|
||||||
|
|
||||||
import unittest
|
import unittest, os
|
||||||
from lib.badge_store import *
|
from lib.badge_store import *
|
||||||
|
from ospath import *
|
||||||
|
|
||||||
class TestBadgeStore(unittest.TestCase):
|
class TestBadgeStore(unittest.TestCase):
|
||||||
|
|
||||||
def setUpClass(self):
|
def setUpClass(self):
|
||||||
self.store = BadgeStore(url="http://badge.marekventur.com", repo="emfcamp/Mk4-Apps", ref="ee144e8")
|
self.store = BadgeStore(url="http://badge.marekventur.com", repo="emfcamp/Mk4-Apps", ref="ee144e8")
|
||||||
|
self.download_file = "shared/test/download.txt"
|
||||||
|
|
||||||
|
def tearDownClass(self):
|
||||||
|
self._remove_download_file()
|
||||||
|
|
||||||
def test_apps(self):
|
def test_apps(self):
|
||||||
response = self.store.get_apps()
|
response = self.store.get_apps()
|
||||||
|
@ -23,14 +28,24 @@ class TestBadgeStore(unittest.TestCase):
|
||||||
response = self.store.get_app("launcher")
|
response = self.store.get_app("launcher")
|
||||||
self.assertEqual(response["description"], "Launcher for apps currently installed")
|
self.assertEqual(response["description"], "Launcher for apps currently installed")
|
||||||
|
|
||||||
def test_is_file_up_to_date(self):
|
def test_get_hash(self):
|
||||||
self.assertFalse(self.store._is_file_up_to_date("shared/test/file.txt", "1234567890"))
|
self.assertEqual(get_hash("shared/test/file.txt"), "182d04f8ee")
|
||||||
self.assertFalse(self.store._is_file_up_to_date("does/not/exist.txt", "1234567890"))
|
self.assertEqual(get_hash("does/not/exist.txt"), None)
|
||||||
self.assertTrue(self.store._is_file_up_to_date("shared/test/file.txt", "182d04f8ee"))
|
|
||||||
|
def test_install_integration(self):
|
||||||
|
self._remove_download_file()
|
||||||
|
store = BadgeStore(url="http://badge.marekventur.com", repo="emfcamp/Mk4-Apps", ref="dont-delete-test-download-branch")
|
||||||
|
for installer in store.install(["launcher"]):
|
||||||
|
if installer.path == "shared/test/download.txt":
|
||||||
|
installer.download()
|
||||||
|
|
||||||
|
with open(self.download_file, "rt") as response:
|
||||||
|
self.assertIn("I'm a download test", response.read())
|
||||||
|
|
||||||
|
def _remove_download_file(self):
|
||||||
|
if isdir(self.download_file) or isfile(self.download_file):
|
||||||
|
os.remove(self.download_file)
|
||||||
|
|
||||||
def test_install(self):
|
|
||||||
installers = self.store.install(["launcher", "badge_store"])
|
|
||||||
self.assertTrue(len(installers) > 0)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue