From c35ebe5609963e794a3a0eb542a1a199b2416ed0 Mon Sep 17 00:00:00 2001 From: LoSk-p Date: Thu, 27 Jun 2024 15:13:18 +0300 Subject: [PATCH] add encryption for sr25519 and change all crypto types to sr25519 --- custom_components/robonomics/__init__.py | 3 +- custom_components/robonomics/config_flow.py | 6 +- custom_components/robonomics/const.py | 3 + .../encryption_utils/sr25519/sr25519_const.py | 10 +++ .../sr25519/sr25519_decrypt.py | 59 ++++++++++++++++ .../sr25519/sr25519_encrypt.py | 70 +++++++++++++++++++ .../encryption_utils/sr25519/sr25519_utils.py | 66 +++++++++++++++++ custom_components/robonomics/get_states.py | 7 +- custom_components/robonomics/ipfs.py | 3 +- custom_components/robonomics/manifest.json | 4 +- custom_components/robonomics/robonomics.py | 21 +++--- custom_components/robonomics/services.py | 3 +- custom_components/robonomics/utils.py | 25 ++++--- 13 files changed, 253 insertions(+), 27 deletions(-) create mode 100644 custom_components/robonomics/encryption_utils/sr25519/sr25519_const.py create mode 100644 custom_components/robonomics/encryption_utils/sr25519/sr25519_decrypt.py create mode 100644 custom_components/robonomics/encryption_utils/sr25519/sr25519_encrypt.py create mode 100644 custom_components/robonomics/encryption_utils/sr25519/sr25519_utils.py diff --git a/custom_components/robonomics/__init__.py b/custom_components/robonomics/__init__.py index 2ca6f02..5934d64 100644 --- a/custom_components/robonomics/__init__.py +++ b/custom_components/robonomics/__init__.py @@ -65,6 +65,7 @@ HANDLE_TIME_CHANGE_LIBP2P, TIME_CHANGE_LIBP2P_UNSUB, CONTROLLER_ADDRESS, + CRYPTO_TYPE, ) from .get_states import get_and_send_data, get_states_libp2p from .ipfs import ( @@ -189,7 +190,7 @@ async def init_integration(_: Event = None) -> None: hass.data[DOMAIN][WAIT_IPFS_DAEMON] = False sub_admin_acc = Account( - hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=KeypairType.ED25519 + hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=CRYPTO_TYPE ) hass.data[DOMAIN][CONTROLLER_ADDRESS] = sub_admin_acc.get_address() _LOGGER.debug(f"Controller: {sub_admin_acc.get_address()}") diff --git a/custom_components/robonomics/config_flow.py b/custom_components/robonomics/config_flow.py index 93b1e23..b68baf3 100644 --- a/custom_components/robonomics/config_flow.py +++ b/custom_components/robonomics/config_flow.py @@ -4,6 +4,7 @@ from __future__ import annotations +import asyncio import logging from typing import Any, Optional @@ -31,6 +32,7 @@ CONF_WARN_ACCOUNT_MANAGMENT, CONF_WARN_DATA_SENDING, DOMAIN, + CRYPTO_TYPE, ) from .exceptions import ( CantConnectToIPFS, @@ -88,6 +90,7 @@ async def _has_sub_owner_subscription(hass: HomeAssistant, sub_owner_address: st rws = RWS(Account()) res = await hass.async_add_executor_job(rws.get_ledger, sub_owner_address) + # res = asyncio.run_coroutine_threadsafe(rws.get_ledger(sub_owner_address), hass).result() if res is None: return False else: @@ -103,8 +106,9 @@ async def _is_sub_admin_in_subscription(hass: HomeAssistant, sub_admin_seed: str :return: True if controller account is in subscription devices, false otherwise """ - rws = RWS(Account(sub_admin_seed, crypto_type=KeypairType.ED25519)) + rws = RWS(Account(sub_admin_seed, crypto_type=CRYPTO_TYPE)) res = await hass.async_add_executor_job(rws.is_in_sub, sub_owner_address) + # res = rws.is_in_sub(sub_owner_address) return res diff --git a/custom_components/robonomics/const.py b/custom_components/robonomics/const.py index c5a3684..320f05f 100644 --- a/custom_components/robonomics/const.py +++ b/custom_components/robonomics/const.py @@ -1,6 +1,7 @@ """Constants for the Robonomics Control integration.""" from homeassistant.const import Platform +from substrateinterface import KeypairType DOMAIN = "robonomics" PLATFORMS = [Platform.BUTTON, Platform.SENSOR] @@ -135,3 +136,5 @@ DAPP_HASH_DATALOG_ADDRESS = "4G7qwXRFqUt1V1VxQejue2k9kV7CzcKCnmDDUeQ8Ed52BcSU" IPFS_DAPP_FILE_NAME = "robonomics_dapp" LIBP2P_MULTIADDRESS = "libp2p_multiaddress" + +CRYPTO_TYPE = KeypairType.SR25519 \ No newline at end of file diff --git a/custom_components/robonomics/encryption_utils/sr25519/sr25519_const.py b/custom_components/robonomics/encryption_utils/sr25519/sr25519_const.py new file mode 100644 index 0000000..1e1c218 --- /dev/null +++ b/custom_components/robonomics/encryption_utils/sr25519/sr25519_const.py @@ -0,0 +1,10 @@ +# Consts for encryption +ENCRYPTION_KEY_SIZE = 32 +PUBLIC_KEY_SIZE = 32 +MAC_KEY_SIZE = 32 +MAC_VALUE_SIZE = 32 +DERIVATION_KEY_SALT_SIZE = 32 +DERIVATION_KEY_ROUNDS = 2048 # Number of iterations +DERIVATION_KEY_SIZE = 64 # Desired length of the derived key +PBKDF2_HASH_ALGORITHM = 'sha512' # Hashing algorithm +NONCE_SIZE = 24 \ No newline at end of file diff --git a/custom_components/robonomics/encryption_utils/sr25519/sr25519_decrypt.py b/custom_components/robonomics/encryption_utils/sr25519/sr25519_decrypt.py new file mode 100644 index 0000000..c4595d5 --- /dev/null +++ b/custom_components/robonomics/encryption_utils/sr25519/sr25519_decrypt.py @@ -0,0 +1,59 @@ +from .sr25519_const import NONCE_SIZE, DERIVATION_KEY_SALT_SIZE, PUBLIC_KEY_SIZE, MAC_VALUE_SIZE +from .sr25519_utils import get_sr25519_agreement, bytes_concat, derive_key, generate_mac_data +from nacl.secret import SecretBox +import typing as tp + + +def sr25519_decrypt(encrypted_message: bytes, receiver_private_key: bytes) -> tp.Tuple: + # Split encrypted_message to parts + message_public_key, salt, mac_value, nonce, sealed_message = _decapsulate_encrypted_message(encrypted_message) + + # Repeat encryption_key and mac_key generation based on encrypted_message + agreement_key = get_sr25519_agreement(secret_key=receiver_private_key, + public_key=message_public_key) + master_secret = bytes_concat(message_public_key, agreement_key) + encryption_key, mac_key = derive_key(master_secret, salt) + + # Get decrypted MAC value + decrypted_mac_value = generate_mac_data( + nonce=nonce, + encrypted_message=sealed_message, + message_public_key=message_public_key, + mac_key=mac_key + ) + + # Check if MAC values the same + assert mac_value == decrypted_mac_value, "MAC values do not match" + + # Decrypt the message + decrypted_message = _nacl_decrypt(sealed_message, nonce, encryption_key) + return decrypted_message, message_public_key + + +def _decapsulate_encrypted_message(encrypted_message: bytes): + assert len(encrypted_message) > NONCE_SIZE + DERIVATION_KEY_SALT_SIZE + PUBLIC_KEY_SIZE + MAC_VALUE_SIZE, \ + "Wrong encrypted message length" + + message_public_key = encrypted_message[ + NONCE_SIZE + DERIVATION_KEY_SALT_SIZE: NONCE_SIZE + DERIVATION_KEY_SALT_SIZE + PUBLIC_KEY_SIZE] + + salt = encrypted_message[NONCE_SIZE: NONCE_SIZE + DERIVATION_KEY_SALT_SIZE] + mac_value = encrypted_message[ + NONCE_SIZE + DERIVATION_KEY_SALT_SIZE + PUBLIC_KEY_SIZE: NONCE_SIZE + DERIVATION_KEY_SALT_SIZE + + PUBLIC_KEY_SIZE + MAC_VALUE_SIZE] + + nonce = encrypted_message[:NONCE_SIZE] + sealed_message = encrypted_message[NONCE_SIZE + DERIVATION_KEY_SALT_SIZE + PUBLIC_KEY_SIZE + MAC_VALUE_SIZE:] + + return message_public_key, salt, mac_value, nonce, sealed_message + + +def _nacl_decrypt(sealed_message: bytes, nonce: bytes, encryption_key: bytes): + # Create a nacl SecretBox using the encryption key + box = SecretBox(encryption_key) + try: + # Decrypt the message + decrypted_message = box.decrypt(sealed_message, nonce) + return decrypted_message + except Exception as e: + raise ValueError("Invalid secret or pubkey provided") from e \ No newline at end of file diff --git a/custom_components/robonomics/encryption_utils/sr25519/sr25519_encrypt.py b/custom_components/robonomics/encryption_utils/sr25519/sr25519_encrypt.py new file mode 100644 index 0000000..dab715f --- /dev/null +++ b/custom_components/robonomics/encryption_utils/sr25519/sr25519_encrypt.py @@ -0,0 +1,70 @@ +import os +import typing as tp +from substrateinterface import KeypairType, Keypair +from nacl.secret import SecretBox + +from .sr25519_const import NONCE_SIZE, DERIVATION_KEY_SALT_SIZE +from .sr25519_utils import get_sr25519_agreement, derive_key, bytes_concat, generate_mac_data + + +def sr25519_encrypt(message: str | bytes, + receiver_public_key: bytes, + sender_keypair: tp.Optional[Keypair] = None + ) -> bytes: + + # 1. Ephemeral key generation if no sender keypair is provided + if sender_keypair is not None: + message_keypair = sender_keypair + else: + message_keypair = Keypair.create_from_mnemonic( + mnemonic=Keypair.generate_mnemonic(), + crypto_type=KeypairType.SR25519 + ) + + # 2. Key agreement + agreement_key = get_sr25519_agreement(secret_key=message_keypair.private_key, + public_key=receiver_public_key) + + # 2.5 Master secret and cryptographic random salt with KEY_DERIVATION_SALT_SIZE bytes + master_secret = bytes_concat(message_keypair.public_key, agreement_key) + salt = os.urandom(DERIVATION_KEY_SALT_SIZE) + + # 3. Key derivation + encryption_key, mac_key = derive_key(master_secret, salt) + + # 4 Encryption + nonce = os.urandom(NONCE_SIZE) + encrypted_message = _nacl_encrypt(message, encryption_key, nonce) + + # 5 MAC Generation + mac_value = generate_mac_data( + nonce=nonce, + encrypted_message=encrypted_message, + message_public_key=message_keypair.public_key, + mac_key=mac_key + ) + + return bytes_concat(nonce, salt, message_keypair.public_key, mac_value, encrypted_message) + +def _nacl_encrypt(message: str | bytes, encryption_key: bytes, nonce: bytes) -> bytes: + # Ensure the encryption key is 32 bytes + if len(encryption_key) != 32: + raise ValueError("Encryption key must be 32 bytes long.") + + # Create a nacl SecretBox using the encryption key + box = SecretBox(encryption_key) + + try: + # Encrypt the message + encrypted_message = box.encrypt(_message_to_bytes(message), nonce) + return encrypted_message.ciphertext + except Exception as e: + raise ValueError("Invalid secret or pubkey provided") from e + +def _message_to_bytes(value): + if isinstance(value, (bytes, bytearray)): + return value + elif isinstance(value, str): + return value.encode('utf-8') + else: + raise TypeError("Unsupported message type for encryption") \ No newline at end of file diff --git a/custom_components/robonomics/encryption_utils/sr25519/sr25519_utils.py b/custom_components/robonomics/encryption_utils/sr25519/sr25519_utils.py new file mode 100644 index 0000000..74b73b7 --- /dev/null +++ b/custom_components/robonomics/encryption_utils/sr25519/sr25519_utils.py @@ -0,0 +1,66 @@ +import hmac +import hashlib +import rbcl + +from .sr25519_const import PBKDF2_HASH_ALGORITHM, DERIVATION_KEY_ROUNDS, DERIVATION_KEY_SIZE, MAC_KEY_SIZE, ENCRYPTION_KEY_SIZE + +def get_sr25519_agreement(secret_key: bytes, public_key: bytes) -> bytes: + try: + # Get canonical part of secret key + canonical_secret_key = secret_key[:32] + + # Perform elliptic curve point multiplication + # Since secret and public key are already in sr25519, that can be used as scalar and Ristretto point + shared_secret = rbcl.crypto_scalarmult_ristretto255(s=canonical_secret_key, p=public_key) + + return shared_secret + except Exception as e: + raise ValueError("Invalid secret or pubkey provided") from e + + +def derive_key(master_secret: bytes, salt: bytes) -> tuple: + # Derive a 64-byte key using PBKDF2 + password = hashlib.pbkdf2_hmac( + PBKDF2_HASH_ALGORITHM, + master_secret, + salt, + DERIVATION_KEY_ROUNDS, # Number of iterations + dklen=DERIVATION_KEY_SIZE # Desired length of the derived key + ) + + assert len(password) >= MAC_KEY_SIZE + ENCRYPTION_KEY_SIZE, "Wrong derived key length" + + # Split the derived password into encryption key and MAC key + mac_key = password[:MAC_KEY_SIZE] + encryption_key = password[MAC_KEY_SIZE: MAC_KEY_SIZE + ENCRYPTION_KEY_SIZE] + + return encryption_key, mac_key + + + +def generate_mac_data(nonce: bytes, encrypted_message: bytes, message_public_key: bytes, mac_key: bytes) -> bytes: + if len(mac_key) != 32: + raise ValueError("MAC key must be 32 bytes long.") + + # Concatenate nonce, message public key, and encrypted message + data_to_mac = bytes_concat(nonce, message_public_key, encrypted_message) + + # Generate HMAC-SHA256 + mac_data = hmac.new( + key=mac_key, + msg=data_to_mac, + digestmod=hashlib.sha256).digest() + return mac_data + + +def bytes_concat(*arrays) -> bytes: + """ + Concatenate multiple byte arrays into a single byte array. + + Args: + *arrays: Variable length argument list of byte arrays to concatenate. + + Returns: + bytes: A single concatenated byte array. + """ + return b''.join(arrays) diff --git a/custom_components/robonomics/get_states.py b/custom_components/robonomics/get_states.py index df4dbf8..94a8e7b 100644 --- a/custom_components/robonomics/get_states.py +++ b/custom_components/robonomics/get_states.py @@ -48,6 +48,7 @@ GETTING_STATES_QUEUE, PEER_ID_LOCAL, LIBP2P_MULTIADDRESS, + CRYPTO_TYPE, ) from .utils import ( encrypt_for_devices, @@ -103,7 +104,7 @@ async def get_and_send_data(hass: HomeAssistant): try: sender_acc = Account( - seed=hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=KeypairType.ED25519 + seed=hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=CRYPTO_TYPE ) sender_kp = sender_acc.keypair except Exception as e: @@ -132,7 +133,7 @@ async def get_states_libp2p(hass: HomeAssistant) -> str: states_json = await _get_states(hass, False) states_string = json.dumps(states_json) sender_acc = Account( - seed=hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=KeypairType.ED25519 + seed=hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=CRYPTO_TYPE ) sender_kp = sender_acc.keypair devices_list_with_admin = hass.data[DOMAIN][ROBONOMICS].devices_list.copy() @@ -267,7 +268,7 @@ async def _get_dashboard_and_services(hass: HomeAssistant) -> None: await hass.async_add_executor_job(write_file_data, config_filename, json.dumps(new_config)) sender_acc = Account( seed=hass.data[DOMAIN][CONF_ADMIN_SEED], - crypto_type=KeypairType.ED25519, + crypto_type=CRYPTO_TYPE, ) sender_kp = sender_acc.keypair devices_list_with_admin = hass.data[DOMAIN][ diff --git a/custom_components/robonomics/ipfs.py b/custom_components/robonomics/ipfs.py index 3b15505..3af5d19 100644 --- a/custom_components/robonomics/ipfs.py +++ b/custom_components/robonomics/ipfs.py @@ -48,6 +48,7 @@ WAIT_IPFS_DAEMON, IPFS_USERS_PATH, IPFS_DAPP_FILE_NAME, + CRYPTO_TYPE, ) from .utils import ( get_hash, @@ -815,7 +816,7 @@ def _upload_to_crust( """ seed: str = hass.data[DOMAIN][CONF_ADMIN_SEED] - mainnet = Mainnet(seed=seed, crypto_type=KeypairType.ED25519) + mainnet = Mainnet(seed=seed, crypto_type=CRYPTO_TYPE) try: # Check balance balance = mainnet.get_balance() diff --git a/custom_components/robonomics/manifest.json b/custom_components/robonomics/manifest.json index 77c7ffe..9a4bdd6 100644 --- a/custom_components/robonomics/manifest.json +++ b/custom_components/robonomics/manifest.json @@ -8,6 +8,6 @@ "documentation": "https://wiki.robonomics.network/en/", "iot_class": "cloud_push", "issue_tracker": "https://github.com/airalab/homeassistant-robonomics-integration/issues", - "requirements": ["pycryptodome==3.15.0", "wheel", "IPFS-Toolkit==0.4.0", "robonomics-interface==1.6.2", "pinatapy-vourhey==0.1.9", "aenum==3.1.11", "ipfs-api==0.2.3", "crust-interface-patara==0.1.1", "tenacity==8.2.2", "py-ws-libp2p-proxy==0.1.4"], - "version": "1.8.6" + "requirements": ["wheel", "PyNaCl>=1.5.0", "rbcl>=1.0.1","pycryptodome==3.15.0", "IPFS-Toolkit==0.4.0", "robonomics-interface==1.6.2", "pinatapy-vourhey==0.1.9", "aenum==3.1.11", "ipfs-api==0.2.3", "crust-interface-patara==0.1.1", "tenacity==8.2.2", "py-ws-libp2p-proxy==0.1.4"], + "version": "1.8.5" } diff --git a/custom_components/robonomics/robonomics.py b/custom_components/robonomics/robonomics.py index 747349e..c35aa84 100644 --- a/custom_components/robonomics/robonomics.py +++ b/custom_components/robonomics/robonomics.py @@ -30,7 +30,6 @@ DOMAIN, HANDLE_IPFS_REQUEST, IPFS_CONFIG_PATH, - MAX_NUMBER_OF_REQUESTS, MEDIA_ACC, ROBONOMICS, ROBONOMICS_WSS, @@ -40,6 +39,8 @@ ZERO_ACC, LAUNCH_REGISTRATION_COMMAND, DAPP_HASH_DATALOG_ADDRESS, + CRYPTO_TYPE, + CRYPTO_TYPE, ) from .get_states import get_and_send_data from .ipfs import ( @@ -101,7 +102,7 @@ async def get_or_create_twin_id(hass: HomeAssistant) -> None: _LOGGER.debug("Start getting info about telemetry") sub_admin_kp = Account( hass.data[DOMAIN][CONF_ADMIN_SEED], - crypto_type=KeypairType.ED25519, + crypto_type=CRYPTO_TYPE, ).keypair decrypted = decrypt_message_devices( res, sub_admin_kp.public_key, sub_admin_kp @@ -174,10 +175,10 @@ async def _run_launch_command( message = literal_eval(encrypted_command) else: kp_sender = Keypair( - ss58_address=sender_address, crypto_type=KeypairType.ED25519 + ss58_address=sender_address, crypto_type=CRYPTO_TYPE ) sub_admin_kp = Keypair.create_from_mnemonic( - hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=KeypairType.ED25519 + hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=CRYPTO_TYPE ) try: decrypted = decrypt_message( @@ -237,7 +238,7 @@ def __init__( self.controller_seed: str = controller_seed self.controller_account: Account = Account( seed=self.controller_seed, - crypto_type=KeypairType.ED25519, + crypto_type=CRYPTO_TYPE, remote_ws=self.current_wss, ) self.controller_address: str = self.controller_account.get_address() @@ -270,7 +271,7 @@ def decrypt_message_for_devices(self, data: str, sender_address: str = None) -> if sender_address is None: sender_address = self.controller_address sender_public_key = Keypair( - ss58_address=sender_address, crypto_type=KeypairType.ED25519 + ss58_address=sender_address, crypto_type=CRYPTO_TYPE ).public_key return decrypt_message_devices( data, sender_public_key, self.controller_account.keypair @@ -280,7 +281,7 @@ def encrypt_message(self, data: str, recepient_address: str = None) -> str: if recepient_address is None: recepient_address = self.controller_address recepient_public_key = Keypair( - ss58_address=recepient_address, crypto_type=KeypairType.ED25519 + ss58_address=recepient_address, crypto_type=CRYPTO_TYPE ).public_key return encrypt_message( data, self.controller_account.keypair, recepient_public_key @@ -290,7 +291,7 @@ def decrypt_message(self, encrypted_data: str, sender_address: str = None) -> st if sender_address is None: sender_address = self.controller_address sender_public_key = Keypair( - ss58_address=sender_address, crypto_type=KeypairType.ED25519 + ss58_address=sender_address, crypto_type=CRYPTO_TYPE ).public_key return decrypt_message( encrypted_data, sender_public_key, self.controller_account.keypair @@ -308,7 +309,7 @@ def _change_current_wss(self) -> None: _LOGGER.debug(f"New Robonomics ws is {self.current_wss}") self.controller_account: Account = Account( seed=self.controller_seed, - crypto_type=KeypairType.ED25519, + crypto_type=CRYPTO_TYPE, remote_ws=self.current_wss, ) @@ -787,7 +788,7 @@ def send_datalog(self, data: str, seed: str, subscription: bool) -> str: ): with attempt: try: - account = Account(seed=seed, crypto_type=KeypairType.ED25519) + account = Account(seed=seed, crypto_type=CRYPTO_TYPE) _LOGGER.debug(f"Start creating rws datalog") datalog = Datalog(account, rws_sub_owner=self.sub_owner_address) receipt = datalog.record(data) diff --git a/custom_components/robonomics/services.py b/custom_components/robonomics/services.py index 4f3b7f7..91a10fa 100644 --- a/custom_components/robonomics/services.py +++ b/custom_components/robonomics/services.py @@ -32,6 +32,7 @@ IPFS_MEDIA_PATH, ROBONOMICS, TWIN_ID, + CRYPTO_TYPE, ) from .ipfs import add_backup_to_ipfs, add_media_to_ipfs, get_folder_hash, get_ipfs_data from .utils import delete_temp_file, encrypt_message, read_file_data, write_file_data @@ -152,7 +153,7 @@ async def restore_from_backup_service_call( backup_path = f"{tempfile.gettempdir()}/{DATA_BACKUP_ENCRYPTED_NAME}" await hass.async_add_executor_job(write_file_data, backup_path, result) sub_admin_kp = Keypair.create_from_mnemonic( - hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=KeypairType.ED25519 + hass.data[DOMAIN][CONF_ADMIN_SEED], crypto_type=CRYPTO_TYPE ) if is_hassio(hass): await restore_backup_hassio(hass, Path(backup_path), sub_admin_kp) diff --git a/custom_components/robonomics/utils.py b/custom_components/robonomics/utils.py index 8e21c6c..dd831e8 100644 --- a/custom_components/robonomics/utils.py +++ b/custom_components/robonomics/utils.py @@ -20,7 +20,9 @@ from homeassistant.helpers.json import JSONEncoder from homeassistant.helpers.storage import Store -from .const import DOMAIN +from .const import DOMAIN, CRYPTO_TYPE +from .encryption_utils.sr25519.sr25519_encrypt import sr25519_encrypt +from .encryption_utils.sr25519.sr25519_decrypt import sr25519_decrypt _LOGGER = logging.getLogger(__name__) VERSION_STORAGE = 6 @@ -54,8 +56,10 @@ def encrypt_message( :return: encrypted message """ - - encrypted = sender_keypair.encrypt_message(message, recipient_public_key) + if CRYPTO_TYPE == KeypairType.ED25519: + encrypted = sender_keypair.encrypt_message(message, recipient_public_key) + else: + encrypted = sr25519_encrypt(message, recipient_public_key) return f"0x{encrypted.hex()}" @@ -75,7 +79,12 @@ def decrypt_message( encrypted_message = encrypted_message[2:] bytes_encrypted = bytes.fromhex(encrypted_message) - return recipient_keypair.decrypt_message(bytes_encrypted, sender_public_key) + if CRYPTO_TYPE == KeypairType.ED25519: + decrypted = recipient_keypair.decrypt_message(bytes_encrypted, sender_public_key) + else: + decrypted = sr25519_decrypt(bytes_encrypted, recipient_keypair.private_key) + + return decrypted def encrypt_for_devices(data: str, sender_kp: Keypair, devices: tp.List[str]) -> str: @@ -90,7 +99,7 @@ def encrypt_for_devices(data: str, sender_kp: Keypair, devices: tp.List[str]) -> """ try: random_seed = Keypair.generate_mnemonic() - random_acc = Account(random_seed, crypto_type=KeypairType.ED25519) + random_acc = Account(random_seed, crypto_type=CRYPTO_TYPE) encrypted_data = encrypt_message( str(data), sender_kp, random_acc.keypair.public_key ) @@ -99,7 +108,7 @@ def encrypt_for_devices(data: str, sender_kp: Keypair, devices: tp.List[str]) -> for device in devices: try: receiver_kp = Keypair( - ss58_address=device, crypto_type=KeypairType.ED25519 + ss58_address=device, crypto_type=CRYPTO_TYPE ) encrypted_key = encrypt_message( random_seed, sender_kp, receiver_kp.public_key @@ -139,7 +148,7 @@ def decrypt_message_devices( sender_public_key, recipient_keypair, ).decode("utf-8") - decrypted_acc = Account(decrypted_seed, crypto_type=KeypairType.ED25519) + decrypted_acc = Account(decrypted_seed, crypto_type=CRYPTO_TYPE) decrypted_data = decrypt_message( data_json["data"], sender_public_key, decrypted_acc.keypair ).decode("utf-8") @@ -351,7 +360,7 @@ def verify_sign(signature: str, address: str) -> bool: if signature.startswith("0x"): signature = signature[2:] signature_bytes = bytes.fromhex(signature) - keypair = Keypair(ss58_address=address, crypto_type=KeypairType.ED25519) + keypair = Keypair(ss58_address=address, crypto_type=CRYPTO_TYPE) return keypair.verify(address, signature_bytes) except Exception as e: _LOGGER.error(f"Exception during signature verification: {e}")