Improved chain manager, improved block processor, fixed tests

* ChainManager allows cached future blocks for later processing
* BlockProcessor allows a 4 second window on future blocks
* Fixed tests
pull/632/merge
obscuren 10 years ago
parent 29f120206e
commit e1ed8c33bd
  1. 2
      core/block_processor.go
  2. 3
      core/block_processor_test.go
  3. 1
      core/chain_makers.go
  4. 45
      core/chain_manager.go

@ -260,7 +260,7 @@ func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header) error {
} }
// Allow future blocks up to 10 seconds // Allow future blocks up to 10 seconds
if int64(block.Time)+10 > time.Now().Unix() { if int64(block.Time) > time.Now().Unix()+4 {
return BlockFutureErr return BlockFutureErr
} }

@ -22,10 +22,11 @@ func TestNumber(t *testing.T) {
bp, chain := proc() bp, chain := proc()
block1 := chain.NewBlock(common.Address{}) block1 := chain.NewBlock(common.Address{})
block1.Header().Number = big.NewInt(3) block1.Header().Number = big.NewInt(3)
block1.Header().Time--
err := bp.ValidateHeader(block1.Header(), chain.Genesis().Header()) err := bp.ValidateHeader(block1.Header(), chain.Genesis().Header())
if err != BlockNumberErr { if err != BlockNumberErr {
t.Errorf("expected block number error") t.Errorf("expected block number error %v", err)
} }
block1 = chain.NewBlock(common.Address{}) block1 = chain.NewBlock(common.Address{})

@ -109,6 +109,7 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat
// Effectively a fork factory // Effectively a fork factory
func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager { func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager {
bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux} bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux}
bc.futureBlocks = NewBlockCache(1000)
if block == nil { if block == nil {
bc.Reset() bc.Reset()
} else { } else {

@ -6,6 +6,7 @@ import (
"io" "io"
"math/big" "math/big"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
@ -95,7 +96,8 @@ type ChainManager struct {
transState *state.StateDB transState *state.StateDB
txState *state.ManagedState txState *state.ManagedState
cache *BlockCache cache *BlockCache
futureBlocks *BlockCache
quit chan struct{} quit chan struct{}
} }
@ -107,6 +109,7 @@ func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *Chai
// Take ownership of this particular state // Take ownership of this particular state
bc.txState = state.ManageState(bc.State().Copy()) bc.txState = state.ManageState(bc.State().Copy())
bc.futureBlocks = NewBlockCache(254)
bc.makeCache() bc.makeCache()
go bc.update() go bc.update()
@ -433,6 +436,19 @@ type queueEvent struct {
splitCount int splitCount int
} }
func (self *ChainManager) procFutureBlocks() {
self.futureBlocks.mu.Lock()
blocks := make([]*types.Block, len(self.futureBlocks.blocks))
for i, hash := range self.futureBlocks.hashes {
blocks[i] = self.futureBlocks.Get(hash)
}
self.futureBlocks.mu.Unlock()
types.BlockBy(types.Number).Sort(blocks)
self.InsertChain(blocks)
}
func (self *ChainManager) InsertChain(chain types.Blocks) error { func (self *ChainManager) InsertChain(chain types.Blocks) error {
//self.tsmu.Lock() //self.tsmu.Lock()
//defer self.tsmu.Unlock() //defer self.tsmu.Unlock()
@ -452,12 +468,27 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
continue continue
} }
if err == BlockEqualTSErr { block.Td = new(big.Int)
//queue[i] = ChainSideEvent{block, logs} // Do not penelise on future block. We'll need a block queue eventually that will queue
// XXX silently discard it? // future block for future use
if err == BlockFutureErr {
self.futureBlocks.Push(block)
continue
}
if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
self.futureBlocks.Push(block)
continue continue
} }
/*
if err == BlockEqualTSErr {
//queue[i] = ChainSideEvent{block, logs}
// XXX silently discard it?
continue
}
*/
h := block.Header() h := block.Header()
chainlogger.Errorf("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes()[:4]) chainlogger.Errorf("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes()[:4])
chainlogger.Errorln(err) chainlogger.Errorln(err)
@ -513,6 +544,8 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
} }
self.mu.Unlock() self.mu.Unlock()
self.futureBlocks.Delete(block.Hash())
} }
if len(chain) > 0 && glog.V(logger.Info) { if len(chain) > 0 && glog.V(logger.Info) {
@ -527,7 +560,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
func (self *ChainManager) update() { func (self *ChainManager) update() {
events := self.eventMux.Subscribe(queueEvent{}) events := self.eventMux.Subscribe(queueEvent{})
futureTimer := time.NewTicker(5 * time.Second)
out: out:
for { for {
select { select {
@ -553,6 +586,8 @@ out:
self.eventMux.Post(event) self.eventMux.Post(event)
} }
} }
case <-futureTimer.C:
self.procFutureBlocks()
case <-self.quit: case <-self.quit:
break out break out
} }

Loading…
Cancel
Save