@@ -23,6 +23,21 @@ import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 )

+var (
+	memcacheFlushTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/flush/time", nil)
+	memcacheFlushNodesMeter = metrics.NewRegisteredMeter("trie/memcache/flush/nodes", nil)
+	memcacheFlushSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/flush/size", nil)
+
+	memcacheGCTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/gc/time", nil)
+	memcacheGCNodesMeter = metrics.NewRegisteredMeter("trie/memcache/gc/nodes", nil)
+	memcacheGCSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/gc/size", nil)
+
+	memcacheCommitTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/commit/time", nil)
+	memcacheCommitNodesMeter = metrics.NewRegisteredMeter("trie/memcache/commit/nodes", nil)
+	memcacheCommitSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/commit/size", nil)
+)
+
 // secureKeyPrefix is the database key prefix used to store trie node preimages.
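The new meters and timers register themselves on go-ethereum's default metrics registry under the "trie/memcache/..." names above. A purely illustrative sketch of reading one of them back; the helper name is made up, and the counts stay at zero unless metrics collection is enabled for the process:

// logFlushedNodes is a hypothetical helper, not part of this change: it looks
// up the flush meter registered above and logs its lifetime count.
func logFlushedNodes() {
	flushed := metrics.GetOrRegisterMeter("trie/memcache/flush/nodes", nil)
	log.Info("Trie nodes flushed from memory cache", "count", flushed.Count())
}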
@@ -46,15 +61,22 @@ type DatabaseReader interface {
 type Database struct {
	diskdb ethdb.Database // Persistent storage for matured trie nodes

	nodes  map[common.Hash]*cachedNode // Data and references relationships of a node
+	oldest common.Hash                 // Oldest tracked node, flush-list head
+	newest common.Hash                 // Newest tracked node, flush-list tail

	preimages map[common.Hash][]byte // Preimages of nodes from the secure trie
	seckeybuf [secureKeyLength]byte  // Ephemeral buffer for calculating preimage keys

	gctime  time.Duration      // Time spent on garbage collection since last commit
	gcnodes uint64             // Nodes garbage collected since last commit
	gcsize  common.StorageSize // Data storage garbage collected since last commit

+	flushtime  time.Duration      // Time spent on data flushing since last commit
+	flushnodes uint64             // Nodes flushed since last commit
+	flushsize  common.StorageSize // Data storage flushed since last commit
+
-	nodesSize     common.StorageSize // Storage size of the nodes cache
+	nodesSize     common.StorageSize // Storage size of the nodes cache (exc. flushlist)
	preimagesSize common.StorageSize // Storage size of the preimages cache

	lock sync.RWMutex
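The two new hash fields turn the node cache into an insertion-ordered, doubly-linked flush-list threaded through the nodes map: oldest points at the head (the first candidate to write out), newest at the tail, and every cached node carries its own prev/next links (added further down in this diff). A self-contained sketch of the same structure, stripped of everything trie-specific, in case the indirection through the map is hard to picture; the types below are simplified stand-ins, not code from this change:

type hash [32]byte

type entry struct {
	blob       []byte
	prev, next hash // a zero hash terminates the list at either end
}

type flushList struct {
	entries        map[hash]*entry
	oldest, newest hash
}

// push appends a new entry at the tail, mirroring the insert logic further down.
func (f *flushList) push(h hash, blob []byte) {
	if _, ok := f.entries[h]; ok {
		return
	}
	f.entries[h] = &entry{blob: blob, prev: f.newest}
	if f.oldest == (hash{}) {
		f.oldest, f.newest = h, h // first entry: it is both head and tail
	} else {
		f.entries[f.newest].next, f.newest = h, h
	}
}

// popOldest detaches the head entry, mirroring what Cap does when flushing.
func (f *flushList) popOldest() (hash, *entry, bool) {
	if f.oldest == (hash{}) {
		return hash{}, nil, false
	}
	h, e := f.oldest, f.entries[f.oldest]
	delete(f.entries, h)
	if f.oldest = e.next; f.oldest != (hash{}) {
		f.entries[f.oldest].prev = hash{}
	} else {
		f.newest = hash{}
	}
	return h, e, true
}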
@@ -66,6 +88,9 @@ type cachedNode struct {
	blob     []byte              // Cached data block of the trie node
	parents  int                 // Number of live nodes referencing this one
	children map[common.Hash]int // Children referenced by this node
+
+	flushPrev common.Hash // Previous node in the flush-list
+	flushNext common.Hash // Next node in the flush-list
 }

 // NewDatabase creates a new trie database to store ephemeral trie content before
@@ -96,12 +121,20 @@ func (db *Database) Insert(hash common.Hash, blob []byte) {

 // insert is the private locked version of Insert.
 func (db *Database) insert(hash common.Hash, blob []byte) {
+	// If the node's already cached, skip
	if _, ok := db.nodes[hash]; ok {
		return
	}
	db.nodes[hash] = &cachedNode{
		blob:      common.CopyBytes(blob),
		children:  make(map[common.Hash]int),
+		flushPrev: db.newest,
	}
+	// Update the flush-list endpoints
+	if db.oldest == (common.Hash{}) {
+		db.oldest, db.newest = hash, hash
+	} else {
+		db.nodes[db.newest].flushNext, db.newest = hash, hash
+	}
	db.nodesSize += common.StorageSize(common.HashLength + len(blob))
 }
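insert now does two things for every new node: it accounts the entry's size (hash plus blob) into nodesSize and appends the node at the tail of the flush-list. A debug-style consistency check of the invariants this is meant to maintain, shown purely for illustration; checkFlushList is a hypothetical helper, not part of this change:

// checkFlushList walks the flush-list from oldest to newest and verifies that
// it covers every cached node and that the accounted size matches nodesSize.
// Hypothetical debugging aid only; assumes the caller holds db.lock.
func (db *Database) checkFlushList() bool {
	count, size := 0, common.StorageSize(0)
	for h := db.oldest; h != (common.Hash{}); h = db.nodes[h].flushNext {
		count++
		size += common.StorageSize(common.HashLength + len(db.nodes[h].blob))
	}
	return count == len(db.nodes) && size == db.nodesSize
}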
@@ -208,6 +241,10 @@ func (db *Database) Dereference(child common.Hash, parent common.Hash) {
	db.gcsize += storage - db.nodesSize
	db.gctime += time.Since(start)

+	memcacheGCTimeTimer.Update(time.Since(start))
+	memcacheGCSizeMeter.Mark(int64(storage - db.nodesSize))
+	memcacheGCNodesMeter.Mark(int64(nodes - len(db.nodes)))
+
	log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start),
		"gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize)
 }
@@ -221,7 +258,7 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
	if node.children[child] == 0 {
		delete(node.children, child)
	}
-	// If the node does not exist, it's a previously committed node.
+	// If the child does not exist, it's a previously committed node.
	node, ok := db.nodes[child]
	if !ok {
		return
@@ -229,6 +266,14 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
	// If there are no more references to the child, delete it and cascade
	node.parents--
	if node.parents == 0 {
+		// Remove the node from the flush-list
+		if child == db.oldest {
+			db.oldest = node.flushNext
+		} else {
+			db.nodes[node.flushPrev].flushNext = node.flushNext
+			db.nodes[node.flushNext].flushPrev = node.flushPrev
+		}
+		// Dereference all children and delete the node
		for hash := range node.children {
			db.dereference(hash, child)
		}
@@ -237,6 +282,107 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
	}
 }

+// Cap iteratively flushes old but still referenced trie nodes until the total
+// memory usage goes below the given threshold.
+func (db *Database) Cap(limit common.StorageSize) error {
+	// Create a database batch to flush persistent data out. It is important that
+	// outside code doesn't see an inconsistent state (referenced data removed from
+	// memory cache during commit but not yet in persistent storage). This is ensured
+	// by only uncaching existing data when the database write finalizes.
+	db.lock.RLock()
+
+	nodes, storage, start := len(db.nodes), db.nodesSize, time.Now()
+	batch := db.diskdb.NewBatch()
+
+	// db.nodesSize only contains the useful data in the cache, but when reporting
+	// the total memory consumption, the maintenance metadata also needs to be
+	// counted. For every useful node, we track 2 extra hashes as the flushlist.
+	size := db.nodesSize + common.StorageSize(len(db.nodes)*2*common.HashLength)
+
+	// If the preimage cache got large enough, push to disk. If it's still small,
+	// leave it for later to deduplicate writes.
+	flushPreimages := db.preimagesSize > 4*1024*1024
+	if flushPreimages {
+		for hash, preimage := range db.preimages {
+			if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
+				log.Error("Failed to commit preimage from trie database", "err", err)
+				db.lock.RUnlock()
+				return err
+			}
+			if batch.ValueSize() > ethdb.IdealBatchSize {
+				if err := batch.Write(); err != nil {
+					db.lock.RUnlock()
+					return err
+				}
+				batch.Reset()
+			}
+		}
+	}
+	// Keep committing nodes from the flush-list until we're below allowance
+	oldest := db.oldest
+	for size > limit && oldest != (common.Hash{}) {
+		// Fetch the oldest referenced node and push into the batch
+		node := db.nodes[oldest]
+		if err := batch.Put(oldest[:], node.blob); err != nil {
+			db.lock.RUnlock()
+			return err
+		}
+		// If we exceeded the ideal batch size, commit and reset
+		if batch.ValueSize() >= ethdb.IdealBatchSize {
+			if err := batch.Write(); err != nil {
+				log.Error("Failed to write flush list to disk", "err", err)
+				db.lock.RUnlock()
+				return err
+			}
+			batch.Reset()
+		}
+		// Iterate to the next flush item, or abort if the size cap was achieved. Size
+		// is the total size, including both the useful cached data (hash -> blob), as
+		// well as the flushlist metadata (2*hash). When flushing items from the cache,
+		// we need to reduce both.
+		size -= common.StorageSize(3*common.HashLength + len(node.blob))
+		oldest = node.flushNext
+	}
+	// Flush out any remainder data from the last batch
+	if err := batch.Write(); err != nil {
+		log.Error("Failed to write flush list to disk", "err", err)
+		db.lock.RUnlock()
+		return err
+	}
+	db.lock.RUnlock()
+
+	// Write successful, clear out the flushed data
+	db.lock.Lock()
+	defer db.lock.Unlock()
+
+	if flushPreimages {
+		db.preimages = make(map[common.Hash][]byte)
+		db.preimagesSize = 0
+	}
+	for db.oldest != oldest {
+		node := db.nodes[db.oldest]
+		delete(db.nodes, db.oldest)
+		db.oldest = node.flushNext
+
+		db.nodesSize -= common.StorageSize(common.HashLength + len(node.blob))
+	}
+	if db.oldest != (common.Hash{}) {
+		db.nodes[db.oldest].flushPrev = common.Hash{}
+	}
+	db.flushnodes += uint64(nodes - len(db.nodes))
+	db.flushsize += storage - db.nodesSize
+	db.flushtime += time.Since(start)
+
+	memcacheFlushTimeTimer.Update(time.Since(start))
+	memcacheFlushSizeMeter.Mark(int64(storage - db.nodesSize))
+	memcacheFlushNodesMeter.Mark(int64(nodes - len(db.nodes)))
+
+	log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start),
+		"flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.nodes), "livesize", db.nodesSize)
+
+	return nil
+}
+
 // Commit iterates over all the children of a particular node, writes them out
 // to disk, forcefully tearing down all references in both directions.
 //
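Cap is the new entry point callers can use to bound the in-memory footprint between full commits. A hedged caller-side sketch, not part of this diff; the helper, the package scaffolding, and the headroom choice are all illustrative:

package example // illustrative scaffolding only

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/trie"
)

// capTrieCache shrinks the in-memory trie cache once the node cache outgrows
// the given limit. Leaving one ideal batch of headroom is a guess at keeping
// the very next inserts from immediately tipping the cache over the limit again.
func capTrieCache(triedb *trie.Database, limit common.StorageSize) error {
	nodes, _ := triedb.Size() // preimage size is ignored here for brevity
	if nodes <= limit {
		return nil
	}
	return triedb.Cap(limit - ethdb.IdealBatchSize)
}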
@@ -266,7 +412,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
		}
	}
	// Move the trie itself into the batch, flushing if enough data is accumulated
-	nodes, storage := len(db.nodes), db.nodesSize+db.preimagesSize
+	nodes, storage := len(db.nodes), db.nodesSize
	if err := db.commit(node, batch); err != nil {
		log.Error("Failed to commit trie from trie database", "err", err)
		db.lock.RUnlock()
@@ -289,15 +435,20 @@ func (db *Database) Commit(node common.Hash, report bool) error {
	db.uncache(node)

+	memcacheCommitTimeTimer.Update(time.Since(start))
+	memcacheCommitSizeMeter.Mark(int64(storage - db.nodesSize))
+	memcacheCommitNodesMeter.Mark(int64(nodes - len(db.nodes)))
+
	logger := log.Info
	if !report {
		logger = log.Debug
	}
-	logger("Persisted trie from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start),
+	logger("Persisted trie from memory database", "nodes", nodes-len(db.nodes)+int(db.flushnodes), "size", storage-db.nodesSize+db.flushsize, "time", time.Since(start)+db.flushtime,
		"gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize)

	// Reset the garbage collection statistics
	db.gcnodes, db.gcsize, db.gctime = 0, 0, 0
+	db.flushnodes, db.flushsize, db.flushtime = 0, 0, 0

	return nil
 }
@@ -317,7 +468,7 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
	if err := batch.Put(hash[:], node.blob); err != nil {
		return err
	}
-	// If we've reached an optimal match size, commit and start over
+	// If we've reached an optimal batch size, commit and start over
	if batch.ValueSize() >= ethdb.IdealBatchSize {
		if err := batch.Write(); err != nil {
			return err
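Both commit above and the new Cap use the same write pattern: accumulate key/value pairs in an ethdb.Batch and flush whenever the pending payload crosses ethdb.IdealBatchSize. The generic shape of that pattern as a standalone sketch; writeBatched and its items argument are made up for illustration and are not part of this change:

// writeBatched persists an arbitrary set of hash-keyed blobs in ideally sized
// chunks, resetting the batch after every flush. Illustrative only.
func writeBatched(db ethdb.Database, items map[common.Hash][]byte) error {
	batch := db.NewBatch()
	for hash, blob := range items {
		if err := batch.Put(hash[:], blob); err != nil {
			return err
		}
		if batch.ValueSize() >= ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				return err
			}
			batch.Reset()
		}
	}
	return batch.Write() // flush whatever is left in the final partial batch
}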
@@ -337,7 +488,14 @@ func (db *Database) uncache(hash common.Hash) {
	if !ok {
		return
	}
-	// Otherwise uncache the node's subtries and remove the node itself too
+	// Node still exists, remove it from the flush-list
+	if hash == db.oldest {
+		db.oldest = node.flushNext
+	} else {
+		db.nodes[node.flushPrev].flushNext = node.flushNext
+		db.nodes[node.flushNext].flushPrev = node.flushPrev
+	}
+	// Uncache the node's subtries and remove the node itself too
	for child := range node.children {
		db.uncache(child)
	}
@@ -347,9 +505,13 @@
 // Size returns the current storage size of the memory cache in front of the
 // persistent database layer.
-func (db *Database) Size() common.StorageSize {
+func (db *Database) Size() (common.StorageSize, common.StorageSize) {
	db.lock.RLock()
	defer db.lock.RUnlock()

-	return db.nodesSize + db.preimagesSize
+	// db.nodesSize only contains the useful data in the cache, but when reporting
+	// the total memory consumption, the maintenance metadata also needs to be
+	// counted. For every useful node, we track 2 extra hashes as the flushlist.
+	var flushlistSize = common.StorageSize(len(db.nodes) * 2 * common.HashLength)
+	return db.nodesSize + flushlistSize, db.preimagesSize
 }
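Size now reports the node cache (including the two-hashes-per-node flush-list overhead) and the preimage cache as two separate values, so callers of the old single-value form need a small adjustment. A hedged example of the new call shape; the helper and variable names are illustrative, not part of this change:

// logTrieMemory is a hypothetical caller-side helper showing the new
// two-value Size signature.
func logTrieMemory(triedb *trie.Database) {
	nodes, preimages := triedb.Size()
	log.Info("Trie cache usage", "nodes", nodes, "preimages", preimages, "total", nodes+preimages)
}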