Skip to content

Commit

Permalink
Aggregator message TTL (#273)
Browse files Browse the repository at this point in the history
* Add message timestamp validation to Aggregator

* Add `Clock` abstraction to core

* Add `MESSAGE_SUBMISSION_TIMEOUT` constant

* Validate message against current system timestamp

- Ignores old messages

* Change checkpoint task interval

* Use proper timestamp as seconds

* Use `SystemClock` in tests by default

- Preserves old behavior

* Fix tests

- Use valid timestamps

* Remove aggregator options

- Ended up not being used in tests

* Initialize `clock` to `SystemClock` by default

* Use values instead of pointers for `core.SystemClock`

* Include `MESSAGE_BLS_AGGREGATION_TIMEOUT`

* feat: Map message expired error in aggregator rpc server

---------

Co-authored-by: Franco Barpp Gomes <[email protected]>
  • Loading branch information
emlautarom1 and Hyodar committed Jul 5, 2024
1 parent 4595854 commit 3258e73
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 8 deletions.
30 changes: 29 additions & 1 deletion aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type Aggregator struct {
rollupBroadcaster RollupBroadcasterer
httpClient safeclient.SafeClient
wsClient safeclient.SafeClient
clock core.Clock

// TODO(edwin): once rpc & rest decouple from aggregator fome it with them
registry *prometheus.Registry
Expand Down Expand Up @@ -231,6 +232,7 @@ func NewAggregator(
rollupBroadcaster: rollupBroadcaster,
httpClient: ethHttpClient,
wsClient: ethWsClient,
clock: core.SystemClock,
taskBlsAggregationService: taskBlsAggregationService,
stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService,
operatorSetUpdateBlsAggregationService: operatorSetUpdateBlsAggregationService,
Expand Down Expand Up @@ -388,7 +390,7 @@ func (agg *Aggregator) sendNewCheckpointTask() {
return
}

toTimestamp := block.Time()
toTimestamp := block.Time() - uint64(types.MESSAGE_SUBMISSION_TIMEOUT.Seconds()) - uint64(types.MESSAGE_BLS_AGGREGATION_TIMEOUT.Seconds())
fromTimestamp := lastCheckpointToTimestamp + 1
if lastCheckpointToTimestamp == 0 {
fromTimestamp = toTimestamp - uint64(agg.checkpointInterval.Seconds())
Expand Down Expand Up @@ -555,6 +557,13 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate
return DigestError
}

timestamp := signedStateRootUpdateMessage.Message.Timestamp
err = agg.validateMessageTimestamp(timestamp)
if err != nil {
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

err = agg.stateRootUpdateBlsAggregationService.InitializeMessageIfNotExists(
messageDigest,
coretypes.QUORUM_NUMBERS,
Expand Down Expand Up @@ -585,6 +594,13 @@ func (agg *Aggregator) ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUp
return DigestError
}

timestamp := signedOperatorSetUpdateMessage.Message.Timestamp
err = agg.validateMessageTimestamp(timestamp)
if err != nil {
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

blockNumber, err := agg.avsReader.GetOperatorSetUpdateBlock(context.Background(), signedOperatorSetUpdateMessage.Message.Id)
if err != nil {
agg.logger.Error("Failed to get operator set update block", "err", err)
Expand Down Expand Up @@ -674,3 +690,15 @@ func (agg *Aggregator) GetCheckpointMessages(fromTimestamp, toTimestamp uint64)
CheckpointMessages: *checkpointMessages,
}, nil
}

func (agg *Aggregator) validateMessageTimestamp(messageTimestamp uint64) error {
now := agg.clock.Now().Unix()
timeoutInSeconds := types.MESSAGE_SUBMISSION_TIMEOUT.Seconds()

// Prevent possible underflow (specially in testing)
if uint64(now) > uint64(timeoutInSeconds) && messageTimestamp < uint64(now)-uint64(timeoutInSeconds) {
return MessageExpiredError
}

return nil
}
52 changes: 49 additions & 3 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
aggmocks "github.com/NethermindEth/near-sffl/aggregator/mocks"
"github.com/NethermindEth/near-sffl/aggregator/types"
taskmanager "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLTaskManager"
"github.com/NethermindEth/near-sffl/core"
chainiomocks "github.com/NethermindEth/near-sffl/core/chainio/mocks"
safeclientmocks "github.com/NethermindEth/near-sffl/core/safeclient/mocks"
coretypes "github.com/NethermindEth/near-sffl/core/types"
Expand Down Expand Up @@ -59,8 +60,8 @@ func TestSendNewTask(t *testing.T) {

var TASK_INDEX = uint32(0)
var BLOCK_NUMBER = uint32(100)
var FROM_TIMESTAMP = uint64(3)
var TO_TIMESTAMP = uint64(4)
var FROM_TIMESTAMP = uint64(30_000)
var TO_TIMESTAMP = uint64(40_000)

mockClient.EXPECT().BlockNumber(context.Background()).Return(uint64(BLOCK_NUMBER), nil)
mockClient.EXPECT().BlockByNumber(context.Background(), big.NewInt(int64(BLOCK_NUMBER))).Return(
Expand All @@ -69,7 +70,11 @@ func TestSendNewTask(t *testing.T) {
)

mockAvsWriterer.EXPECT().SendNewCheckpointTask(
context.Background(), FROM_TIMESTAMP, TO_TIMESTAMP, types.TASK_QUORUM_THRESHOLD, coretypes.QUORUM_NUMBERS,
context.Background(),
FROM_TIMESTAMP,
TO_TIMESTAMP-uint64(types.MESSAGE_SUBMISSION_TIMEOUT.Seconds())-uint64(types.MESSAGE_BLS_AGGREGATION_TIMEOUT.Seconds()),
types.TASK_QUORUM_THRESHOLD,
coretypes.QUORUM_NUMBERS,
).Return(aggmocks.MockSendNewCheckpointTask(BLOCK_NUMBER, TASK_INDEX, FROM_TIMESTAMP, TO_TIMESTAMP))
mockAvsReaderer.EXPECT().GetLastCheckpointToTimestamp(context.Background()).Return(FROM_TIMESTAMP-1, nil)

Expand Down Expand Up @@ -149,6 +154,46 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
assert.NotContains(t, aggregator.operatorSetUpdates, msgDigest)
}

func TestExpiredStateRootUpdateMessage(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.NoError(t, err)

nowTimestamp := uint64(6000)
aggregator.clock = core.Clock{Now: func() time.Time { return time.Unix(int64(nowTimestamp), 0) }}
messageTimestamp := nowTimestamp - 60 - 1 // 60 seconds for message submission timeout and 1 second to be out of range

err = aggregator.ProcessSignedStateRootUpdateMessage(&messages.SignedStateRootUpdateMessage{
Message: messages.StateRootUpdateMessage{
Timestamp: messageTimestamp,
},
})

assert.Equal(t, MessageExpiredError, err)
}

func TestExpiredOperatorSetUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.NoError(t, err)

nowTimestamp := uint64(8000)
aggregator.clock = core.Clock{Now: func() time.Time { return time.Unix(int64(nowTimestamp), 0) }}
messageTimestamp := nowTimestamp - 60 - 1 // 60 seconds for message submission timeout and 1 second to be out of range

err = aggregator.ProcessSignedOperatorSetUpdateMessage(&messages.SignedOperatorSetUpdateMessage{
Message: messages.OperatorSetUpdateMessage{
Timestamp: messageTimestamp,
},
})

assert.Equal(t, MessageExpiredError, err)
}

func createMockAggregator(
mockCtrl *gomock.Controller, operatorPubkeyDict map[eigentypes.OperatorId]types.OperatorInfo,
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
Expand Down Expand Up @@ -178,6 +223,7 @@ func createMockAggregator(
httpClient: mockClient,
wsClient: mockClient,
aggregatorListener: &SelectiveAggregatorListener{},
clock: core.SystemClock,
}
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
}
2 changes: 2 additions & 0 deletions aggregator/rpc_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ var (
UnknownErrorWhileVerifyingSignature400 = errors.New("400. Failed to verify signature")
SignatureVerificationFailed400 = errors.New("400. Signature verification failed")
CallToGetCheckSignaturesIndicesFailed500 = errors.New("500. Failed to get check signatures indices")
MessageExpiredError500 = errors.New("500. Message expired")
UnknownError400 = errors.New("400. Unknown error")

errorsMap = map[error]error{
aggregator.DigestError: MessageDigestNotFoundError500,
aggregator.TaskResponseDigestError: TaskResponseDigestNotFoundError500,
aggregator.GetOperatorSetUpdateBlockError: OperatorSetUpdateBlockNotFoundError500,
aggregator.MessageExpiredError: MessageExpiredError500,
}
)

Expand Down
11 changes: 7 additions & 4 deletions aggregator/rpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"math/big"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
Expand All @@ -14,6 +15,7 @@ import (
eigentypes "github.com/Layr-Labs/eigensdk-go/types"

"github.com/NethermindEth/near-sffl/aggregator/types"
"github.com/NethermindEth/near-sffl/core"
coretypes "github.com/NethermindEth/near-sffl/core/types"
"github.com/NethermindEth/near-sffl/core/types/messages"
)
Expand Down Expand Up @@ -56,10 +58,11 @@ func TestProcessSignedStateRootUpdateMessage(t *testing.T) {
aggregator, _, _, _, mockMessageBlsAggServ, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

aggregator.clock = core.Clock{Now: func() time.Time { return time.Unix(10_000, 0) }}
message := messages.StateRootUpdateMessage{
RollupId: 1,
BlockHeight: 2,
Timestamp: 3,
Timestamp: 9_995,
NearDaCommitment: keccak256(4),
NearDaTransactionId: keccak256(5),
StateRoot: keccak256(6),
Expand All @@ -70,8 +73,7 @@ func TestProcessSignedStateRootUpdateMessage(t *testing.T) {
messageDigest, err := signedMessage.Message.Digest()
assert.Nil(t, err)

mockMessageBlsAggServ.EXPECT().ProcessNewSignature(context.Background(), messageDigest,
&signedMessage.BlsSignature, signedMessage.OperatorId)
mockMessageBlsAggServ.EXPECT().ProcessNewSignature(context.Background(), messageDigest, &signedMessage.BlsSignature, signedMessage.OperatorId)
mockMessageBlsAggServ.EXPECT().InitializeMessageIfNotExists(messageDigest, coretypes.QUORUM_NUMBERS, []eigentypes.QuorumThresholdPercentage{types.MESSAGE_AGGREGATION_QUORUM_THRESHOLD}, types.MESSAGE_TTL, types.MESSAGE_BLS_AGGREGATION_TIMEOUT, uint64(0))
err = aggregator.ProcessSignedStateRootUpdateMessage(signedMessage)
assert.Nil(t, err)
Expand All @@ -84,9 +86,10 @@ func TestProcessOperatorSetUpdateMessage(t *testing.T) {
aggregator, mockAvsReader, _, _, _, mockMessageBlsAggServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

aggregator.clock = core.Clock{Now: func() time.Time { return time.Unix(10_000, 0) }}
message := messages.OperatorSetUpdateMessage{
Id: 1,
Timestamp: 2,
Timestamp: 9_995,
Operators: []coretypes.RollupOperator{
{Pubkey: bls.NewG1Point(big.NewInt(3), big.NewInt(4)), Weight: big.NewInt(5)},
},
Expand Down
1 change: 1 addition & 0 deletions aggregator/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const QUERY_FILTER_FROM_BLOCK = uint64(1)

const MESSAGE_TTL = 1 * time.Minute
const MESSAGE_BLS_AGGREGATION_TIMEOUT = 30 * time.Second
const MESSAGE_SUBMISSION_TIMEOUT = 1 * time.Minute

type OperatorInfo struct {
OperatorPubkeys eigentypes.OperatorPubkeys
Expand Down
7 changes: 7 additions & 0 deletions core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"math/big"
"time"

"github.com/Layr-Labs/eigensdk-go/crypto/bls"
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
Expand Down Expand Up @@ -86,3 +87,9 @@ func ConvertBytesToQuorumNumbers(input []byte) []eigentypes.QuorumNum {
}
return output
}

type Clock struct {
Now func() time.Time
}

var SystemClock = Clock{Now: time.Now}

0 comments on commit 3258e73

Please sign in to comment.