Skip to content

Commit

Permalink
run listeners callbacks in the event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
LoSk-p committed Jul 1, 2024
1 parent 0723400 commit 8dd8e34
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 19 deletions.
39 changes: 26 additions & 13 deletions custom_components/robonomics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import time

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, ServiceCall, Event, CoreState
from homeassistant.core import HomeAssistant, ServiceCall, Event, CoreState, callback
from homeassistant.const import MATCH_ALL, EVENT_HOMEASSISTANT_STARTED
from homeassistant.helpers.event import (
async_track_time_interval,
Expand Down Expand Up @@ -160,7 +160,7 @@ async def init_integration(_: Event = None) -> None:
except Exception as e:
_LOGGER.error(f"Exception in first send libp2p states {e}")
try:
hass.async_create_task(UserManager(hass).update_users(start_devices_list))
hass.create_task(UserManager(hass).update_users(start_devices_list))
_LOGGER.debug("Start track state change")
hass.data[DOMAIN][LIBP2P_UNSUB] = async_track_state_change(
hass, MATCH_ALL, hass.data[DOMAIN][HANDLE_LIBP2P_STATE_CHANGED]
Expand Down Expand Up @@ -231,6 +231,10 @@ async def init_integration(_: Event = None) -> None:

hass.data[DOMAIN][TIME_CHANGE_COUNT] = 0

@callback
def handle_time_changed_callback(event):
hass.loop.create_task(handle_time_changed(event))

async def handle_time_changed(event):
"""Callback for time' changing subscription.
It calls every timeout from config to get and send telemtry.
Expand All @@ -256,19 +260,23 @@ async def handle_time_changed(event):
except Exception as e:
_LOGGER.error(f"Exception in handle_time_changed: {e}")

hass.data[DOMAIN][HANDLE_TIME_CHANGE] = handle_time_changed
hass.data[DOMAIN][HANDLE_TIME_CHANGE] = handle_time_changed_callback

@callback
def libp2p_state_changed(changed_entity: str, old_state, new_state):
if LIBP2P not in hass.data[DOMAIN]:
return
if old_state is None or new_state is None:
return
if old_state.state == new_state.state:
return
hass.loop.create_task(add_libp2p_states_to_queue(old_state, new_state))

async def libp2p_state_changed(changed_entity: str, old_state, new_state):
async def add_libp2p_states_to_queue(old_state, new_state):
"""Callback for state changing listener.
It calls every timeout from config to get and send telemtry.
"""
if LIBP2P not in hass.data[DOMAIN]:
return
try:
if old_state is None or new_state is None:
return
if old_state.state == new_state.state:
return
msg = await get_states_libp2p(hass)
async with lock:
if len(libp2p_message_queue) == 0:
Expand All @@ -280,7 +288,11 @@ async def libp2p_state_changed(changed_entity: str, old_state, new_state):

hass.data[DOMAIN][HANDLE_LIBP2P_STATE_CHANGED] = libp2p_state_changed

async def libp2p_time_changed(event):
@callback
def libp2p_time_changed(event):
hass.loop.create_task(libp2p_send_states_from_queue())

async def libp2p_send_states_from_queue():
if len(libp2p_message_queue) > 0:
async with lock:
last_message = libp2p_message_queue[0]
Expand All @@ -289,14 +301,15 @@ async def libp2p_time_changed(event):

hass.data[DOMAIN][HANDLE_TIME_CHANGE_LIBP2P] = libp2p_time_changed

async def ipfs_daemon_state_changed(event: Event):
@callback
def ipfs_daemon_state_changed(event: Event):
old_state = event.data["old_state"]
new_state = event.data["new_state"]
_LOGGER.debug(
f"IPFS Status entity changed state from {old_state} to {new_state}"
)
if old_state.state != new_state.state:
await handle_ipfs_status_change(hass, new_state.state == "OK")
hass.loop.create_task(handle_ipfs_status_change(hass, new_state.state == "OK"))

hass.data[DOMAIN][IPFS_DAEMON_STATUS_STATE_CHANGE] = async_track_state_change_event(
hass, f"sensor.{IPFS_STATUS_ENTITY}", ipfs_daemon_state_changed
Expand Down
2 changes: 1 addition & 1 deletion custom_components/robonomics/backup_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ async def create_secure_backup_hassio(
slug = resp_create["slug"]
response = await _send_command_hassio(hass, f"/backups/{slug}/download", "get")
backup = await response.read()
_LOGGER.error(f"Backup {slug} downloaded")
_LOGGER.debug(f"Backup {slug} downloaded")
encrypted_data = encrypt_message(
backup, admin_keypair, admin_keypair.public_key
)
Expand Down
2 changes: 1 addition & 1 deletion custom_components/robonomics/get_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def get_and_send_data(hass: HomeAssistant):
"""

if TWIN_ID not in hass.data[DOMAIN]:
_LOGGER.warning("Trying to send data before creating twin id")
_LOGGER.debug("Trying to send data before creating twin id")
return
_LOGGER.debug(
f"Get states request, another getting states: {hass.data[DOMAIN][GETTING_STATES]}"
Expand Down
6 changes: 3 additions & 3 deletions custom_components/robonomics/ipfs_helpers/get_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ async def get_directory_to_given_path(
delete_temp_dir_if_exists(dir_with_path)
res = await self._get_ipfs_data(is_directory=True)
if res is not None:
await self._extract_archive(res, dir_with_path)
tar_content = await res.content.read()
await self.hass.async_add_executor_job(self._extract_archive, tar_content, dir_with_path)
return True
else:
return False
Expand Down Expand Up @@ -162,9 +163,8 @@ async def _get_request(
else:
return None

async def _extract_archive(self, response: ClientResponse, dir_with_path: str):
def _extract_archive(self, tar_content: bytes, dir_with_path: str):
tar_buffer = BytesIO()
tar_content = await response.content.read()
tar_buffer.write(tar_content)
tar_buffer.seek(0)
with tarfile.open(fileobj=tar_buffer, mode="r:*") as tar:
Expand Down
2 changes: 1 addition & 1 deletion custom_components/robonomics/robonomics.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ async def resubscribe(self) -> None:
self.subscriber.cancel()
await self.subscribe()

@callback

def callback_new_event(self, data: tp.Tuple[tp.Union[str, tp.List[str]]]) -> None:
"""Check the event and call handlers
Expand Down

0 comments on commit 8dd8e34

Please sign in to comment.