Skip to content

Commit

Permalink
metrics: add more metrics for state/cache/miner;
Browse files Browse the repository at this point in the history
  • Loading branch information
galaio committed Aug 9, 2023
1 parent c208d28 commit 225aed9
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 9 deletions.
28 changes: 24 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
processGasGauge = metrics.NewRegisteredGauge("chain/process/gas", nil)

justifiedBlockGauge = metrics.NewRegisteredGauge("chain/head/justified", nil)
finalizedBlockGauge = metrics.NewRegisteredGauge("chain/head/finalized", nil)
Expand All @@ -72,6 +73,14 @@ var (
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)

processGasUsedHistogram = metrics.NewRegisteredHistogram("chain/process/gas/used", nil, metrics.NewExpDecaySample(1028, 0.015))
accountReadsHistogram = metrics.NewRegisteredHistogram("chain/account/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
storageReadsHistogram = metrics.NewRegisteredHistogram("chain/storage/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
snapshotAccountReadsHistogram = metrics.NewRegisteredHistogram("chain/snapshot/account/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
snapshotStorageReadsHistogram = metrics.NewRegisteredHistogram("chain/snapshot/storage/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
accountUpdatesHistogram = metrics.NewRegisteredHistogram("chain/account/updates/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
storageUpdatesHistogram = metrics.NewRegisteredHistogram("chain/storage/updates/hist", nil, metrics.NewExpDecaySample(1028, 0.015))

blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
Expand Down Expand Up @@ -1975,10 +1984,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them
if metrics.EnabledExpensive {
accountReadsHistogram.Update(statedb.AccountReadsCount)
storageReadsHistogram.Update(statedb.StorageReadsCount)
snapshotAccountReadsHistogram.Update(statedb.SnapshotAccountReadsCount)
snapshotStorageReadsHistogram.Update(statedb.SnapshotStorageReadsCount)
}
processGasUsedHistogram.Update(int64(usedGas))

blockExecutionTimer.Update(time.Since(substart))

Expand All @@ -1997,8 +2011,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
proctime := time.Since(start)

// Update the metrics touched during block validation
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
if metrics.EnabledExpensive {
accountUpdatesHistogram.Update(statedb.AccountUpdatesCount)
storageUpdatesHistogram.Update(statedb.StorageUpdatesCount)
}

blockValidationTimer.Update(time.Since(substart))

Expand Down
4 changes: 3 additions & 1 deletion core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
end := chain[index]

// Assemble the log context and send it to the logger
mgasps := float64(st.usedGas) * 1000 / float64(elapsed)
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
"elapsed", common.PrettyDuration(elapsed), "mgasps", mgasps,
"number", end.Number(), "hash", end.Hash(),
}
processGasGauge.Update(int64(mgasps))
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
Expand Down
18 changes: 16 additions & 2 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {
// If we have a dirty value for this state entry, return it
value, dirty := s.dirtyStorage[key]
if dirty {
storageMeter.Mark(1)
storageDirtyHitMeter.Mark(1)
return value
}
// Otherwise return the entry's original value
Expand All @@ -200,6 +202,7 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {

func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) {
if value, cached := s.originStorage[key]; cached {
storageOriginHitMeter.Mark(1)
return value, true
}
// if L1 cache miss, try to get it from shared pool
Expand All @@ -210,6 +213,7 @@ func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) {
}
storage := val.(common.Hash)
s.originStorage[key] = storage
storageShareHitMeter.Mark(1)
return storage, true
}
return common.Hash{}, false
Expand All @@ -224,12 +228,14 @@ func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) {

// GetCommittedState retrieves a value from the committed account storage trie.
func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash {
storageMeter.Mark(1)
// If the fake storage is set, only lookup the state here(in the debugging mode)
if s.fakeStorage != nil {
return s.fakeStorage[key]
}
// If we have a pending write or clean cached, return that
if value, pending := s.pendingStorage[key]; pending {
storagePendingHitMeter.Mark(1)
return value
}

Expand All @@ -255,6 +261,10 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
if metrics.EnabledExpensive {
s.db.SnapshotStorageReads += time.Since(start)
s.db.SnapshotStorageReadsCount++
}
if err == nil {
storageSnapHitMeter.Mark(1)
}
}

Expand All @@ -267,11 +277,13 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
enc, err = s.getTrie(db).TryGet(key.Bytes())
if metrics.EnabledExpensive {
s.db.StorageReads += time.Since(start)
s.db.StorageReadsCount++
}
if err != nil {
s.setError(err)
return common.Hash{}
}
storageTrieHitMeter.Mark(1)
}
var value common.Hash
if len(enc) > 0 {
Expand Down Expand Up @@ -356,19 +368,21 @@ func (s *StateObject) updateTrie(db Database) Trie {
if len(s.pendingStorage) == 0 {
return s.trie
}
usedStorage := make([][]byte, 0, len(s.pendingStorage))
dirtyStorage := make(map[common.Hash][]byte)

// Track the amount of time wasted on updating the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) {
s.db.MetricsMux.Lock()
s.db.StorageUpdates += time.Since(start)
s.db.StorageUpdatesCount += int64(len(dirtyStorage))
s.db.MetricsMux.Unlock()
}(time.Now())
}
// Insert all the pending updates into the trie
tr := s.getTrie(db)

usedStorage := make([][]byte, 0, len(s.pendingStorage))
dirtyStorage := make(map[common.Hash][]byte)
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
Expand Down
37 changes: 35 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ var (
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes())

// metrics
accountMeter = metrics.NewRegisteredMeter("state/db/account", nil)
accountOriginHitMeter = metrics.NewRegisteredMeter("state/db/account/origin", nil)
accountSnapHitMeter = metrics.NewRegisteredMeter("state/db/account/snap", nil)
accountTrieHitMeter = metrics.NewRegisteredMeter("state/db/account/trie", nil)

storageMeter = metrics.NewRegisteredMeter("state/db/storage", nil)
storageDirtyHitMeter = metrics.NewRegisteredMeter("state/db/storage/dirty", nil)
storagePendingHitMeter = metrics.NewRegisteredMeter("state/db/storage/pending", nil)
storageOriginHitMeter = metrics.NewRegisteredMeter("state/db/storage/origin", nil)
storageShareHitMeter = metrics.NewRegisteredMeter("state/db/storage/share", nil)
storageSnapHitMeter = metrics.NewRegisteredMeter("state/db/storage/snap", nil)
storageTrieHitMeter = metrics.NewRegisteredMeter("state/db/storage/trie", nil)
)

type proofList [][]byte
Expand Down Expand Up @@ -142,6 +156,13 @@ type StateDB struct {
SnapshotStorageReads time.Duration
SnapshotCommits time.Duration

AccountReadsCount int64
StorageReadsCount int64
SnapshotAccountReadsCount int64
SnapshotStorageReadsCount int64
AccountUpdatesCount int64
StorageUpdatesCount int64

AccountUpdated int
StorageUpdated int
AccountDeleted int
Expand Down Expand Up @@ -639,7 +660,10 @@ func (s *StateDB) updateStateObject(obj *StateObject) {
}
// Track the amount of time wasted on updating the account from the trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
defer func(start time.Time) {
s.AccountUpdates += time.Since(start)
s.AccountUpdatesCount++
}(time.Now())
}
// Encode the account and update the account trie
addr := obj.Address()
Expand All @@ -655,7 +679,10 @@ func (s *StateDB) deleteStateObject(obj *StateObject) {
}
// Track the amount of time wasted on deleting the account from the trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
defer func(start time.Time) {
s.AccountUpdates += time.Since(start)
s.AccountUpdatesCount++
}(time.Now())
}
// Delete the account from the trie
addr := obj.Address()
Expand All @@ -679,8 +706,10 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject {
// flag set. This is needed by the state journal to revert to the correct s-
// destructed object instead of wiping all knowledge about the state object.
func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject {
accountMeter.Mark(1)
// Prefer live objects if any is available
if obj := s.stateObjects[addr]; obj != nil {
accountOriginHitMeter.Mark(1)
return obj
}
// If no live objects are available, attempt to use snapshots
Expand All @@ -690,8 +719,10 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject {
acc, err := s.snap.Account(crypto.HashData(s.hasher, addr.Bytes()))
if metrics.EnabledExpensive {
s.SnapshotAccountReads += time.Since(start)
s.SnapshotAccountReadsCount++
}
if err == nil {
accountSnapHitMeter.Mark(1)
if acc == nil {
return nil
}
Expand Down Expand Up @@ -724,11 +755,13 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject {
enc, err := s.trie.TryGet(addr.Bytes())
if metrics.EnabledExpensive {
s.AccountReads += time.Since(start)
s.AccountReadsCount++
}
if err != nil {
s.setError(fmt.Errorf("getDeleteStateObject (%x) error: %v", addr.Bytes(), err))
return nil
}
accountTrieHitMeter.Mark(1)
if len(enc) == 0 {
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type triePrefetcher struct {
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return

fetchTrieMeter metrics.Meter
deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
Expand Down Expand Up @@ -92,6 +93,7 @@ func newTriePrefetcher(db Database, root, rootParent common.Hash, namespace stri
closeMainDoneChan: make(chan struct{}),
prefetchChan: make(chan *prefetchMsg, concurrentChanSize),

fetchTrieMeter: metrics.GetOrRegisterMeter(prefix+"/fetch", nil),
deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
Expand Down Expand Up @@ -278,6 +280,7 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
// have it.
func (p *triePrefetcher) trie(root common.Hash) Trie {
p.fetchTrieMeter.Mark(1)
// If the prefetcher is inactive, return from existing deep copies
if p.fetches != nil {
trie := p.fetches[root]
Expand Down
16 changes: 16 additions & 0 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ const (
var (
writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil)
finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil)
packageBlockTimer = metrics.NewRegisteredTimer("worker/packageblock", nil)
processBlockTimer = metrics.NewRegisteredTimer("worker/processblock", nil)
orderTxsTimer = metrics.NewRegisteredTimer("worker/ordertxs", nil)
prepareWorkTimer = metrics.NewRegisteredTimer("worker/preparework", nil)

errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
Expand Down Expand Up @@ -476,7 +480,9 @@ func (w *worker) mainLoop() {
for {
select {
case req := <-w.newWorkCh:
start := time.Now()
w.commitWork(req.interruptCh, req.timestamp)
packageBlockTimer.Update(time.Since(start))

case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
Expand Down Expand Up @@ -777,6 +783,9 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) error {
defer func(start time.Time) {
processBlockTimer.Update(time.Since(start))
}(time.Now())
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand Down Expand Up @@ -931,6 +940,9 @@ type generateParams struct {
// either based on the last chain head or specified parent. In this function
// the pending transactions are not filled yet, only the empty task returned.
func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
defer func(start time.Time) {
prepareWorkTimer.Update(time.Since(start))
}(time.Now())
w.mu.RLock()
defer w.mu.RUnlock()

Expand Down Expand Up @@ -1026,7 +1038,9 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stop

err = nil
if len(localTxs) > 0 {
start := time.Now()
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
orderTxsTimer.Update(time.Since(start))
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
// we will abort here when:
// 1.new block was imported
Expand All @@ -1039,7 +1053,9 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stop
}
}
if len(remoteTxs) > 0 {
start := time.Now()
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
orderTxsTimer.Update(time.Since(start))
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
}

Expand Down

0 comments on commit 225aed9

Please sign in to comment.