Skip to content

Commit

Permalink
Device Tracker sync across gateways
Browse files Browse the repository at this point in the history
Device Tracker sync across gateways - OpenMQTTGateway & Theengs Gateway
  • Loading branch information
DigiH committed Jun 28, 2024
1 parent 5756332 commit f352339
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 27 deletions.
7 changes: 7 additions & 0 deletions TheengsGateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import sys
import uuid
from pathlib import Path

from .ble_gateway import run
Expand Down Expand Up @@ -48,6 +49,12 @@ def main() -> None:
if configuration["discovery_topic"].endswith("/sensor"):
configuration["discovery_topic"] = configuration["discovery_topic"][:-7]

# Get the MAC address of the gateway.
mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:]
configuration["gateway_id"] = ":".join(
[mac_address[i : i + 2] for i in range(0, 12, 2)]
).upper()

if not configuration["host"]:
sys.exit("MQTT host is not specified")

Expand Down
92 changes: 67 additions & 25 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def on_connect(
retain=True,
)
self.subscribe(self.configuration["subscribe_topic"])
self.subscribe("theengs/internal/#")
else:
logger.error(
"Failed to connect to MQTT broker %s:%d reason code: %s",
Expand Down Expand Up @@ -226,28 +227,51 @@ def subscribe(self, sub_topic: str) -> None:
"""Subscribe to MQTT topic <sub_topic>."""

def on_message(client, userdata, msg) -> None: # noqa: ANN001,ARG001
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return
# Theengs internal
if "theengs/internal/" in msg.topic:
# Evaluate trackersync messages
if msg.topic == "theengs/internal/trackersync":
msg_json = json.loads(msg.payload)
logger.debug("trackersync message: %s", msg_json)

if (
msg_json["gatewayid"] != self.configuration["gateway_id"]
and msg_json["tracker"] in self.discovered_trackers
and self.discovered_trackers[msg_json["tracker"]].time != 0
):
self.discovered_trackers[msg_json["tracker"]].time = 0
logger.debug(
"Tracker %s disassociated by gateway %s",
msg_json["tracker"],
msg_json["gatewayid"],
)

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
logger.debug(
"[DIS] Discovered Trackers: %s", self.discovered_trackers
)
else:
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
return
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
)
return

self.decode_advertisement(msg_json)
self.decode_advertisement(msg_json)

self.client.subscribe(sub_topic)
self.client.on_message = on_message
Expand Down Expand Up @@ -369,14 +393,11 @@ def check_tracker_timeout(self) -> None:
if (
round(time()) - time_model.time >= self.configuration["tracker_timeout"]
and time_model.time != 0
and (
self.configuration["discovery"]
or self.configuration["general_presence"]
)
):
if (
time_model.model_id in ("APPLEWATCH", "APPLEDEVICE")
and not self.configuration["discovery"]
and self.configuration["general_presence"]
):
message = json.dumps(
{"id": address, "presence": "absent", "unlocked": False}
Expand All @@ -390,9 +411,12 @@ def check_tracker_timeout(self) -> None:
+ "/"
+ address.replace(":", ""),
)

time_model.time = 0
self.discovered_trackers[address] = time_model
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

logger.info("Tracker %s timed out", address)
logger.debug("[TO] Discovered Trackers: %s", self.discovered_trackers)

async def ble_scan_loop(self) -> None:
"""Scan for BLE devices."""
Expand Down Expand Up @@ -439,6 +463,10 @@ async def ble_scan_loop(self) -> None:
"Sent %s messages to MQTT",
self.published_messages,
)

# Check tracker timeouts
self.check_tracker_timeout()

await asyncio.sleep(
self.configuration["ble_time_between_scans"],
)
Expand Down Expand Up @@ -610,11 +638,25 @@ def publish_json(
+ "/"
+ get_address(data_json).replace(":", ""),
)

# Update tracker last received time
self.discovered_trackers[str(data_json["id"])] = TnM(
round(time()),
str(data_json["model_id"]),
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)
# Publish trackersync message
message = json.dumps(
{
"gatewayid": self.configuration["gateway_id"],
"tracker": data_json["id"],
}
)
self.publish(
message,
"theengs/internal/trackersync",
)

logger.debug("[GP] Discovered Trackers: %s", self.discovered_trackers)

# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in data_json:
Expand Down
2 changes: 1 addition & 1 deletion TheengsGateway/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"port": 1883,
"user": "",
"pass": "",
"ble_scan_time": 5,
"ble_scan_time": 7,
"ble_time_between_scans": 5,
"publish_topic": "home/TheengsGateway/BTtoMQTT",
"lwt_topic": "home/TheengsGateway/LWT",
Expand Down
11 changes: 10 additions & 1 deletion TheengsGateway/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,17 @@ def copy_pub_device(self, device: dict) -> dict:
self.discovered_trackers[device["id"]] = TnM(
round(time()), device["model_id"]
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

# Publish trackersync message
message = json.dumps(
{"gatewayid": self.configuration["gateway_id"], "tracker": device["id"]}
)
self.publish(
message,
"theengs/internal/trackersync",
)

logger.debug(" Discovered Trackers: %s", self.discovered_trackers)
pub_device_copy = device.copy()
# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in pub_device_copy:
Expand Down

0 comments on commit f352339

Please sign in to comment.