Some fixes and added tests
parent
eb1fa19603
commit
7997abc090
19
lib/ntp.py
19
lib/ntp.py
|
@ -6,7 +6,8 @@ Derived from the 2016 implementation.
|
|||
___license___ = "MIT"
|
||||
|
||||
import database
|
||||
import socket
|
||||
import usocket
|
||||
import machine
|
||||
|
||||
# (date(2000, 1, 1) - date(1900, 1, 1)).days * 24*60*60
|
||||
NTP_DELTA = 3155673600
|
||||
|
@ -14,7 +15,6 @@ NTP_DELTA = 3155673600
|
|||
NTP_HOSTS = ["0.pool.ntp.org", "1.pool.ntp.org", "2.pool.ntp.org"]
|
||||
NTP_PORT = 123
|
||||
|
||||
|
||||
def get_NTP_time():
|
||||
for NTP_HOST in NTP_HOSTS:
|
||||
res = query_NTP_host(NTP_HOST)
|
||||
|
@ -28,15 +28,14 @@ def query_NTP_host(_NTP_HOST):
|
|||
NTP_QUERY[0] = 0x1b
|
||||
# Catch exception when run on a network without working DNS
|
||||
try:
|
||||
addr = socket.getaddrinfo(_NTP_HOST, NTP_PORT)[0][-1]
|
||||
addr = usocket.getaddrinfo(_NTP_HOST, NTP_PORT)[0][-1]
|
||||
except OSError:
|
||||
return None
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
|
||||
s.sendto(NTP_QUERY, addr)
|
||||
|
||||
# Setting timeout for receiving data. Because we're using UDP,
|
||||
# there's no need for a timeout on send.
|
||||
s.settimeout(2)
|
||||
msg = None
|
||||
try:
|
||||
msg = s.recv(48)
|
||||
|
@ -63,7 +62,6 @@ def query_NTP_host(_NTP_HOST):
|
|||
|
||||
def set_NTP_time():
|
||||
import time
|
||||
from pyb import RTC
|
||||
print("Setting time from NTP")
|
||||
|
||||
t = get_NTP_time()
|
||||
|
@ -80,10 +78,9 @@ def set_NTP_time():
|
|||
t += (tz_hours * 3600) + (tz_minutes * 60)
|
||||
|
||||
tm = time.localtime(t)
|
||||
tm = tm[0:3] + (0,) + tm[3:6] + (0,)
|
||||
tm = tm[0:3] + tm[3:6]
|
||||
|
||||
rtc = RTC()
|
||||
rtc.init()
|
||||
rtc.datetime(tm)
|
||||
rtc = machine.RTC()
|
||||
rtc.init(tm)
|
||||
|
||||
return True
|
||||
return True
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
"""Tests for ntp"""
|
||||
|
||||
___license___ = "MIT"
|
||||
___dependencies___ = ["upip:unittest", "ntp", "wifi"]
|
||||
|
||||
import unittest, wifi, ntp, machine
|
||||
|
||||
class TestWifi(unittest.TestCase):
|
||||
|
||||
def setUpClass(self):
|
||||
wifi.connect()
|
||||
|
||||
def test_get_time(self):
|
||||
t = ntp.get_NTP_time()
|
||||
self.assertTrue(t > 588699276)
|
||||
self.assertTrue(t < 1851003302) # 27 August 2028
|
||||
|
||||
def test_set_time(self):
|
||||
ntp.set_NTP_time()
|
||||
rtc = machine.RTC()
|
||||
self.assertTrue(rtc.now()[0] >= 2018)
|
||||
self.assertTrue(rtc.now()[0] <= 2028)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue