Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
LoSk-p committed Jun 3, 2024
2 parents 2bde388 + 99c7c45 commit 0b1aa65
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 36 deletions.
18 changes: 9 additions & 9 deletions custom_components/robonomics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import time

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, ServiceCall, Event, callback
from homeassistant.core import HomeAssistant, ServiceCall, Event, CoreState
from homeassistant.const import MATCH_ALL, EVENT_HOMEASSISTANT_STARTED
from homeassistant.helpers.event import async_track_time_interval, async_track_state_change_event, async_track_state_change
from homeassistant.helpers.typing import ConfigType
Expand Down Expand Up @@ -122,14 +122,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
lock = asyncio.Lock()
libp2p_message_queue = []
hass.data.setdefault(DOMAIN, {})
async def init_integration(_: Event) -> None:
async def init_integration(_: Event = None) -> None:
"""Compare rws devices with users from Home Assistant
:param hass: HomeAssistant instance
"""
_LOGGER.debug(f"hass state: {hass.state}")
start_devices_list = await hass.data[DOMAIN][ROBONOMICS].get_devices_list()
_LOGGER.debug(f"Start devices list is {start_devices_list}")
# await asyncio.sleep(20)
if DOMAIN not in hass.data:
return
try:
Expand All @@ -148,7 +148,7 @@ async def init_integration(_: Event) -> None:

await get_and_send_data(hass)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, init_integration)
_LOGGER.debug(f"Robonomics user control starting set up")
_LOGGER.debug("Robonomics user control starting set up")
conf = entry.data
if CONF_IPFS_GATEWAY in conf:
hass.data[DOMAIN][CONF_IPFS_GATEWAY] = conf[CONF_IPFS_GATEWAY]
Expand All @@ -167,7 +167,7 @@ async def init_integration(_: Event) -> None:
hass.data[DOMAIN][CONTROLLER_ADDRESS] = sub_admin_acc.get_address()
_LOGGER.debug(f"Controller: {sub_admin_acc.get_address()}")
_LOGGER.debug(f"Owner: {hass.data[DOMAIN][CONF_SUB_OWNER_ADDRESS]}")
hass.data[DOMAIN][ROBONOMICS]: Robonomics = Robonomics(
hass.data[DOMAIN][ROBONOMICS] = Robonomics(
hass,
hass.data[DOMAIN][CONF_SUB_OWNER_ADDRESS],
hass.data[DOMAIN][CONF_ADMIN_SEED],
Expand Down Expand Up @@ -256,7 +256,6 @@ async def libp2p_time_changed(event):

hass.data[DOMAIN][HANDLE_TIME_CHANGE_LIBP2P] = libp2p_time_changed

@callback
async def ipfs_daemon_state_changed(event: Event):
old_state = event.data["old_state"]
new_state = event.data["new_state"]
Expand Down Expand Up @@ -328,10 +327,11 @@ async def handle_save_video(call: ServiceCall) -> None:
if TWIN_ID not in hass.data[DOMAIN]:
await get_or_create_twin_id(hass)

asyncio.ensure_future(hass.data[DOMAIN][ROBONOMICS].pin_dapp_to_local_node())
# asyncio.ensure_future(init_integration(hass))
# asyncio.ensure_future(hass.data[DOMAIN][ROBONOMICS].pin_dapp_to_local_node())
if hass.state == CoreState.running:
asyncio.ensure_future(init_integration())

_LOGGER.debug(f"Robonomics user control successfuly set up")
_LOGGER.debug(f"Robonomics user control successfuly set up, hass state: {hass.state}")
return True


Expand Down
3 changes: 1 addition & 2 deletions custom_components/robonomics/get_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ async def get_and_send_data(hass: HomeAssistant):
_LOGGER.debug("Another states are sending. Wait...")
hass.data[DOMAIN][GETTING_STATES_QUEUE] += 1
on_queue = hass.data[DOMAIN][GETTING_STATES_QUEUE]
counter = 0
while hass.data[DOMAIN][GETTING_STATES]:
await asyncio.sleep(5)
if on_queue > 3:
Expand All @@ -93,7 +92,7 @@ async def get_and_send_data(hass: HomeAssistant):
await _get_dashboard_and_services(hass)
data = await _get_states(hass)
data = json.dumps(data)
_LOGGER.debug(f"Got states to send datalog")
_LOGGER.debug("Got states to send datalog")
devices_list_with_admin = hass.data[DOMAIN][ROBONOMICS].devices_list.copy()
devices_list_with_admin.append(sender_acc.get_address())
encrypted_data = encrypt_for_devices(str(data), sender_kp, devices_list_with_admin)
Expand Down
6 changes: 5 additions & 1 deletion custom_components/robonomics/libp2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ async def _run_command(self, received_data: tp.Union[str, dict]) -> None:
except Exception as e:
decrypted_data = self.hass.data[DOMAIN][ROBONOMICS].decrypt_message(received_data)
data = json.loads(decrypted_data)
_LOGGER.debug(f"Got command from libp2p: {data}")
else:
data = received_data
if "sender" in data:
decrypted_data = self.hass.data[DOMAIN][ROBONOMICS].decrypt_message(data["data"], data["sender"])
data = json.loads(decrypted_data)
message_entity_id = data["params"]["entity_id"]
params = data["params"].copy()
del params["entity_id"]
Expand Down
2 changes: 1 addition & 1 deletion custom_components/robonomics/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@
"iot_class": "cloud_push",
"issue_tracker": "https://github.com/airalab/homeassistant-robonomics-integration/issues",
"requirements": ["pycryptodome==3.15.0", "wheel", "IPFS-Toolkit==0.4.0", "robonomics-interface==1.6.2", "pinatapy-vourhey==0.1.9", "aenum==3.1.11", "ipfs-api==0.2.3", "crust-interface-patara==0.1.1", "tenacity==8.2.2", "py-ws-libp2p-proxy==0.1.4"],
"version": "1.8.5"
"version": "1.8.5-beta"
}
78 changes: 55 additions & 23 deletions custom_components/robonomics/robonomics.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@
DAPP_HASH_DATALOG_ADDRESS,
)
from .get_states import get_and_send_data
from .ipfs import get_ipfs_data, get_last_file_hash, read_ipfs_local_file, pin_file_to_local_node_by_hash
from .ipfs import (
get_ipfs_data,
get_last_file_hash,
read_ipfs_local_file,
pin_file_to_local_node_by_hash,
)
from .manage_users import UserManager
from .utils import (
create_notification,
Expand All @@ -68,8 +73,10 @@ async def wrapper(obj, *args, **kwargs):
except TimeoutError:
obj._change_current_wss()
raise TimeoutError

return wrapper


async def get_or_create_twin_id(hass: HomeAssistant) -> None:
"""Try to get current twin id from local storage, datalogs or twin list in blockchain.
If no existing twin id, create new one.
Expand All @@ -88,9 +95,7 @@ async def get_or_create_twin_id(hass: HomeAssistant) -> None:
].get_last_telemetry_hash()
if last_telemetry_hash is not None:
hass.data[DOMAIN][HANDLE_IPFS_REQUEST] = True
res = await get_ipfs_data(
hass, last_telemetry_hash, number_of_requests=1
)
res = await get_ipfs_data(hass, last_telemetry_hash, number_of_requests=1)
if res is not None:
try:
_LOGGER.debug("Start getting info about telemetry")
Expand Down Expand Up @@ -192,14 +197,15 @@ def _run_launch_command(
del params["entity_id"]
if params == {}:
params = None
hass.async_create_task(
asyncio.run_coroutine_threadsafe(
hass.services.async_call(
domain=message["platform"],
service=message["name"],
service_data=params,
target={"entity_id": message_entity_id},
)
)
),
hass.loop,
).result()
except Exception as e:
_LOGGER.error(f"Exception in sending command: {e}")

Expand Down Expand Up @@ -254,7 +260,9 @@ def __init__(
except Exception as e:
_LOGGER.error(f"Exception in enum: {e}")

def encrypt_for_devices(self, data: tp.Union[str, dict], devices: tp.List[str] = None) -> str:
def encrypt_for_devices(
self, data: tp.Union[str, dict], devices: tp.List[str] = None
) -> str:
if devices is None:
devices = self.devices_list.copy()
if self.controller_address not in devices:
Expand All @@ -267,7 +275,9 @@ def decrypt_message_for_devices(self, data: str, sender_address: str = None) ->
sender_public_key = Keypair(
ss58_address=sender_address, crypto_type=KeypairType.ED25519
).public_key
return decrypt_message_devices(data, sender_public_key, self.controller_account.keypair)
return decrypt_message_devices(
data, sender_public_key, self.controller_account.keypair
)

def encrypt_message(self, data: str, recepient_address: str = None) -> str:
if recepient_address is None:
Expand Down Expand Up @@ -331,7 +341,9 @@ async def _handle_launch(self, data: tp.Tuple[str]) -> None:
json_result = json.loads(self.decrypt_message(result, data[0]))
if "password" in json_result:
_LOGGER.debug(f"Got registration command with password from {data[0]}")
await UserManager(self.hass).create_user(data[0], json_result["password"])
await UserManager(self.hass).create_user(
data[0], json_result["password"]
)
elif "platform" in json_result:
_LOGGER.debug(f"Got call service command {json_result}")
_run_launch_command(self.hass, result, data[0])
Expand Down Expand Up @@ -474,7 +486,9 @@ async def set_backup_topic(self, ipfs_hash: str, twin_number: int) -> None:
"""

try:
await self.set_twin_topic_with_remove_old(ipfs_hash, twin_number, self.sub_owner_address)
await self.set_twin_topic_with_remove_old(
ipfs_hash, twin_number, self.sub_owner_address
)
except Exception as e:
_LOGGER.error(f"Exception in set backup topic {e}")

Expand All @@ -486,7 +500,9 @@ async def set_config_topic(self, ipfs_hash: str, twin_number: int) -> None:
"""

try:
await self.set_twin_topic_with_remove_old(ipfs_hash, twin_number, self.controller_address)
await self.set_twin_topic_with_remove_old(
ipfs_hash, twin_number, self.controller_address
)
except Exception as e:
_LOGGER.error(f"Exception in set config topic {e}")

Expand Down Expand Up @@ -515,19 +531,22 @@ async def remove_twin_topic_for_address(self, twin_number: int, address: str):
return
await self._set_twin_topic(bytes_hash, twin_number, address)

async def set_twin_topic_with_remove_old(self, ipfs_hash: str, twin_number: int, address: str):
async def set_twin_topic_with_remove_old(
self, ipfs_hash: str, twin_number: int, address: str
):
bytes_hash = ipfs_qm_hash_to_32_bytes(ipfs_hash)
info = await self._get_twin_info(twin_number)
if info is not None:
for topic in info:
if topic[0] == bytes_hash:
if topic[1] == address:
_LOGGER.debug(f"Topic for address {address} with this ipfs hash exists")
_LOGGER.debug(
f"Topic for address {address} with this ipfs hash exists"
)
return
if topic[1] == address:
await self._set_twin_topic(topic[0], twin_number, ZERO_ACC)
await self._set_twin_topic(bytes_hash, twin_number, address)


@to_thread
def _get_twin_info(self, twin_number: int):
Expand Down Expand Up @@ -580,7 +599,9 @@ def _find_dapp_hash(self) -> tp.Optional[str]:
try:
datalog = Datalog(Account())
last_datalog_item = datalog.get_index(DAPP_HASH_DATALOG_ADDRESS)["end"]
last_datalog = datalog.get_item(DAPP_HASH_DATALOG_ADDRESS, last_datalog_item - 1)
last_datalog = datalog.get_item(
DAPP_HASH_DATALOG_ADDRESS, last_datalog_item - 1
)
if last_datalog is not None:
ipfs_hash = last_datalog[1]
return ipfs_hash
Expand Down Expand Up @@ -712,10 +733,13 @@ def callback_new_event(self, data: tp.Tuple[tp.Union[str, tp.List[str]]]) -> Non

try:
# _LOGGER.debug(f"Data from subscription callback: {data}")
if isinstance(data[1], str) and data[1] == self.controller_address: ## Launch
if (
isinstance(data[1], str) and data[1] == self.controller_address
): ## Launch
if data[0] in self.devices_list or data[0] == self.controller_address:
asyncio.run_coroutine_threadsafe(self._handle_launch(data), self.hass.loop).result()
# self.hass.async_create_task(self._handle_launch(data))
asyncio.run_coroutine_threadsafe(
self._handle_launch(data), self.hass.loop
).result()
else:
_LOGGER.debug(f"Got launch from not linked device: {data[0]}")
elif isinstance(data[1], int) and len(data) == 4:
Expand All @@ -724,21 +748,29 @@ def callback_new_event(self, data: tp.Tuple[tp.Union[str, tp.List[str]]]) -> Non
data[1] == self.hass.data[DOMAIN][TWIN_ID]
and data[3] == self.sub_owner_address
): ## Change backup topic in Digital Twin
asyncio.run_coroutine_threadsafe(_handle_backup_change(self.hass), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
_handle_backup_change(self.hass), self.hass.loop
).result()
elif (
isinstance(data[1], int) and data[0] in self.devices_list
): ## Datalog to change password
asyncio.run_coroutine_threadsafe(UserManager(self.hass).create_or_update_user(data), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
UserManager(self.hass).create_or_update_user(data), self.hass.loop
).result()
elif (
isinstance(data[1], int) and data[0] == DAPP_HASH_DATALOG_ADDRESS
): ## Change Dapp hash
ipfs_hash = data[2]
asyncio.run_coroutine_threadsafe(pin_file_to_local_node_by_hash(self.hass, ipfs_hash), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
pin_file_to_local_node_by_hash(self.hass, ipfs_hash), self.hass.loop
).result()
elif (
isinstance(data[1], list) and data[0] == self.sub_owner_address
): ## New Device in subscription
self._update_devices_list(data[1])
asyncio.run_coroutine_threadsafe(UserManager(self.hass).update_users(data[1]), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
UserManager(self.hass).update_users(data[1]), self.hass.loop
).result()
except Exception as e:
_LOGGER.warning(f"Exception in subscription callback: {e}")

Expand Down

0 comments on commit 0b1aa65

Please sign in to comment.