Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: add more metrics for state/cache/miner; #1811

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
processGasGauge = metrics.NewRegisteredGauge("chain/process/gas", nil)

justifiedBlockGauge = metrics.NewRegisteredGauge("chain/head/justified", nil)
finalizedBlockGauge = metrics.NewRegisteredGauge("chain/head/finalized", nil)
Expand All @@ -72,6 +73,14 @@ var (
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)

processGasUsedHistogram = metrics.NewRegisteredHistogram("chain/process/gas/used", nil, metrics.NewExpDecaySample(1028, 0.015))
accountReadsHistogram = metrics.NewRegisteredHistogram("chain/account/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
storageReadsHistogram = metrics.NewRegisteredHistogram("chain/storage/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
snapshotAccountReadsHistogram = metrics.NewRegisteredHistogram("chain/snapshot/account/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
snapshotStorageReadsHistogram = metrics.NewRegisteredHistogram("chain/snapshot/storage/reads/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
accountUpdatesHistogram = metrics.NewRegisteredHistogram("chain/account/updates/hist", nil, metrics.NewExpDecaySample(1028, 0.015))
storageUpdatesHistogram = metrics.NewRegisteredHistogram("chain/storage/updates/hist", nil, metrics.NewExpDecaySample(1028, 0.015))

blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
Expand Down Expand Up @@ -1975,10 +1984,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them
if metrics.EnabledExpensive {
accountReadsHistogram.Update(statedb.AccountReadsCount)
storageReadsHistogram.Update(statedb.StorageReadsCount)
snapshotAccountReadsHistogram.Update(statedb.SnapshotAccountReadsCount)
snapshotStorageReadsHistogram.Update(statedb.SnapshotStorageReadsCount)
}
processGasUsedHistogram.Update(int64(usedGas))

blockExecutionTimer.Update(time.Since(substart))

Expand All @@ -1997,8 +2011,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
proctime := time.Since(start)

// Update the metrics touched during block validation
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
if metrics.EnabledExpensive {
accountUpdatesHistogram.Update(statedb.AccountUpdatesCount)
storageUpdatesHistogram.Update(statedb.StorageUpdatesCount)
}

blockValidationTimer.Update(time.Since(substart))

Expand Down
4 changes: 3 additions & 1 deletion core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
end := chain[index]

// Assemble the log context and send it to the logger
mgasps := float64(st.usedGas) * 1000 / float64(elapsed)
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
"elapsed", common.PrettyDuration(elapsed), "mgasps", mgasps,
"number", end.Number(), "hash", end.Hash(),
}
processGasGauge.Update(int64(mgasps))
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
Expand Down
18 changes: 16 additions & 2 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {
// If we have a dirty value for this state entry, return it
value, dirty := s.dirtyStorage[key]
if dirty {
storageMeter.Mark(1)
storageDirtyHitMeter.Mark(1)
return value
}
// Otherwise return the entry's original value
Expand All @@ -200,6 +202,7 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {

func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) {
if value, cached := s.originStorage[key]; cached {
storageOriginHitMeter.Mark(1)
return value, true
}
// if L1 cache miss, try to get it from shared pool
Expand All @@ -210,6 +213,7 @@ func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) {
}
storage := val.(common.Hash)
s.originStorage[key] = storage
storageShareHitMeter.Mark(1)
return storage, true
}
return common.Hash{}, false
Expand All @@ -224,12 +228,14 @@ func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) {

// GetCommittedState retrieves a value from the committed account storage trie.
func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash {
storageMeter.Mark(1)
// If the fake storage is set, only lookup the state here(in the debugging mode)
if s.fakeStorage != nil {
return s.fakeStorage[key]
}
// If we have a pending write or clean cached, return that
if value, pending := s.pendingStorage[key]; pending {
storagePendingHitMeter.Mark(1)
return value
}

Expand All @@ -255,6 +261,10 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
if metrics.EnabledExpensive {
s.db.SnapshotStorageReads += time.Since(start)
s.db.SnapshotStorageReadsCount++
}
if err == nil {
storageSnapHitMeter.Mark(1)
}
}

Expand All @@ -267,11 +277,13 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
enc, err = s.getTrie(db).TryGet(key.Bytes())
if metrics.EnabledExpensive {
s.db.StorageReads += time.Since(start)
s.db.StorageReadsCount++
}
if err != nil {
s.setError(err)
return common.Hash{}
}
storageTrieHitMeter.Mark(1)
}
var value common.Hash
if len(enc) > 0 {
Expand Down Expand Up @@ -356,19 +368,21 @@ func (s *StateObject) updateTrie(db Database) Trie {
if len(s.pendingStorage) == 0 {
return s.trie
}
usedStorage := make([][]byte, 0, len(s.pendingStorage))
dirtyStorage := make(map[common.Hash][]byte)

// Track the amount of time wasted on updating the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) {
s.db.MetricsMux.Lock()
s.db.StorageUpdates += time.Since(start)
s.db.StorageUpdatesCount += int64(len(dirtyStorage))
s.db.MetricsMux.Unlock()
}(time.Now())
}
// Insert all the pending updates into the trie
tr := s.getTrie(db)

usedStorage := make([][]byte, 0, len(s.pendingStorage))
dirtyStorage := make(map[common.Hash][]byte)
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
Expand Down
37 changes: 35 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ var (
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes())

// metrics
accountMeter = metrics.NewRegisteredMeter("state/db/account", nil)
accountOriginHitMeter = metrics.NewRegisteredMeter("state/db/account/origin", nil)
accountSnapHitMeter = metrics.NewRegisteredMeter("state/db/account/snap", nil)
accountTrieHitMeter = metrics.NewRegisteredMeter("state/db/account/trie", nil)

storageMeter = metrics.NewRegisteredMeter("state/db/storage", nil)
storageDirtyHitMeter = metrics.NewRegisteredMeter("state/db/storage/dirty", nil)
storagePendingHitMeter = metrics.NewRegisteredMeter("state/db/storage/pending", nil)
storageOriginHitMeter = metrics.NewRegisteredMeter("state/db/storage/origin", nil)
storageShareHitMeter = metrics.NewRegisteredMeter("state/db/storage/share", nil)
storageSnapHitMeter = metrics.NewRegisteredMeter("state/db/storage/snap", nil)
storageTrieHitMeter = metrics.NewRegisteredMeter("state/db/storage/trie", nil)
)

type proofList [][]byte
Expand Down Expand Up @@ -142,6 +156,13 @@ type StateDB struct {
SnapshotStorageReads time.Duration
SnapshotCommits time.Duration

AccountReadsCount int64
StorageReadsCount int64
SnapshotAccountReadsCount int64
SnapshotStorageReadsCount int64
AccountUpdatesCount int64
StorageUpdatesCount int64

AccountUpdated int
StorageUpdated int
AccountDeleted int
Expand Down Expand Up @@ -639,7 +660,10 @@ func (s *StateDB) updateStateObject(obj *StateObject) {
}
// Track the amount of time wasted on updating the account from the trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
defer func(start time.Time) {
s.AccountUpdates += time.Since(start)
s.AccountUpdatesCount++
}(time.Now())
}
// Encode the account and update the account trie
addr := obj.Address()
Expand All @@ -655,7 +679,10 @@ func (s *StateDB) deleteStateObject(obj *StateObject) {
}
// Track the amount of time wasted on deleting the account from the trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
defer func(start time.Time) {
s.AccountUpdates += time.Since(start)
s.AccountUpdatesCount++
}(time.Now())
}
// Delete the account from the trie
addr := obj.Address()
Expand All @@ -679,8 +706,10 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject {
// flag set. This is needed by the state journal to revert to the correct s-
// destructed object instead of wiping all knowledge about the state object.
func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject {
accountMeter.Mark(1)
// Prefer live objects if any is available
if obj := s.stateObjects[addr]; obj != nil {
accountOriginHitMeter.Mark(1)
return obj
}
// If no live objects are available, attempt to use snapshots
Expand All @@ -690,8 +719,10 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject {
acc, err := s.snap.Account(crypto.HashData(s.hasher, addr.Bytes()))
if metrics.EnabledExpensive {
s.SnapshotAccountReads += time.Since(start)
s.SnapshotAccountReadsCount++
}
if err == nil {
accountSnapHitMeter.Mark(1)
if acc == nil {
return nil
}
Expand Down Expand Up @@ -724,11 +755,13 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject {
enc, err := s.trie.TryGet(addr.Bytes())
if metrics.EnabledExpensive {
s.AccountReads += time.Since(start)
s.AccountReadsCount++
}
if err != nil {
s.setError(fmt.Errorf("getDeleteStateObject (%x) error: %v", addr.Bytes(), err))
return nil
}
accountTrieHitMeter.Mark(1)
if len(enc) == 0 {
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type triePrefetcher struct {
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return

fetchTrieMeter metrics.Meter
deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
Expand Down Expand Up @@ -92,6 +93,7 @@ func newTriePrefetcher(db Database, root, rootParent common.Hash, namespace stri
closeMainDoneChan: make(chan struct{}),
prefetchChan: make(chan *prefetchMsg, concurrentChanSize),

fetchTrieMeter: metrics.GetOrRegisterMeter(prefix+"/fetch", nil),
deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
Expand Down Expand Up @@ -278,6 +280,9 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
// have it.
func (p *triePrefetcher) trie(root common.Hash) Trie {
if p.fetchTrieMeter != nil {
p.fetchTrieMeter.Mark(1)
}
// If the prefetcher is inactive, return from existing deep copies
if p.fetches != nil {
trie := p.fetches[root]
Expand Down
16 changes: 16 additions & 0 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ const (
var (
writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil)
finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil)
packageBlockTimer = metrics.NewRegisteredTimer("worker/packageblock", nil)
processBlockTimer = metrics.NewRegisteredTimer("worker/processblock", nil)
orderTxsTimer = metrics.NewRegisteredTimer("worker/ordertxs", nil)
prepareWorkTimer = metrics.NewRegisteredTimer("worker/preparework", nil)

errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
Expand Down Expand Up @@ -476,7 +480,9 @@ func (w *worker) mainLoop() {
for {
select {
case req := <-w.newWorkCh:
start := time.Now()
w.commitWork(req.interruptCh, req.timestamp)
packageBlockTimer.Update(time.Since(start))

case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
Expand Down Expand Up @@ -777,6 +783,9 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) error {
defer func(start time.Time) {
processBlockTimer.Update(time.Since(start))
}(time.Now())
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand Down Expand Up @@ -931,6 +940,9 @@ type generateParams struct {
// either based on the last chain head or specified parent. In this function
// the pending transactions are not filled yet, only the empty task returned.
func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
defer func(start time.Time) {
prepareWorkTimer.Update(time.Since(start))
}(time.Now())
w.mu.RLock()
defer w.mu.RUnlock()

Expand Down Expand Up @@ -1026,7 +1038,9 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stop

err = nil
if len(localTxs) > 0 {
start := time.Now()
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
orderTxsTimer.Update(time.Since(start))
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
// we will abort here when:
// 1.new block was imported
Expand All @@ -1039,7 +1053,9 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stop
}
}
if len(remoteTxs) > 0 {
start := time.Now()
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
orderTxsTimer.Update(time.Since(start))
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
}

Expand Down