commit
00151d2d9d
@ -0,0 +1,198 @@
|
||||
"""This app needs an SDS011 sensor attacthed to UART 4 """
|
||||
|
||||
___name___ = "Air Quality"
|
||||
___license___ = "MIT"
|
||||
___dependencies___ = ["sleep", "app", "ugfx_helper", "buttons", "homescreen"]
|
||||
___categories___ = ["EMF"]
|
||||
___bootstrapped___ = False # Whether or not apps get downloaded on first install. Defaults to "False", mostly likely you won't have to use this at all.
|
||||
|
||||
import app
|
||||
import ugfx, os, time, sleep
|
||||
from tilda import Buttons
|
||||
from tilda import Sensors
|
||||
from machine import Pin
|
||||
from machine import UART
|
||||
from machine import Neopix
|
||||
import random
|
||||
|
||||
class DustSensorTester(object):
|
||||
|
||||
verbose = False
|
||||
|
||||
def contains_sequence (self,data, test):
|
||||
""" Checks to see if the data sequence contains the test sewquence
|
||||
|
||||
Args:
|
||||
data: sequence of data items
|
||||
test: test sequence
|
||||
Returns:
|
||||
True if the test sequence is in the data sequence
|
||||
"""
|
||||
|
||||
if len(data)<len(test): return False
|
||||
|
||||
for pos in range(0,len(data)-len(test)+1):
|
||||
if test == data[pos:pos+len(test)]: return True
|
||||
return False
|
||||
|
||||
class DustSensor(object):
|
||||
|
||||
verbose = False
|
||||
fake_sensor = False
|
||||
|
||||
AWAITING_START = 0
|
||||
READING_BLOCK = 1
|
||||
|
||||
def pump_byte(self, b):
|
||||
""" Pump a byte into the block decode. Calls the decode method
|
||||
when the block is complete
|
||||
|
||||
Args:
|
||||
b: byte to pump
|
||||
"""
|
||||
if self.state==self.AWAITING_START:
|
||||
if self.verbose: print(self, "Awaiting start:", b)
|
||||
if b==self.start_sequence[self.start_pos]:
|
||||
# got a match - move to next byte in start sequence
|
||||
self.start_pos = self.start_pos+1
|
||||
if self.start_pos == len(self.start_sequence):
|
||||
# matched the start sequence
|
||||
self.block=self.start_sequence.copy()
|
||||
self.state=self.READING_BLOCK
|
||||
elif self.state==self.READING_BLOCK:
|
||||
if self.verbose: print("Reading block:", b)
|
||||
self.block.append(b)
|
||||
if len(self.block) == self.block_size:
|
||||
self.state=self.AWAITING_START
|
||||
self.start_pos=0
|
||||
self.process_block()
|
||||
|
||||
def __init__(self, display):
|
||||
self.display = display
|
||||
self.state=self.AWAITING_START
|
||||
self.start_pos = 0
|
||||
|
||||
class sds011_sensor(DustSensor):
|
||||
start_sequence = [0xaa,0xc0]
|
||||
block_size = 10
|
||||
|
||||
def process_block(self):
|
||||
""" Process a block of data obtained from the sensor
|
||||
calls the new_reading method on the display to
|
||||
deliver a new reading or the error method
|
||||
on the display to indicate an error
|
||||
"""
|
||||
if self.verbose: print("sds011 process block")
|
||||
if self.verbose: print([hex(x) for x in self.block])
|
||||
check_sum = 0
|
||||
for i in range(2,8):
|
||||
check_sum = check_sum + self.block[i]
|
||||
check_sum = check_sum & 0xff
|
||||
if self.verbose: print("Checksum:",hex(check_sum))
|
||||
if check_sum!=self.block[8]:
|
||||
message = "Rcv:" + hex(self.block[8]) + " Cal:" + hex(check_sum)
|
||||
self.display.error(message)
|
||||
return
|
||||
ppm10 = (self.block[4]+256*self.block[5])/10
|
||||
ppm2_5 = (self.block[2]+256*self.block[3])/10
|
||||
self.display.new_readings(ppm10,ppm2_5)
|
||||
|
||||
class Air_Quality_Display():
|
||||
|
||||
def setup_screen(self):
|
||||
""" Set up the screen and the labels that display
|
||||
values on it.
|
||||
"""
|
||||
ugfx.init()
|
||||
width=ugfx.width()
|
||||
height=ugfx.height()
|
||||
ugfx.clear(ugfx.html_color(0x800080))
|
||||
style = ugfx.Style()
|
||||
style.set_enabled([ugfx.WHITE, ugfx.html_color(0x800080), ugfx.html_color(0x800080), ugfx.html_color(0x800080)])
|
||||
style.set_background(ugfx.html_color(0x800080))
|
||||
ugfx.set_default_style(style)
|
||||
ugfx.orientation(90)
|
||||
ugfx.set_default_font(ugfx.FONT_TITLE)
|
||||
ugfx.Label(0, 0, width, 60,"Air Quality", justification=ugfx.Label.CENTER)
|
||||
label_height=45
|
||||
self.ppm10_label = ugfx.Label(0, label_height, width, label_height,"PPM 10: starting", justification=ugfx.Label.CENTER)
|
||||
self.ppm25_label = ugfx.Label(0, label_height*2, width, label_height,"PPM 2.5: starting", justification=ugfx.Label.CENTER)
|
||||
|
||||
self.temp_label = ugfx.Label(0, label_height*3, width, label_height,"Temp: starting", justification=ugfx.Label.CENTER)
|
||||
self.humid_label = ugfx.Label(0, label_height*4, width, label_height,"Humid: starting", justification=ugfx.Label.CENTER)
|
||||
self.error_label = ugfx.Label(0, label_height*5, width, label_height,"", justification=ugfx.Label.CENTER)
|
||||
self.message_label = ugfx.Label(0, label_height*6, width, label_height,"", justification=ugfx.Label.CENTER)
|
||||
self.error_count = 0
|
||||
self.error_message = ""
|
||||
self.neopix = Neopix()
|
||||
self.p10_decode = ((100,0x00ff00),(250,0xffff00),(350,0xff8000),(430,0xff0000),(-1,0xcc6600))
|
||||
self.p25_decode = ((60, 0x00ff00),(91, 0xffff00), (121,0xff8000),(251,0xff0000),(-1,0xcc6600))
|
||||
|
||||
def get_reading_color(self, value, decode):
|
||||
for item in decode:
|
||||
if item[0] < 0:
|
||||
# reached the upper limit - return
|
||||
return item[1]
|
||||
if value < item[0]:
|
||||
return item[1]
|
||||
|
||||
def new_readings(self,ppm10_value, ppm25_value):
|
||||
""" Called by the sensor to deliver new values to the screen.
|
||||
Will also trigger the reading of the temperature and humidity
|
||||
values.
|
||||
"""
|
||||
self.ppm10_label.text("PPM 10: "+str(ppm10_value))
|
||||
self.ppm25_label.text("PPM 2.5: "+str(ppm25_value))
|
||||
temp = Sensors.get_hdc_temperature()
|
||||
temp_string = "Temp: {0:2.1f}".format(temp)
|
||||
self.temp_label.text(temp_string)
|
||||
humid = Sensors.get_hdc_humidity()
|
||||
humid_string = "Humidity: {0:2.1f}".format(humid)
|
||||
self.humid_label.text(humid_string)
|
||||
# Calculate some colours
|
||||
self.neopix.display((self.get_reading_color(ppm25_value, self.p25_decode),self.get_reading_color(ppm10_value, self.p10_decode)))
|
||||
|
||||
def error(self, error_message):
|
||||
""" Called by the sensor to deliver an error message.
|
||||
Args:
|
||||
error_message: error message string
|
||||
"""
|
||||
self.error_count = self.error_count + 1
|
||||
self.error_label.text( "Errors: " +str(self.error_count))
|
||||
self.message_label.text(str(error_message))
|
||||
|
||||
display = Air_Quality_Display()
|
||||
display.setup_screen()
|
||||
|
||||
sensor_port = UART(2,9600, bits=8, mode=UART.BINARY, parity=None, stop=1)
|
||||
|
||||
sensor = sds011_sensor(display)
|
||||
|
||||
def test_sensor():
|
||||
""" Can be called to pump some test sequences into the sensor
|
||||
"""
|
||||
|
||||
|
||||
test_sequences = [ ['0xaa', '0xc0', '0xf', '0x0', '0x22', '0x0', '0xe1', '0xdb', '0xed', '0xab'],
|
||||
['0xaa', '0xc0', '0x13', '0x0', '0x3e', '0x0', '0xe1', '0xdb', '0xb', '0xab'], # bad checksum
|
||||
['0xaa', '0xc0', '0x13', '0x0', '0x3e', '0x0', '0xe1', '0xdb', '0xa', '0xab'],
|
||||
['0xaa', '0xc0', '0x13', '0x0', '0x3e', '0x0', '0xe1', '0xdb', '0xd', '0xab'] ]
|
||||
|
||||
for test_sequence in test_sequences:
|
||||
for ch in test_sequence:
|
||||
sensor.pump_byte(int(ch))
|
||||
|
||||
# test_sensor()
|
||||
|
||||
buffer = bytearray([0])
|
||||
|
||||
while (not Buttons.is_pressed(Buttons.BTN_A)) and (not Buttons.is_pressed(Buttons.BTN_B)) and (not Buttons.is_pressed(Buttons.BTN_Menu)):
|
||||
while sensor_port.any() > 0:
|
||||
sensor_port.readinto(buffer,1)
|
||||
sensor.pump_byte(buffer[0])
|
||||
sleep.wfi()
|
||||
|
||||
ugfx.clear()
|
||||
|
||||
app.restart_to_default()
|
||||
|
Loading…
Reference in New Issue