diff --git a/oracle/networks.py b/oracle/networks.py index 2a21cac..d148fe0 100644 --- a/oracle/networks.py +++ b/oracle/networks.py @@ -1,7 +1,6 @@ from datetime import timedelta from decouple import Csv, config -from ens.constants import EMPTY_ADDR_HEX from eth_typing import HexStr from web3 import Web3 @@ -69,9 +68,6 @@ "0x0100000000000000000000002296e122c1a20fca3cac3371357bdad3be0df079" ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), - ORACLE_STAKEWISE_OPERATOR=Web3.toChecksumAddress( - "0x5fc60576b92c5ce5c341c43e3b2866eb9e0cddd1" - ), AWS_BUCKET_NAME=config("AWS_BUCKET_NAME", default="oracle-votes-mainnet"), AWS_REGION=config("AWS_REGION", default="eu-central-1"), AWS_ACCESS_KEY_ID=config("AWS_ACCESS_KEY_ID", default=""), @@ -142,7 +138,6 @@ "0x0100000000000000000000005c631621b897f467dd6a91855a0bc97d77b78dc0" ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), - ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, AWS_BUCKET_NAME=config( "AWS_BUCKET_NAME", default="oracle-votes-harbour-mainnet", @@ -216,7 +211,6 @@ "0x010000000000000000000000040f15c6b5bfc5f324ecab5864c38d4e1eef4218" ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), - ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, AWS_BUCKET_NAME=config("AWS_BUCKET_NAME", default="oracle-votes-goerli"), AWS_REGION=config("AWS_REGION", default="eu-central-1"), AWS_ACCESS_KEY_ID=config("AWS_ACCESS_KEY_ID", default=""), @@ -287,7 +281,6 @@ "0x0100000000000000000000006dfc9682e3c3263758ad96e2b2ba9822167f81ee" ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), - ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, AWS_BUCKET_NAME=config( "AWS_BUCKET_NAME", default="oracle-votes-perm-goerli", @@ -361,7 +354,6 @@ "0x010000000000000000000000fc9b67b6034f6b306ea9bd8ec1baf3efa2490394" ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), - ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, AWS_BUCKET_NAME=config("AWS_BUCKET_NAME", default="oracle-votes-gnosis"), AWS_REGION=config("AWS_REGION", default="eu-north-1"), AWS_ACCESS_KEY_ID=config("AWS_ACCESS_KEY_ID", default=""), diff --git a/oracle/oracle/common/graphql_queries.py b/oracle/oracle/common/graphql_queries.py index 428fc68..d3d51a0 100644 --- a/oracle/oracle/common/graphql_queries.py +++ b/oracle/oracle/common/graphql_queries.py @@ -413,12 +413,12 @@ LAST_VALIDATORS_QUERY = gql( """ - query getValidators($block_number: Int) { + query getValidators($block_number: Int, $count: Int) { validators( block: { number: $block_number } orderBy: createdAtBlock orderDirection: desc - first: 1 + first: $count ) { operator { id diff --git a/oracle/oracle/validators/controller.py b/oracle/oracle/validators/controller.py index 94e5191..895dc35 100644 --- a/oracle/oracle/validators/controller.py +++ b/oracle/oracle/validators/controller.py @@ -1,5 +1,5 @@ import logging -from typing import List, Set +from typing import List from eth_account.signers.local import LocalAccount from eth_typing import BlockNumber, HexStr @@ -17,8 +17,9 @@ WAD, ) -from .eth1 import get_validators_deposit_root, select_validator +from .eth1 import get_validators_deposit_root from .types import ValidatorDepositData, ValidatorsVote, ValidatorVotingParameters +from .validator import select_validators logger = logging.getLogger(__name__) w3 = Web3() @@ -55,20 +56,10 @@ async def process( # not enough balance to register next validator return - validators_deposit_data: List[ValidatorDepositData] = [] - used_pubkeys: Set[HexStr] = set() - for _ in range(validators_count): - # select next validator - # TODO: implement scoring system based on the operators performance - deposit_data = await select_validator( - block_number=block_number, - used_pubkeys=used_pubkeys, - ) - if deposit_data is None: - break - - used_pubkeys.add(deposit_data["public_key"]) - validators_deposit_data.append(deposit_data) + validators_deposit_data: List[ValidatorDepositData] = await select_validators( + block_number=block_number, + validators_count=validators_count, + ) if not validators_deposit_data: logger.warning("Run out of validator keys") diff --git a/oracle/oracle/validators/eth1.py b/oracle/oracle/validators/eth1.py index ab24d84..1e46335 100644 --- a/oracle/oracle/validators/eth1.py +++ b/oracle/oracle/validators/eth1.py @@ -1,6 +1,5 @@ -from typing import Dict, Set, Union +from typing import Dict -from ens.constants import EMPTY_ADDR_HEX from eth_typing import HexStr from web3 import Web3 from web3.types import BlockNumber @@ -15,76 +14,41 @@ VALIDATOR_REGISTRATIONS_LATEST_INDEX_QUERY, VALIDATOR_REGISTRATIONS_QUERY, ) -from oracle.oracle.common.ipfs import ipfs_fetch -from oracle.settings import NETWORK, NETWORK_CONFIG +from oracle.settings import NETWORK -from .types import ValidatorDepositData +from .types import Operator -async def select_validator( - block_number: BlockNumber, used_pubkeys: Set[HexStr] -) -> Union[None, ValidatorDepositData]: - """Selects the next validator to register.""" +async def get_operators(block_number: BlockNumber) -> list[Operator]: + """Fetch list of registered operators""" result: Dict = await execute_sw_gql_query( network=NETWORK, query=OPERATORS_QUERY, variables=dict(block_number=block_number), ) - operators = result["operators"] + return [ + Operator( + id=Web3.toChecksumAddress(x["id"]), + deposit_data_merkle_proofs=x["depositDataMerkleProofs"], + deposit_data_index=int(x["depositDataIndex"]), + ) + for x in result["operators"] + ] + + +async def get_last_operators( + block_number: BlockNumber, validators_count: int +) -> list[HexStr]: + """Fetch last registered validator's operators addresses.""" result: Dict = await execute_sw_gql_query( network=NETWORK, query=LAST_VALIDATORS_QUERY, - variables=dict(block_number=block_number), + variables=dict(block_number=block_number, count=validators_count), ) - - last_validators = result["validators"] - if last_validators: - last_operator_id = last_validators[0]["operator"]["id"] - index = _find_operator_index(operators, last_operator_id) - if index is not None and index != len(operators) - 1: - operators = operators[index + 1 :] + [operators[index]] + operators[:index] - - _move_to_bottom(operators, NETWORK_CONFIG["ORACLE_STAKEWISE_OPERATOR"]) - - for operator in operators: - merkle_proofs = operator["depositDataMerkleProofs"] - if not merkle_proofs: - continue - - operator_address = Web3.toChecksumAddress(operator["id"]) - deposit_data_index = int(operator["depositDataIndex"]) - deposit_datum = await ipfs_fetch(merkle_proofs) - - max_deposit_data_index = len(deposit_datum) - 1 - if deposit_data_index > max_deposit_data_index: - continue - - selected_deposit_data = deposit_datum[deposit_data_index] - public_key = selected_deposit_data["public_key"] - can_register = public_key not in used_pubkeys and await can_register_validator( - block_number, public_key - ) - while deposit_data_index < max_deposit_data_index and not can_register: - # the edge case when the validator was registered in previous merkle root - # and the deposit data is presented in the same. - deposit_data_index += 1 - selected_deposit_data = deposit_datum[deposit_data_index] - public_key = selected_deposit_data["public_key"] - can_register = ( - public_key not in used_pubkeys - and await can_register_validator(block_number, public_key) - ) - - if can_register: - return ValidatorDepositData( - operator=operator_address, - public_key=selected_deposit_data["public_key"], - withdrawal_credentials=selected_deposit_data["withdrawal_credentials"], - deposit_data_root=selected_deposit_data["deposit_data_root"], - deposit_data_signature=selected_deposit_data["signature"], - proof=selected_deposit_data["proof"], - ) - return None + operators = [] + for validator in result["validators"]: + operators.append(validator["operator"]["id"]) + return operators async def can_register_validator(block_number: BlockNumber, public_key: HexStr) -> bool: @@ -107,22 +71,3 @@ async def get_validators_deposit_root(block_number: BlockNumber) -> HexStr: variables=dict(block_number=block_number), ) return result["validatorRegistrations"][0]["validatorsDepositRoot"] - - -def _move_to_bottom(operators, operator_id): - if operator_id == EMPTY_ADDR_HEX: - return - - index = _find_operator_index(operators, operator_id) - if index is not None: - operators.append(operators.pop(index)) - - -def _find_operator_index(operators, operator_id): - index = None - operator_id = Web3.toChecksumAddress(operator_id) - for i, operator in enumerate(operators): - if Web3.toChecksumAddress(operator["id"]) == operator_id: - index = i - break - return index diff --git a/oracle/oracle/validators/tests/test_controller.py b/oracle/oracle/validators/tests/test_controller.py index 754926d..d25dea2 100644 --- a/oracle/oracle/validators/tests/test_controller.py +++ b/oracle/oracle/validators/tests/test_controller.py @@ -129,7 +129,7 @@ async def test_process_success(self): validatorsDepositRoot=vote["validators_deposit_root"] ), ), patch( - "oracle.oracle.validators.eth1.ipfs_fetch", + "oracle.oracle.validators.validator.ipfs_fetch", side_effect=ipfs_fetch_query( deposit_data_root=vote["deposit_data"][0]["deposit_data_root"], public_key=vote["deposit_data"][0]["public_key"], diff --git a/oracle/oracle/validators/tests/test_validator.py b/oracle/oracle/validators/tests/test_validator.py new file mode 100644 index 0000000..d431c15 --- /dev/null +++ b/oracle/oracle/validators/tests/test_validator.py @@ -0,0 +1,134 @@ +from unittest.mock import patch + +from web3 import Web3 +from web3.types import BlockNumber + +from oracle.oracle.tests.factories import faker + +from ..types import ValidatorDepositData +from ..validator import select_validators + +w3 = Web3() +block_number: BlockNumber = faker.random_int(150000, 250000) + + +def generate_operator(deposit_data_count, deposit_data_index) -> dict: + return { + "ipfs": [ + { + "amount": str(32 * 10**9), + "deposit_data_root": faker.eth_proof(), + "proof": [faker.eth_proof()] * 6, + "public_key": faker.eth_public_key(), + "signature": faker.eth_signature(), + "withdrawal_credentials": faker.eth_address(), + } + for x in range(deposit_data_count) + ], + "deposit_data_merkle_proofs": "/ipfs/" + faker.text(max_nb_chars=20), + "deposit_data_index": deposit_data_index, + "id": faker.eth_address(), + } + + +def _to_validator_deposit_data(operator, deposit_data_index): + return ValidatorDepositData( + operator=operator["id"], + public_key=operator["ipfs"][deposit_data_index]["public_key"], + withdrawal_credentials=operator["ipfs"][deposit_data_index][ + "withdrawal_credentials" + ], + deposit_data_root=operator["ipfs"][deposit_data_index]["deposit_data_root"], + deposit_data_signature=operator["ipfs"][deposit_data_index]["signature"], + proof=operator["ipfs"][deposit_data_index]["proof"], + ) + + +class TestValidatorSelect: + async def _process(self, validators_count, operators, last_operators_ids): + with patch( + "oracle.oracle.validators.validator.can_register_validator", + return_value=True, + ), patch( + "oracle.oracle.validators.validator.get_last_operators", + return_value=last_operators_ids, + ), patch( + "oracle.oracle.validators.validator.get_operators", + return_value=operators, + ), patch( + "oracle.oracle.validators.validator.ipfs_fetch", + side_effect=lambda ipfs_hash: [ + operator["ipfs"] + for operator in operators + if operator["deposit_data_merkle_proofs"] == ipfs_hash + ][0], + ): + return await select_validators( + block_number=faker.random_int(10000000, 15000000), + validators_count=validators_count, + ) + + async def test_single(self): + operators = [ + generate_operator(4, 2), + ] + result = await self._process( + validators_count=1, operators=operators, last_operators_ids=[] + ) + assert result == [_to_validator_deposit_data(operators[0], 2)] + + async def test_none(self): + operators = [ + generate_operator(2, 4), + ] + result = await self._process( + validators_count=1, operators=operators, last_operators_ids=[] + ) + assert result == [] + + async def test_single_several(self): + operators = [ + generate_operator(50, 2), + ] + result = await self._process( + validators_count=3, operators=operators, last_operators_ids=[] + ) + assert result == [ + _to_validator_deposit_data(operators[0], 2), + _to_validator_deposit_data(operators[0], 3), + _to_validator_deposit_data(operators[0], 4), + ] + + async def test_basic_1(self): + operators = [ + generate_operator(50, 2), + generate_operator(50, 2), + ] + result = await self._process( + validators_count=2, + operators=operators, + last_operators_ids=[operators[0]["id"]] * 10, + ) + assert result == [ + _to_validator_deposit_data(operators[1], 2), + _to_validator_deposit_data(operators[1], 3), + ] + + async def test_basic_2(self): + operators = [ + generate_operator(50, 2), + generate_operator(50, 2), + generate_operator(50, 2), + ] + + result = await self._process( + validators_count=3, + operators=operators, + last_operators_ids=[operators[0]["id"]] * 9 + [operators[1]["id"]] * 5, + ) + + assert result == [ + _to_validator_deposit_data(operators[0], 2), + _to_validator_deposit_data(operators[2], 2), + _to_validator_deposit_data(operators[2], 3), + ] diff --git a/oracle/oracle/validators/types.py b/oracle/oracle/validators/types.py index 04a64a6..9124271 100644 --- a/oracle/oracle/validators/types.py +++ b/oracle/oracle/validators/types.py @@ -32,3 +32,9 @@ class ValidatorsVote(TypedDict): validators_deposit_root: HexStr signature: HexStr deposit_data: List[ValidatorDepositData] + + +class Operator(TypedDict): + id: ChecksumAddress + deposit_data_merkle_proofs: str + deposit_data_index: int diff --git a/oracle/oracle/validators/validator.py b/oracle/oracle/validators/validator.py new file mode 100644 index 0000000..6dbf422 --- /dev/null +++ b/oracle/oracle/validators/validator.py @@ -0,0 +1,130 @@ +from typing import Set + +from eth_typing import HexStr +from web3 import Web3 +from web3.types import BlockNumber + +from oracle.oracle.common.ipfs import ipfs_fetch +from oracle.settings import ( + OPERATOR_WEIGHT_FIRST, + OPERATOR_WEIGHT_OTHERS, + OPERATOR_WEIGHT_SECOND, +) + +from .eth1 import can_register_validator, get_last_operators, get_operators +from .types import Operator, ValidatorDepositData + + +async def select_validators( + block_number: BlockNumber, validators_count: int +) -> list[ValidatorDepositData]: + """Selects the next validators to register.""" + used_pubkeys: Set[HexStr] = set() + deposit_datas: list[ValidatorDepositData] = [] + + operators = await get_operators(block_number) + weighted_operators = _get_weighted_operators(operators) + last_operators = await get_last_operators(block_number, len(weighted_operators)) + + discarded_operator_ids = set() + + while len(deposit_datas) < validators_count and len(discarded_operator_ids) < len( + operators + ): + operator = _select_operator( + weighted_operators, last_operators, discarded_operator_ids + ) + + deposit_data = await _process_operator(operator, used_pubkeys, block_number) + if deposit_data: + deposit_datas.append(deposit_data) + last_operators.append(operator["id"]) + used_pubkeys.add(deposit_data["public_key"]) + else: + discarded_operator_ids.add(operator["id"]) + + return deposit_datas + + +def _select_operator( + weighted_operators: list[Operator], + last_operator_ids: list[HexStr], + discarded_operator_ids: set[HexStr], +) -> Operator: + result = weighted_operators.copy() + last_operator_ids = last_operator_ids.copy() + if len(last_operator_ids) > len(weighted_operators): + last_operator_ids = last_operator_ids[:] + for operator_id in last_operator_ids: + index = _find_operator_index(result, operator_id) + if index is not None: + result.pop(index) + for operator in result + weighted_operators: + if operator["id"] not in discarded_operator_ids: + return operator + + +async def _process_operator( + operator: Operator, used_pubkeys: Set[HexStr], block_number: BlockNumber +) -> ValidatorDepositData | None: + merkle_proofs = operator["deposit_data_merkle_proofs"] + if not merkle_proofs: + return + + operator_address = Web3.toChecksumAddress(operator["id"]) + deposit_data_index = int(operator["deposit_data_index"]) + deposit_datum = await ipfs_fetch(merkle_proofs) + + max_deposit_data_index = len(deposit_datum) - 1 + if deposit_data_index > max_deposit_data_index: + return + + selected_deposit_data = deposit_datum[deposit_data_index] + public_key = selected_deposit_data["public_key"] + can_register = public_key not in used_pubkeys and await can_register_validator( + block_number, public_key + ) + while deposit_data_index < max_deposit_data_index and not can_register: + # the edge case when the validator was registered in previous merkle root + # and the deposit data is presented in the same. + deposit_data_index += 1 + selected_deposit_data = deposit_datum[deposit_data_index] + public_key = selected_deposit_data["public_key"] + can_register = public_key not in used_pubkeys and await can_register_validator( + block_number, public_key + ) + + if can_register: + return ValidatorDepositData( + operator=operator_address, + public_key=selected_deposit_data["public_key"], + withdrawal_credentials=selected_deposit_data["withdrawal_credentials"], + deposit_data_root=selected_deposit_data["deposit_data_root"], + deposit_data_signature=selected_deposit_data["signature"], + proof=selected_deposit_data["proof"], + ) + + +def _find_operator_index(operators: list[Operator], operator_id: str) -> int | None: + index = None + operator_id = Web3.toChecksumAddress(operator_id) + for i, operator in enumerate(operators): + if Web3.toChecksumAddress(operator["id"]) == operator_id: + index = i + break + return index + + +def _get_weighted_operators(operators: list[Operator]) -> list[Operator]: + if len(operators) < 2: + return operators + if len(operators) == 2: + return [operators[0]] * OPERATOR_WEIGHT_FIRST + [ + operators[1] + ] * OPERATOR_WEIGHT_SECOND + else: + return ( + [operators[0]] * OPERATOR_WEIGHT_FIRST + + [operators[1]] * OPERATOR_WEIGHT_SECOND + + operators[2:] * OPERATOR_WEIGHT_OTHERS + ) diff --git a/oracle/settings.py b/oracle/settings.py index d255335..2f4b95e 100644 --- a/oracle/settings.py +++ b/oracle/settings.py @@ -29,6 +29,10 @@ # oracle ORACLE_PROCESS_INTERVAL = config("ORACLE_PROCESS_INTERVAL", default=15, cast=int) +OPERATOR_WEIGHT_FIRST = 10 +OPERATOR_WEIGHT_SECOND = 5 +OPERATOR_WEIGHT_OTHERS = 2 + IPFS_FETCH_ENDPOINTS = config( "IPFS_FETCH_ENDPOINTS", cast=Csv(),