core, eth, trie: reuse trie journals in all our code

pull/3053/head
Péter Szilágyi 8 years ago
parent cd791bd855
commit 710435b51b
  1. 7
      core/blockchain.go
  2. 51
      core/state/statedb.go
  3. 6
      eth/api.go
  4. 2
      eth/api_backend.go
  5. 2
      ethdb/database.go
  6. 2
      internal/ethapi/api.go
  7. 2
      miner/worker.go
  8. 25
      trie/secure_trie.go
  9. 71
      trie/secure_trie_test.go

@ -357,7 +357,12 @@ func (self *BlockChain) AuxValidator() pow.PoW { return self.pow }
// State returns a new mutable state based on the current HEAD block. // State returns a new mutable state based on the current HEAD block.
func (self *BlockChain) State() (*state.StateDB, error) { func (self *BlockChain) State() (*state.StateDB, error) {
return state.New(self.CurrentBlock().Root(), self.chainDb) return self.StateAt(self.CurrentBlock().Root())
}
// StateAt returns a new mutable state based on a particular point in time.
func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
return self.stateCache.New(root)
} }
// Reset purges the entire blockchain, restoring it to its genesis state. // Reset purges the entire blockchain, restoring it to its genesis state.

@ -20,6 +20,7 @@ package state
import ( import (
"fmt" "fmt"
"math/big" "math/big"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
@ -66,6 +67,8 @@ type StateDB struct {
txIndex int txIndex int
logs map[common.Hash]vm.Logs logs map[common.Hash]vm.Logs
logSize uint logSize uint
lock sync.Mutex
} }
// Create a new state from a given trie // Create a new state from a given trie
@ -86,32 +89,53 @@ func New(root common.Hash, db ethdb.Database) (*StateDB, error) {
}, nil }, nil
} }
// Reset clears out all emphemeral state objects from the state db, but keeps // New creates a new statedb by reusing any journalled tries to avoid costly
// the underlying state trie to avoid reloading data for the next operations. // disk io.
func (self *StateDB) Reset(root common.Hash) error { func (self *StateDB) New(root common.Hash) (*StateDB, error) {
self.lock.Lock()
defer self.lock.Unlock()
tr, err := self.openTrie(root) tr, err := self.openTrie(root)
if err != nil { if err != nil {
return err return nil, err
} }
*self = StateDB{ return &StateDB{
db: self.db, db: self.db,
trie: tr, trie: tr,
pastTries: self.pastTries,
codeSizeCache: self.codeSizeCache, codeSizeCache: self.codeSizeCache,
stateObjects: make(map[common.Address]*StateObject), stateObjects: make(map[common.Address]*StateObject),
stateObjectsDirty: make(map[common.Address]struct{}), stateObjectsDirty: make(map[common.Address]struct{}),
refund: new(big.Int), refund: new(big.Int),
logs: make(map[common.Hash]vm.Logs), logs: make(map[common.Hash]vm.Logs),
}, nil
}
// Reset clears out all emphemeral state objects from the state db, but keeps
// the underlying state trie to avoid reloading data for the next operations.
func (self *StateDB) Reset(root common.Hash) error {
self.lock.Lock()
defer self.lock.Unlock()
tr, err := self.openTrie(root)
if err != nil {
return err
} }
self.trie = tr
self.stateObjects = make(map[common.Address]*StateObject)
self.stateObjectsDirty = make(map[common.Address]struct{})
self.refund = new(big.Int)
self.thash = common.Hash{}
self.bhash = common.Hash{}
self.txIndex = 0
self.logs = make(map[common.Hash]vm.Logs)
self.logSize = 0
return nil return nil
} }
// openTrie creates a trie. It uses an existing trie if one is available // openTrie creates a trie. It uses an existing trie if one is available
// from the journal if available. // from the journal if available.
func (self *StateDB) openTrie(root common.Hash) (*trie.SecureTrie, error) { func (self *StateDB) openTrie(root common.Hash) (*trie.SecureTrie, error) {
if self.trie != nil && self.trie.Hash() == root {
return self.trie, nil
}
for i := len(self.pastTries) - 1; i >= 0; i-- { for i := len(self.pastTries) - 1; i >= 0; i-- {
if self.pastTries[i].Hash() == root { if self.pastTries[i].Hash() == root {
tr := *self.pastTries[i] tr := *self.pastTries[i]
@ -122,6 +146,9 @@ func (self *StateDB) openTrie(root common.Hash) (*trie.SecureTrie, error) {
} }
func (self *StateDB) pushTrie(t *trie.SecureTrie) { func (self *StateDB) pushTrie(t *trie.SecureTrie) {
self.lock.Lock()
defer self.lock.Unlock()
if len(self.pastTries) >= maxJournalLength { if len(self.pastTries) >= maxJournalLength {
copy(self.pastTries, self.pastTries[1:]) copy(self.pastTries, self.pastTries[1:])
self.pastTries[len(self.pastTries)-1] = t self.pastTries[len(self.pastTries)-1] = t
@ -381,6 +408,9 @@ func (self *StateDB) CreateAccount(addr common.Address) vm.Account {
// //
func (self *StateDB) Copy() *StateDB { func (self *StateDB) Copy() *StateDB {
self.lock.Lock()
defer self.lock.Unlock()
// Copy all the basic fields, initialize the memory ones // Copy all the basic fields, initialize the memory ones
state := &StateDB{ state := &StateDB{
db: self.db, db: self.db,
@ -406,6 +436,9 @@ func (self *StateDB) Copy() *StateDB {
} }
func (self *StateDB) Set(state *StateDB) { func (self *StateDB) Set(state *StateDB) {
self.lock.Lock()
defer self.lock.Unlock()
self.db = state.db self.db = state.db
self.trie = state.trie self.trie = state.trie
self.pastTries = state.pastTries self.pastTries = state.pastTries

@ -293,7 +293,7 @@ func (api *PublicDebugAPI) DumpBlock(number uint64) (state.Dump, error) {
if block == nil { if block == nil {
return state.Dump{}, fmt.Errorf("block #%d not found", number) return state.Dump{}, fmt.Errorf("block #%d not found", number)
} }
stateDb, err := state.New(block.Root(), api.eth.ChainDb()) stateDb, err := api.eth.BlockChain().StateAt(block.Root())
if err != nil { if err != nil {
return state.Dump{}, err return state.Dump{}, err
} }
@ -406,7 +406,7 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, logConfig *vm.LogConf
if err := core.ValidateHeader(api.config, blockchain.AuxValidator(), block.Header(), blockchain.GetHeader(block.ParentHash(), block.NumberU64()-1), true, false); err != nil { if err := core.ValidateHeader(api.config, blockchain.AuxValidator(), block.Header(), blockchain.GetHeader(block.ParentHash(), block.NumberU64()-1), true, false); err != nil {
return false, structLogger.StructLogs(), err return false, structLogger.StructLogs(), err
} }
statedb, err := state.New(blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1).Root(), api.eth.ChainDb()) statedb, err := blockchain.StateAt(blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1).Root())
if err != nil { if err != nil {
return false, structLogger.StructLogs(), err return false, structLogger.StructLogs(), err
} }
@ -501,7 +501,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, txHash common.
if parent == nil { if parent == nil {
return nil, fmt.Errorf("block parent %x not found", block.ParentHash()) return nil, fmt.Errorf("block parent %x not found", block.ParentHash())
} }
stateDb, err := state.New(parent.Root(), api.eth.ChainDb()) stateDb, err := api.eth.BlockChain().StateAt(parent.Root())
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -81,7 +81,7 @@ func (b *EthApiBackend) StateAndHeaderByNumber(blockNr rpc.BlockNumber) (ethapi.
if header == nil { if header == nil {
return nil, nil, nil return nil, nil, nil
} }
stateDb, err := state.New(header.Root, b.eth.chainDb) stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return EthApiState{stateDb}, header, err return EthApiState{stateDb}, header, err
} }

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
@ -84,6 +85,7 @@ func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
OpenFilesCacheCapacity: handles, OpenFilesCacheCapacity: handles,
BlockCacheCapacity: cache / 2 * opt.MiB, BlockCacheCapacity: cache / 2 * opt.MiB,
WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally
Filter: filter.NewBloomFilter(10),
}) })
if _, corrupted := err.(*errors.ErrCorrupted); corrupted { if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
db, err = leveldb.RecoverFile(file, nil) db, err = leveldb.RecoverFile(file, nil)

@ -454,6 +454,8 @@ type CallArgs struct {
} }
func (s *PublicBlockChainAPI) doCall(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (string, *big.Int, error) { func (s *PublicBlockChainAPI) doCall(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (string, *big.Int, error) {
defer func(start time.Time) { glog.V(logger.Debug).Infof("call took %v", time.Since(start)) }(time.Now())
state, header, err := s.b.StateAndHeaderByNumber(blockNr) state, header, err := s.b.StateAndHeaderByNumber(blockNr)
if state == nil || err != nil { if state == nil || err != nil {
return "0x", common.Big0, err return "0x", common.Big0, err

@ -361,7 +361,7 @@ func (self *worker) push(work *Work) {
// makeCurrent creates a new environment for the current cycle. // makeCurrent creates a new environment for the current cycle.
func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error { func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error {
state, err := state.New(parent.Root(), self.eth.ChainDb()) state, err := self.chain.StateAt(parent.Root())
if err != nil { if err != nil {
return err return err
} }

@ -24,6 +24,8 @@ import (
var secureKeyPrefix = []byte("secure-key-") var secureKeyPrefix = []byte("secure-key-")
const secureKeyLength = 11 + 32 // Length of the above prefix + 32byte hash
// SecureTrie wraps a trie with key hashing. In a secure trie, all // SecureTrie wraps a trie with key hashing. In a secure trie, all
// access operations hash the key using keccak256. This prevents // access operations hash the key using keccak256. This prevents
// calling code from creating long chains of nodes that // calling code from creating long chains of nodes that
@ -36,9 +38,10 @@ var secureKeyPrefix = []byte("secure-key-")
// SecureTrie is not safe for concurrent use. // SecureTrie is not safe for concurrent use.
type SecureTrie struct { type SecureTrie struct {
trie Trie trie Trie
hashKeyBuf []byte hashKeyBuf [secureKeyLength]byte
secKeyBuf [200]byte secKeyBuf [200]byte
secKeyCache map[string][]byte secKeyCache map[string][]byte
secKeyCacheOwner *SecureTrie // Pointer to self, replace the key cache on mismatch
} }
// NewSecure creates a trie with an existing root node from db. // NewSecure creates a trie with an existing root node from db.
@ -57,7 +60,6 @@ func NewSecure(root common.Hash, db Database) (*SecureTrie, error) {
} }
return &SecureTrie{ return &SecureTrie{
trie: *trie, trie: *trie,
secKeyCache: make(map[string][]byte),
}, nil }, nil
} }
@ -104,7 +106,7 @@ func (t *SecureTrie) TryUpdate(key, value []byte) error {
if err != nil { if err != nil {
return err return err
} }
t.secKeyCache[string(hk)] = common.CopyBytes(key) t.getSecKeyCache()[string(hk)] = common.CopyBytes(key)
return nil return nil
} }
@ -119,14 +121,14 @@ func (t *SecureTrie) Delete(key []byte) {
// If a node was not found in the database, a MissingNodeError is returned. // If a node was not found in the database, a MissingNodeError is returned.
func (t *SecureTrie) TryDelete(key []byte) error { func (t *SecureTrie) TryDelete(key []byte) error {
hk := t.hashKey(key) hk := t.hashKey(key)
delete(t.secKeyCache, string(hk)) delete(t.getSecKeyCache(), string(hk))
return t.trie.TryDelete(hk) return t.trie.TryDelete(hk)
} }
// GetKey returns the sha3 preimage of a hashed key that was // GetKey returns the sha3 preimage of a hashed key that was
// previously used to store a value. // previously used to store a value.
func (t *SecureTrie) GetKey(shaKey []byte) []byte { func (t *SecureTrie) GetKey(shaKey []byte) []byte {
if key, ok := t.secKeyCache[string(shaKey)]; ok { if key, ok := t.getSecKeyCache()[string(shaKey)]; ok {
return key return key
} }
key, _ := t.trie.db.Get(t.secKey(shaKey)) key, _ := t.trie.db.Get(t.secKey(shaKey))
@ -165,7 +167,7 @@ func (t *SecureTrie) NodeIterator() *NodeIterator {
// the trie's database. Calling code must ensure that the changes made to db are // the trie's database. Calling code must ensure that the changes made to db are
// written back to the trie's attached database before using the trie. // written back to the trie's attached database before using the trie.
func (t *SecureTrie) CommitTo(db DatabaseWriter) (root common.Hash, err error) { func (t *SecureTrie) CommitTo(db DatabaseWriter) (root common.Hash, err error) {
if len(t.secKeyCache) > 0 { if len(t.getSecKeyCache()) > 0 {
for hk, key := range t.secKeyCache { for hk, key := range t.secKeyCache {
if err := db.Put(t.secKey([]byte(hk)), key); err != nil { if err := db.Put(t.secKey([]byte(hk)), key); err != nil {
return common.Hash{}, err return common.Hash{}, err
@ -196,3 +198,14 @@ func (t *SecureTrie) hashKey(key []byte) []byte {
returnHasherToPool(h) returnHasherToPool(h)
return buf return buf
} }
// getSecKeyCache returns the current secure key cache, creating a new one if
// ownership changed (i.e. the current secure trie is a copy of another owning
// the actual cache).
func (t *SecureTrie) getSecKeyCache() map[string][]byte {
if t != t.secKeyCacheOwner {
t.secKeyCacheOwner = t
t.secKeyCache = make(map[string][]byte)
}
return t.secKeyCache
}

@ -18,6 +18,8 @@ package trie
import ( import (
"bytes" "bytes"
"runtime"
"sync"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -31,6 +33,37 @@ func newEmptySecure() *SecureTrie {
return trie return trie
} }
// makeTestSecureTrie creates a large enough secure trie for testing.
func makeTestSecureTrie() (ethdb.Database, *SecureTrie, map[string][]byte) {
// Create an empty trie
db, _ := ethdb.NewMemDatabase()
trie, _ := NewSecure(common.Hash{}, db)
// Fill it with some arbitrary data
content := make(map[string][]byte)
for i := byte(0); i < 255; i++ {
// Map the same data under multiple keys
key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i}
content[string(key)] = val
trie.Update(key, val)
key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i}
content[string(key)] = val
trie.Update(key, val)
// Add some other data to inflate th trie
for j := byte(3); j < 13; j++ {
key, val = common.LeftPadBytes([]byte{j, i}, 32), []byte{j, i}
content[string(key)] = val
trie.Update(key, val)
}
}
trie.Commit()
// Return the generated trie
return db, trie, content
}
func TestSecureDelete(t *testing.T) { func TestSecureDelete(t *testing.T) {
trie := newEmptySecure() trie := newEmptySecure()
vals := []struct{ k, v string }{ vals := []struct{ k, v string }{
@ -72,3 +105,41 @@ func TestSecureGetKey(t *testing.T) {
t.Errorf("GetKey returned %q, want %q", k, key) t.Errorf("GetKey returned %q, want %q", k, key)
} }
} }
func TestSecureTrieConcurrency(t *testing.T) {
// Create an initial trie and copy if for concurrent access
_, trie, _ := makeTestSecureTrie()
threads := runtime.NumCPU()
tries := make([]*SecureTrie, threads)
for i := 0; i < threads; i++ {
cpy := *trie
tries[i] = &cpy
}
// Start a batch of goroutines interactng with the trie
pend := new(sync.WaitGroup)
pend.Add(threads)
for i := 0; i < threads; i++ {
go func(index int) {
defer pend.Done()
for j := byte(0); j < 255; j++ {
// Map the same data under multiple keys
key, val := common.LeftPadBytes([]byte{byte(index), 1, j}, 32), []byte{j}
tries[index].Update(key, val)
key, val = common.LeftPadBytes([]byte{byte(index), 2, j}, 32), []byte{j}
tries[index].Update(key, val)
// Add some other data to inflate the trie
for k := byte(3); k < 13; k++ {
key, val = common.LeftPadBytes([]byte{byte(index), k, j}, 32), []byte{k, j}
tries[index].Update(key, val)
}
}
tries[index].Commit()
}(i)
}
// Wait for all threads to finish
pend.Wait()
}

Loading…
Cancel
Save