From c14f0a44712891286b291761fb2d99bd90646234 Mon Sep 17 00:00:00 2001 From: Jeffrey Wilcke Date: Tue, 30 Jun 2015 13:46:37 +0200 Subject: [PATCH 1/2] core: added checkpoint for last block * Add a checkpoint every X blocks * Removed queued write --- core/chain_manager.go | 97 ++++++++++++++++---------------------- core/chain_manager_test.go | 3 +- 2 files changed, 42 insertions(+), 58 deletions(-) diff --git a/core/chain_manager.go b/core/chain_manager.go index 808ccd201f..c89aae3f08 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -11,10 +11,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/compression/rle" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -23,7 +21,6 @@ import ( "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" "github.com/hashicorp/golang-lru" - "github.com/syndtr/goleveldb/leveldb" ) var ( @@ -40,6 +37,7 @@ const ( blockCacheLimit = 256 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 + checkpointLimit = 200 ) // CalcDifficulty is the difficulty adjustment algorithm. It returns @@ -101,6 +99,7 @@ type ChainManager struct { chainmu sync.RWMutex tsmu sync.RWMutex + checkpoint int // checkpoint counts towards the new checkpoint td *big.Int currentBlock *types.Block lastBlockHash common.Hash @@ -109,9 +108,8 @@ type ChainManager struct { transState *state.StateDB txState *state.ManagedState - cache *lru.Cache // cache is the LRU caching - futureBlocks *lru.Cache // future blocks are blocks added for later processing - pendingBlocks *lru.Cache // pending blocks contain blocks not yet written to the db + cache *lru.Cache // cache is the LRU caching + futureBlocks *lru.Cache // future blocks are blocks added for later processing quit chan struct{} // procInterrupt must be atomically called @@ -240,15 +238,40 @@ func (self *ChainManager) setTransState(statedb *state.StateDB) { self.transState = statedb } +func (bc *ChainManager) recover() bool { + data, _ := bc.blockDb.Get([]byte("checkpoint")) + if len(data) != 0 { + block := bc.GetBlock(common.BytesToHash(data)) + if block != nil { + err := bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) + if err != nil { + glog.Fatalln("db write err:", err) + } + + bc.currentBlock = block + bc.lastBlockHash = block.Hash() + return true + } + } + return false +} + func (bc *ChainManager) setLastState() { data, _ := bc.blockDb.Get([]byte("LastBlock")) if len(data) != 0 { block := bc.GetBlock(common.BytesToHash(data)) if block != nil { + bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes()) + bc.currentBlock = block bc.lastBlockHash = block.Hash() } else { - glog.Fatalf("Fatal. LastBlock not found. Please run removedb and resync") + glog.Infof("LastBlock (%x) not found. Recovering...\n", data) + if bc.recover() { + glog.Infof("Recover successful") + } else { + glog.Fatalf("Recover failed. Please report") + } } } else { bc.Reset() @@ -357,6 +380,16 @@ func (bc *ChainManager) insert(block *types.Block) { glog.Fatal("db write fail:", err) } + bc.checkpoint++ + if bc.checkpoint > checkpointLimit { + err = bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes()) + if err != nil { + glog.Fatal("db write fail:", err) + } + + bc.checkpoint = 0 + } + bc.currentBlock = block bc.lastBlockHash = block.Hash() } @@ -387,12 +420,6 @@ func (bc *ChainManager) HasBlock(hash common.Hash) bool { return true } - if bc.pendingBlocks != nil { - if _, exist := bc.pendingBlocks.Get(hash); exist { - return true - } - } - data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...)) return len(data) != 0 } @@ -423,12 +450,6 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block { return block.(*types.Block) } - if self.pendingBlocks != nil { - if block, _ := self.pendingBlocks.Get(hash); block != nil { - return block.(*types.Block) - } - } - data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...)) if len(data) == 0 { return nil @@ -519,31 +540,6 @@ func (self *ChainManager) procFutureBlocks() { } } -func (self *ChainManager) enqueueForWrite(block *types.Block) { - self.pendingBlocks.Add(block.Hash(), block) -} - -func (self *ChainManager) flushQueuedBlocks() { - db, batchWrite := self.blockDb.(*ethdb.LDBDatabase) - batch := new(leveldb.Batch) - for _, key := range self.pendingBlocks.Keys() { - b, _ := self.pendingBlocks.Get(key) - block := b.(*types.Block) - - enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block)) - key := append(blockHashPre, block.Hash().Bytes()...) - if batchWrite { - batch.Put(key, rle.Compress(enc)) - } else { - self.blockDb.Put(key, enc) - } - } - - if batchWrite { - db.LDB().Write(batch, nil) - } -} - type writeStatus byte const ( @@ -586,15 +582,7 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr status = sideStatTy } - if queued { - // Write block to database. Eventually we'll have to improve on this and throw away blocks that are - // not in the canonical chain. - self.mu.Lock() - self.enqueueForWrite(block) - self.mu.Unlock() - } else { - self.write(block) - } + self.write(block) // Delete from future blocks self.futureBlocks.Remove(block.Hash()) @@ -610,8 +598,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { self.chainmu.Lock() defer self.chainmu.Unlock() - self.pendingBlocks, _ = lru.New(len(chain)) - // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -629,7 +615,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // Start the parallel nonce verifier. go verifyNonces(self.pow, chain, nonceQuit, nonceDone) defer close(nonceQuit) - defer self.flushQueuedBlocks() txcount := 0 for i, block := range chain { diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index 8b3ea9e85a..6869bc746d 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -109,8 +109,7 @@ func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) { bman.bc.mu.Lock() { - bman.bc.enqueueForWrite(block) - //bman.bc.write(block) + bman.bc.write(block) } bman.bc.mu.Unlock() } From a748afce0322af35d6031d76bf38afa1f974296a Mon Sep 17 00:00:00 2001 From: Jeffrey Wilcke Date: Tue, 30 Jun 2015 15:42:20 +0200 Subject: [PATCH 2/2] core: txpool listen for ChainHeadEvent instead of ChainEvent Changed the transaction pool to listen for ChainHeadEvent when resetting the state instead of ChainEvent. It makes very little sense to burst through transactions while we are catching up (e.g., have more than one block to process) --- core/transaction_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 6a7012c658..ac9027755e 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -65,7 +65,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( gasLimit: gasLimitFn, minGasPrice: new(big.Int), pendingState: state.ManageState(currentStateFn()), - events: eventMux.Subscribe(ChainEvent{}, GasPriceChanged{}), + events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}), } go pool.eventLoop() @@ -80,7 +80,7 @@ func (pool *TxPool) eventLoop() { pool.mu.Lock() switch ev := ev.(type) { - case ChainEvent: + case ChainHeadEvent: pool.resetState() case GasPriceChanged: pool.minGasPrice = ev.Price