core: added checkpoint for last block

* Add a checkpoint every X blocks
* Removed queued write
pull/1363/head
Jeffrey Wilcke 9 years ago
parent ba95e445e1
commit c14f0a4471
  1. 97
      core/chain_manager.go
  2. 3
      core/chain_manager_test.go

@ -11,10 +11,8 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
@ -23,7 +21,6 @@ import (
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru"
"github.com/syndtr/goleveldb/leveldb"
) )
var ( var (
@ -40,6 +37,7 @@ const (
blockCacheLimit = 256 blockCacheLimit = 256
maxFutureBlocks = 256 maxFutureBlocks = 256
maxTimeFutureBlocks = 30 maxTimeFutureBlocks = 30
checkpointLimit = 200
) )
// CalcDifficulty is the difficulty adjustment algorithm. It returns // CalcDifficulty is the difficulty adjustment algorithm. It returns
@ -101,6 +99,7 @@ type ChainManager struct {
chainmu sync.RWMutex chainmu sync.RWMutex
tsmu sync.RWMutex tsmu sync.RWMutex
checkpoint int // checkpoint counts towards the new checkpoint
td *big.Int td *big.Int
currentBlock *types.Block currentBlock *types.Block
lastBlockHash common.Hash lastBlockHash common.Hash
@ -109,9 +108,8 @@ type ChainManager struct {
transState *state.StateDB transState *state.StateDB
txState *state.ManagedState txState *state.ManagedState
cache *lru.Cache // cache is the LRU caching cache *lru.Cache // cache is the LRU caching
futureBlocks *lru.Cache // future blocks are blocks added for later processing futureBlocks *lru.Cache // future blocks are blocks added for later processing
pendingBlocks *lru.Cache // pending blocks contain blocks not yet written to the db
quit chan struct{} quit chan struct{}
// procInterrupt must be atomically called // procInterrupt must be atomically called
@ -240,15 +238,40 @@ func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb self.transState = statedb
} }
func (bc *ChainManager) recover() bool {
data, _ := bc.blockDb.Get([]byte("checkpoint"))
if len(data) != 0 {
block := bc.GetBlock(common.BytesToHash(data))
if block != nil {
err := bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
if err != nil {
glog.Fatalln("db write err:", err)
}
bc.currentBlock = block
bc.lastBlockHash = block.Hash()
return true
}
}
return false
}
func (bc *ChainManager) setLastState() { func (bc *ChainManager) setLastState() {
data, _ := bc.blockDb.Get([]byte("LastBlock")) data, _ := bc.blockDb.Get([]byte("LastBlock"))
if len(data) != 0 { if len(data) != 0 {
block := bc.GetBlock(common.BytesToHash(data)) block := bc.GetBlock(common.BytesToHash(data))
if block != nil { if block != nil {
bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes())
bc.currentBlock = block bc.currentBlock = block
bc.lastBlockHash = block.Hash() bc.lastBlockHash = block.Hash()
} else { } else {
glog.Fatalf("Fatal. LastBlock not found. Please run removedb and resync") glog.Infof("LastBlock (%x) not found. Recovering...\n", data)
if bc.recover() {
glog.Infof("Recover successful")
} else {
glog.Fatalf("Recover failed. Please report")
}
} }
} else { } else {
bc.Reset() bc.Reset()
@ -357,6 +380,16 @@ func (bc *ChainManager) insert(block *types.Block) {
glog.Fatal("db write fail:", err) glog.Fatal("db write fail:", err)
} }
bc.checkpoint++
if bc.checkpoint > checkpointLimit {
err = bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes())
if err != nil {
glog.Fatal("db write fail:", err)
}
bc.checkpoint = 0
}
bc.currentBlock = block bc.currentBlock = block
bc.lastBlockHash = block.Hash() bc.lastBlockHash = block.Hash()
} }
@ -387,12 +420,6 @@ func (bc *ChainManager) HasBlock(hash common.Hash) bool {
return true return true
} }
if bc.pendingBlocks != nil {
if _, exist := bc.pendingBlocks.Get(hash); exist {
return true
}
}
data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...)) data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...))
return len(data) != 0 return len(data) != 0
} }
@ -423,12 +450,6 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
return block.(*types.Block) return block.(*types.Block)
} }
if self.pendingBlocks != nil {
if block, _ := self.pendingBlocks.Get(hash); block != nil {
return block.(*types.Block)
}
}
data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...)) data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...))
if len(data) == 0 { if len(data) == 0 {
return nil return nil
@ -519,31 +540,6 @@ func (self *ChainManager) procFutureBlocks() {
} }
} }
func (self *ChainManager) enqueueForWrite(block *types.Block) {
self.pendingBlocks.Add(block.Hash(), block)
}
func (self *ChainManager) flushQueuedBlocks() {
db, batchWrite := self.blockDb.(*ethdb.LDBDatabase)
batch := new(leveldb.Batch)
for _, key := range self.pendingBlocks.Keys() {
b, _ := self.pendingBlocks.Get(key)
block := b.(*types.Block)
enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
key := append(blockHashPre, block.Hash().Bytes()...)
if batchWrite {
batch.Put(key, rle.Compress(enc))
} else {
self.blockDb.Put(key, enc)
}
}
if batchWrite {
db.LDB().Write(batch, nil)
}
}
type writeStatus byte type writeStatus byte
const ( const (
@ -586,15 +582,7 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr
status = sideStatTy status = sideStatTy
} }
if queued { self.write(block)
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
// not in the canonical chain.
self.mu.Lock()
self.enqueueForWrite(block)
self.mu.Unlock()
} else {
self.write(block)
}
// Delete from future blocks // Delete from future blocks
self.futureBlocks.Remove(block.Hash()) self.futureBlocks.Remove(block.Hash())
@ -610,8 +598,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.chainmu.Lock() self.chainmu.Lock()
defer self.chainmu.Unlock() defer self.chainmu.Unlock()
self.pendingBlocks, _ = lru.New(len(chain))
// A queued approach to delivering events. This is generally // A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex // faster than direct delivery and requires much less mutex
// acquiring. // acquiring.
@ -629,7 +615,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// Start the parallel nonce verifier. // Start the parallel nonce verifier.
go verifyNonces(self.pow, chain, nonceQuit, nonceDone) go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
defer close(nonceQuit) defer close(nonceQuit)
defer self.flushQueuedBlocks()
txcount := 0 txcount := 0
for i, block := range chain { for i, block := range chain {

@ -109,8 +109,7 @@ func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) {
bman.bc.mu.Lock() bman.bc.mu.Lock()
{ {
bman.bc.enqueueForWrite(block) bman.bc.write(block)
//bman.bc.write(block)
} }
bman.bc.mu.Unlock() bman.bc.mu.Unlock()
} }

Loading…
Cancel
Save