Skip to content

Commit

Permalink
feat: add MH-Z19 sensor module
Browse files Browse the repository at this point in the history
This adds support for NDIR CO2 sensor MH-Z19. The sensor module supports
reading CO2 ppm value and configuring the ppm range over the UART/serial
interface.
  • Loading branch information
kleest committed Apr 21, 2024
1 parent 67d455a commit f6cb025
Showing 1 changed file with 61 additions and 0 deletions.
61 changes: 61 additions & 0 deletions mqtt_io/modules/sensor/mhz19.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
MH-Z19 NDIR CO2 sensor
"""

from mqtt_io.modules.sensor import GenericSensor
from mqtt_io.types import CerberusSchemaType, ConfigType, SensorValueType

REQUIREMENTS = ("pyserial",)
CONFIG_SCHEMA: CerberusSchemaType = {
"device": dict(type="string", required=True, empty=False),
"range": dict(type="integer", required=False, empty=False, default=5000, allowed=[2000, 5000, 10000]),
}

class Sensor(GenericSensor):
"""
Implementation of Sensor class for the MH-Z19 CO2 sensor using UART/serial.
"""

@staticmethod
def _calc_checksum(data: bytes) -> bytes:
v = sum(data[1:]) % 0x100
v = (0xff - v + 1) % 0x100
return v.to_bytes(1, "big")

@staticmethod
def _check_checksum(data: bytes) -> bool:
return Sensor._calc_checksum(data[:-1]) == data[-1:]

@staticmethod
def _add_checksum(data: bytes) -> bytes:
return data + Sensor._calc_checksum(data)

def setup_module(self) -> None:
# pylint: disable=import-error,import-outside-toplevel
import serial # type: ignore

self.ser = serial.Serial(
port=self.config["device"],
baudrate=9600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
)

# setup detection range
cmd = Sensor._add_checksum(b"\xff\x01\x99\x00\x00\x00" + self.config["range"].to_bytes(2, "big"))
self.ser.write(cmd)
# no response

def cleanup(self) -> None:
self.ser.close()

def get_value(self, sens_conf: ConfigType) -> SensorValueType:
self.ser.write(Sensor._add_checksum(b"\xff\x01\x86\x00\x00\x00\x00\x00"))
resp = self.ser.read(9)

if len(resp) == 9:
if resp[0:2] == b"\xff\x86" and Sensor._check_checksum(resp):
return int.from_bytes(resp[2:4], "big")

return None

0 comments on commit f6cb025

Please sign in to comment.