core: make current*Block atomic, and accessor functions mutex-free (#16171)

* core: make current*Block atomic, and accessor functions mutex-free

* core: fix review concerns

* core: fix error in atomic assignment

* core/light: implement atomic getter/setter for headerchain
pull/16056/merge
Martin Holst Swende 7 years ago committed by Péter Szilágyi
parent d398d04e27
commit f83237573f
  1. 112
      core/blockchain.go
  2. 39
      core/headerchain.go
  3. 6
      light/lightchain.go

@ -107,8 +107,8 @@ type BlockChain struct {
procmu sync.RWMutex // block processor lock
checkpoint int // checkpoint counts towards the new checkpoint
currentBlock *types.Block // Current head of the block chain
currentFastBlock *types.Block // Current head of the fast-sync chain (may be above the block chain!)
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
@ -224,10 +224,10 @@ func (bc *BlockChain) loadLastState() error {
}
}
// Everything seems to be fine, set as the head block
bc.currentBlock = currentBlock
bc.currentBlock.Store(currentBlock)
// Restore the last known head header
currentHeader := bc.currentBlock.Header()
currentHeader := currentBlock.Header()
if head := GetHeadHeaderHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
currentHeader = header
@ -236,21 +236,23 @@ func (bc *BlockChain) loadLastState() error {
bc.hc.SetCurrentHeader(currentHeader)
// Restore the last known head fast block
bc.currentFastBlock = bc.currentBlock
bc.currentFastBlock.Store(currentBlock)
if head := GetHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock = block
bc.currentFastBlock.Store(block)
}
}
// Issue a status log for the user
currentFastBlock := bc.CurrentFastBlock()
headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
blockTd := bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64())
fastTd := bc.GetTd(bc.currentFastBlock.Hash(), bc.currentFastBlock.NumberU64())
blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64())
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd)
log.Info("Loaded most recent local full block", "number", bc.currentBlock.Number(), "hash", bc.currentBlock.Hash(), "td", blockTd)
log.Info("Loaded most recent local fast block", "number", bc.currentFastBlock.Number(), "hash", bc.currentFastBlock.Hash(), "td", fastTd)
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd)
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd)
return nil
}
@ -279,30 +281,32 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.futureBlocks.Purge()
// Rewind the block chain, ensuring we don't end up with a stateless head block
if bc.currentBlock != nil && currentHeader.Number.Uint64() < bc.currentBlock.NumberU64() {
bc.currentBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() {
bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
}
if bc.currentBlock != nil {
if _, err := state.New(bc.currentBlock.Root(), bc.stateCache); err != nil {
if currentBlock := bc.CurrentBlock(); currentBlock != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
bc.currentBlock = nil
bc.currentBlock.Store(bc.genesisBlock)
}
}
// Rewind the fast block in a simpleton way to the target head
if bc.currentFastBlock != nil && currentHeader.Number.Uint64() < bc.currentFastBlock.NumberU64() {
bc.currentFastBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() {
bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
}
// If either blocks reached nil, reset to the genesis state
if bc.currentBlock == nil {
bc.currentBlock = bc.genesisBlock
if currentBlock := bc.CurrentBlock(); currentBlock == nil {
bc.currentBlock.Store(bc.genesisBlock)
}
if bc.currentFastBlock == nil {
bc.currentFastBlock = bc.genesisBlock
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
bc.currentFastBlock.Store(bc.genesisBlock)
}
if err := WriteHeadBlockHash(bc.db, bc.currentBlock.Hash()); err != nil {
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
if err := WriteHeadBlockHash(bc.db, currentBlock.Hash()); err != nil {
log.Crit("Failed to reset head full block", "err", err)
}
if err := WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash()); err != nil {
if err := WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
log.Crit("Failed to reset head fast block", "err", err)
}
return bc.loadLastState()
@ -321,7 +325,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
}
// If all checks out, manually set the head block
bc.mu.Lock()
bc.currentBlock = block
bc.currentBlock.Store(block)
bc.mu.Unlock()
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
@ -330,28 +334,19 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// GasLimit returns the gas limit of the current HEAD block.
func (bc *BlockChain) GasLimit() uint64 {
bc.mu.RLock()
defer bc.mu.RUnlock()
return bc.currentBlock.GasLimit()
return bc.CurrentBlock().GasLimit()
}
// CurrentBlock retrieves the current head block of the canonical chain. The
// block is retrieved from the blockchain's internal cache.
func (bc *BlockChain) CurrentBlock() *types.Block {
bc.mu.RLock()
defer bc.mu.RUnlock()
return bc.currentBlock
return bc.currentBlock.Load().(*types.Block)
}
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
// chain. The block is retrieved from the blockchain's internal cache.
func (bc *BlockChain) CurrentFastBlock() *types.Block {
bc.mu.RLock()
defer bc.mu.RUnlock()
return bc.currentFastBlock
return bc.currentFastBlock.Load().(*types.Block)
}
// SetProcessor sets the processor required for making state modifications.
@ -416,10 +411,10 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
}
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock = bc.genesisBlock
bc.currentBlock.Store(bc.genesisBlock)
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock = bc.genesisBlock
bc.currentFastBlock.Store(bc.genesisBlock)
return nil
}
@ -444,7 +439,7 @@ func (bc *BlockChain) repair(head **types.Block) error {
// Export writes the active chain to the given writer.
func (bc *BlockChain) Export(w io.Writer) error {
return bc.ExportN(w, uint64(0), bc.currentBlock.NumberU64())
return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
}
// ExportN writes a subset of the active chain to the given writer.
@ -488,7 +483,7 @@ func (bc *BlockChain) insert(block *types.Block) {
if err := WriteHeadBlockHash(bc.db, block.Hash()); err != nil {
log.Crit("Failed to insert head block hash", "err", err)
}
bc.currentBlock = block
bc.currentBlock.Store(block)
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
@ -497,7 +492,7 @@ func (bc *BlockChain) insert(block *types.Block) {
if err := WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil {
log.Crit("Failed to insert head fast block hash", "err", err)
}
bc.currentFastBlock = block
bc.currentFastBlock.Store(block)
}
}
@ -714,13 +709,15 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
if currentHeader.Hash() == hash {
bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
}
if bc.currentFastBlock.Hash() == hash {
bc.currentFastBlock = bc.GetBlock(bc.currentFastBlock.ParentHash(), bc.currentFastBlock.NumberU64()-1)
WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash())
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
bc.currentFastBlock.Store(newFastBlock)
WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
}
if bc.currentBlock.Hash() == hash {
bc.currentBlock = bc.GetBlock(bc.currentBlock.ParentHash(), bc.currentBlock.NumberU64()-1)
WriteHeadBlockHash(bc.db, bc.currentBlock.Hash())
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
bc.currentBlock.Store(newBlock)
WriteHeadBlockHash(bc.db, newBlock.Hash())
}
}
}
@ -829,11 +826,12 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
bc.mu.Lock()
head := blockChain[len(blockChain)-1]
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
if bc.GetTd(bc.currentFastBlock.Hash(), bc.currentFastBlock.NumberU64()).Cmp(td) < 0 {
currentFastBlock := bc.CurrentFastBlock()
if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
if err := WriteHeadFastBlockHash(bc.db, head.Hash()); err != nil {
log.Crit("Failed to update head fast block hash", "err", err)
}
bc.currentFastBlock = head
bc.currentFastBlock.Store(head)
}
}
bc.mu.Unlock()
@ -880,7 +878,8 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
bc.mu.Lock()
defer bc.mu.Unlock()
localTd := bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64())
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
// Irrelevant of the canonical status, write the block itself to the database
@ -955,14 +954,15 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
reorg := externTd.Cmp(localTd) > 0
currentBlock = bc.CurrentBlock()
if !reorg && externTd.Cmp(localTd) == 0 {
// Split same-difficulty blocks by number, then at random
reorg = block.NumberU64() < bc.currentBlock.NumberU64() || (block.NumberU64() == bc.currentBlock.NumberU64() && mrand.Float64() < 0.5)
reorg = block.NumberU64() < currentBlock.NumberU64() || (block.NumberU64() == currentBlock.NumberU64() && mrand.Float64() < 0.5)
}
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != bc.currentBlock.Hash() {
if err := bc.reorg(bc.currentBlock, block); err != nil {
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, err
}
}
@ -1091,7 +1091,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
case err == consensus.ErrPrunedAncestor:
// Block competing with the canonical chain, store in the db, but don't process
// until the competitor TD goes above the canonical TD
localTd := bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64())
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
if localTd.Cmp(externTd) > 0 {
if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
@ -1480,9 +1481,6 @@ func (bc *BlockChain) writeHeader(header *types.Header) error {
// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
func (bc *BlockChain) CurrentHeader() *types.Header {
bc.mu.RLock()
defer bc.mu.RUnlock()
return bc.hc.CurrentHeader()
}

@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/hashicorp/golang-lru"
"sync/atomic"
)
const (
@ -51,8 +52,8 @@ type HeaderChain struct {
chainDb ethdb.Database
genesisHeader *types.Header
currentHeader *types.Header // Current head of the header chain (may be above the block chain!)
currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time)
currentHeader atomic.Value // Current head of the header chain (may be above the block chain!)
currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time)
headerCache *lru.Cache // Cache for the most recent block headers
tdCache *lru.Cache // Cache for the most recent block total difficulties
@ -95,13 +96,13 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
return nil, ErrNoGenesis
}
hc.currentHeader = hc.genesisHeader
hc.currentHeader.Store(hc.genesisHeader)
if head := GetHeadBlockHash(chainDb); head != (common.Hash{}) {
if chead := hc.GetHeaderByHash(head); chead != nil {
hc.currentHeader = chead
hc.currentHeader.Store(chead)
}
}
hc.currentHeaderHash = hc.currentHeader.Hash()
hc.currentHeaderHash = hc.CurrentHeader().Hash()
return hc, nil
}
@ -139,7 +140,7 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
}
localTd := hc.GetTd(hc.currentHeaderHash, hc.currentHeader.Number.Uint64())
localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
externTd := new(big.Int).Add(header.Difficulty, ptd)
// Irrelevant of the canonical status, write the td and header to the database
@ -181,7 +182,8 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
if err := WriteHeadHeaderHash(hc.chainDb, hash); err != nil {
log.Crit("Failed to insert head header hash", "err", err)
}
hc.currentHeaderHash, hc.currentHeader = hash, types.CopyHeader(header)
hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
status = CanonStatTy
} else {
@ -383,7 +385,7 @@ func (hc *HeaderChain) GetHeaderByNumber(number uint64) *types.Header {
// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
func (hc *HeaderChain) CurrentHeader() *types.Header {
return hc.currentHeader
return hc.currentHeader.Load().(*types.Header)
}
// SetCurrentHeader sets the current head header of the canonical chain.
@ -391,7 +393,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
if err := WriteHeadHeaderHash(hc.chainDb, head.Hash()); err != nil {
log.Crit("Failed to insert head header hash", "err", err)
}
hc.currentHeader = head
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
}
@ -403,19 +405,20 @@ type DeleteCallback func(common.Hash, uint64)
// will be deleted and the new one set.
func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
height := uint64(0)
if hc.currentHeader != nil {
height = hc.currentHeader.Number.Uint64()
if hdr := hc.CurrentHeader(); hdr != nil {
height = hdr.Number.Uint64()
}
for hc.currentHeader != nil && hc.currentHeader.Number.Uint64() > head {
hash := hc.currentHeader.Hash()
num := hc.currentHeader.Number.Uint64()
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
hash := hdr.Hash()
num := hdr.Number.Uint64()
if delFn != nil {
delFn(hash, num)
}
DeleteHeader(hc.chainDb, hash, num)
DeleteTd(hc.chainDb, hash, num)
hc.currentHeader = hc.GetHeader(hc.currentHeader.ParentHash, hc.currentHeader.Number.Uint64()-1)
hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
}
// Roll back the canonical chain numbering
for i := height; i > head; i-- {
@ -426,10 +429,10 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
hc.tdCache.Purge()
hc.numberCache.Purge()
if hc.currentHeader == nil {
hc.currentHeader = hc.genesisHeader
if hc.CurrentHeader() == nil {
hc.currentHeader.Store(hc.genesisHeader)
}
hc.currentHeaderHash = hc.currentHeader.Hash()
hc.currentHeaderHash = hc.CurrentHeader().Hash()
if err := WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash); err != nil {
log.Crit("Failed to reset head header hash", "err", err)

@ -171,9 +171,6 @@ func (bc *LightChain) SetHead(head uint64) {
// GasLimit returns the gas limit of the current HEAD block.
func (self *LightChain) GasLimit() uint64 {
self.mu.RLock()
defer self.mu.RUnlock()
return self.hc.CurrentHeader().GasLimit
}
@ -387,9 +384,6 @@ func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
func (self *LightChain) CurrentHeader() *types.Header {
self.mu.RLock()
defer self.mu.RUnlock()
return self.hc.CurrentHeader()
}

Loading…
Cancel
Save