@ -1,6 +1,7 @@
package eth
import (
"bytes"
"fmt"
"math"
"math/big"
@ -24,8 +25,8 @@ const (
blocksRequestRepetition = 1
blockHashesRequestInterval = 500 // ms
blocksRequestMaxIdleRounds = 100
cacheTimeout = 3 // minute s
blockTimeout = 5 // minute s
blockHashesTimeout = 60 // second s
blocks Timeout = 120 // second s
)
type poolNode struct {
@ -70,9 +71,14 @@ type BlockPool struct {
type peerInfo struct {
lock sync . RWMutex
td * big . Int
currentBlock [ ] byte
id string
td * big . Int
currentBlockHash [ ] byte
currentBlock * types . Block
currentBlockC chan * types . Block
parentHash [ ] byte
headSection * section
headSectionC chan * section
id string
requestBlockHashes func ( [ ] byte ) error
requestBlocks func ( [ ] [ ] byte ) error
@ -203,30 +209,39 @@ func (self *BlockPool) Wait(t time.Duration) {
// AddPeer is called by the eth protocol instance running on the peer after
// the status message has been received with total difficulty and current block hash
// AddPeer can only be used once, RemovePeer needs to be called when the peer disconnects
func ( self * BlockPool ) AddPeer ( td * big . Int , currentBlock [ ] byte , peerId string , requestBlockHashes func ( [ ] byte ) error , requestBlocks func ( [ ] [ ] byte ) error , peerError func ( int , string , ... interface { } ) ) bool {
func ( self * BlockPool ) AddPeer ( td * big . Int , currentBlockHash [ ] byte , peerId string , requestBlockHashes func ( [ ] byte ) error , requestBlocks func ( [ ] [ ] byte ) error , peerError func ( int , string , ... interface { } ) ) ( best bool ) {
self . peersLock . Lock ( )
defer self . peersLock . Unlock ( )
peer , ok := self . peers [ peerId ]
if ok {
poolLogger . Debugf ( "Update peer %v with td %v and current block %x" , peerId , td , currentBlock [ : 4 ] )
peer . td = td
peer . currentBlock = currentBlock
if bytes . Compare ( peer . currentBlockHash , currentBlockHash ) != 0 {
poolLogger . Debugf ( "Update peer %v with td %v and current block %s" , peerId , td , name ( currentBlockHash ) )
peer . lock . Lock ( )
peer . td = td
peer . currentBlockHash = currentBlockHash
peer . currentBlock = nil
peer . parentHash = nil
peer . headSection = nil
peer . lock . Unlock ( )
}
} else {
peer = & peerInfo {
td : td ,
currentBlock : currentBlock ,
currentBlockHash : currentBlockHash ,
id : peerId , //peer.Identity().Pubkey()
requestBlockHashes : requestBlockHashes ,
requestBlocks : requestBlocks ,
peerError : peerError ,
sections : make ( map [ string ] * section ) ,
currentBlockC : make ( chan * types . Block ) ,
headSectionC : make ( chan * section ) ,
}
self . peers [ peerId ] = peer
poolLogger . Debugf ( "add new peer %v with td %v and current block %x" , peerId , td , currentBlock [ : 4 ] )
poolLogger . Debugf ( "add new peer %v with td %v and current block %x" , peerId , td , currentBlockHash [ : 4 ] )
}
// check peer current head
if self . hasBlock ( currentBlock ) {
if self . hasBlock ( currentBlockHash ) {
// peer not ahead
return false
}
@ -234,22 +249,135 @@ func (self *BlockPool) AddPeer(td *big.Int, currentBlock []byte, peerId string,
if self . peer == peer {
// new block update
// peer is already active best peer, request hashes
poolLogger . Debugf ( "[%s] already the best peer. request hashes from %s" , peerId , name ( currentBlock ) )
peer . requestBlockHashes ( currentBlock )
return true
poolLogger . Debugf ( "[%s] already the best peer. Request new head section info from %s" , peerId , name ( currentBlockHash ) )
peer . headSectionC <- nil
best = true
} else {
currentTD := ethutil . Big0
if self . peer != nil {
currentTD = self . peer . td
}
if td . Cmp ( currentTD ) > 0 {
poolLogger . Debugf ( "peer %v promoted best peer" , peerId )
self . switchPeer ( self . peer , peer )
self . peer = peer
best = true
}
}
return
}
currentTD := ethutil . Big0
if self . peer != nil {
currentTD = self . peer . td
}
if td . Cmp ( currentTD ) > 0 {
poolLogger . Debugf ( "peer %v promoted best peer" , peerId )
self . switchPeer ( self . peer , peer )
self . peer = peer
return true
}
return false
func ( self * BlockPool ) requestHeadSection ( peer * peerInfo ) {
self . wg . Add ( 1 )
self . procWg . Add ( 1 )
poolLogger . Debugf ( "[%s] head section at [%s] requesting info" , peer . id , name ( peer . currentBlockHash ) )
go func ( ) {
var idle bool
peer . lock . RLock ( )
quitC := peer . quitC
currentBlockHash := peer . currentBlockHash
peer . lock . RUnlock ( )
blockHashesRequestTimer := time . NewTimer ( 0 )
blocksRequestTimer := time . NewTimer ( 0 )
suicide := time . NewTimer ( blockHashesTimeout * time . Second )
blockHashesRequestTimer . Stop ( )
defer blockHashesRequestTimer . Stop ( )
defer blocksRequestTimer . Stop ( )
entry := self . get ( currentBlockHash )
if entry != nil {
entry . node . lock . RLock ( )
currentBlock := entry . node . block
entry . node . lock . RUnlock ( )
if currentBlock != nil {
peer . lock . Lock ( )
peer . currentBlock = currentBlock
peer . parentHash = currentBlock . ParentHash ( )
poolLogger . Debugf ( "[%s] head block [%s] found" , peer . id , name ( currentBlockHash ) )
peer . lock . Unlock ( )
blockHashesRequestTimer . Reset ( 0 )
blocksRequestTimer . Stop ( )
}
}
LOOP :
for {
select {
case <- self . quit :
break LOOP
case <- quitC :
poolLogger . Debugf ( "[%s] head section at [%s] incomplete - quit request loop" , peer . id , name ( currentBlockHash ) )
break LOOP
case headSection := <- peer . headSectionC :
peer . lock . Lock ( )
peer . headSection = headSection
if headSection == nil {
oldBlockHash := currentBlockHash
currentBlockHash = peer . currentBlockHash
poolLogger . Debugf ( "[%s] head section changed [%s] -> [%s]" , peer . id , name ( oldBlockHash ) , name ( currentBlockHash ) )
if idle {
idle = false
suicide . Reset ( blockHashesTimeout * time . Second )
self . procWg . Add ( 1 )
}
blocksRequestTimer . Reset ( blocksRequestInterval * time . Millisecond )
} else {
poolLogger . DebugDetailf ( "[%s] head section at [%s] created" , peer . id , name ( currentBlockHash ) )
if ! idle {
idle = true
suicide . Stop ( )
self . procWg . Done ( )
}
}
peer . lock . Unlock ( )
blockHashesRequestTimer . Stop ( )
case <- blockHashesRequestTimer . C :
poolLogger . DebugDetailf ( "[%s] head section at [%s] not found, requesting block hashes" , peer . id , name ( currentBlockHash ) )
peer . requestBlockHashes ( currentBlockHash )
blockHashesRequestTimer . Reset ( blockHashesRequestInterval * time . Millisecond )
case currentBlock := <- peer . currentBlockC :
peer . lock . Lock ( )
peer . currentBlock = currentBlock
peer . parentHash = currentBlock . ParentHash ( )
poolLogger . DebugDetailf ( "[%s] head block [%s] found" , peer . id , name ( currentBlockHash ) )
peer . lock . Unlock ( )
if self . hasBlock ( currentBlock . ParentHash ( ) ) {
if err := self . insertChain ( types . Blocks ( [ ] * types . Block { currentBlock } ) ) ; err != nil {
peer . peerError ( ErrInvalidBlock , "%v" , err )
}
if ! idle {
idle = true
suicide . Stop ( )
self . procWg . Done ( )
}
} else {
blockHashesRequestTimer . Reset ( 0 )
}
blocksRequestTimer . Stop ( )
case <- blocksRequestTimer . C :
peer . lock . RLock ( )
poolLogger . DebugDetailf ( "[%s] head block [%s] not found, requesting" , peer . id , name ( currentBlockHash ) )
peer . requestBlocks ( [ ] [ ] byte { peer . currentBlockHash } )
peer . lock . RUnlock ( )
blocksRequestTimer . Reset ( blocksRequestInterval * time . Millisecond )
case <- suicide . C :
peer . peerError ( ErrInsufficientChainInfo , "peer failed to provide block hashes or head block for block hash %x" , currentBlockHash )
break LOOP
}
}
self . wg . Done ( )
if ! idle {
self . procWg . Done ( )
}
} ( )
}
// RemovePeer is called by the eth protocol when the peer disconnects
@ -274,13 +402,13 @@ func (self *BlockPool) RemovePeer(peerId string) {
newPeer = info
}
}
self . peer = newPeer
self . switchPeer ( peer , newPeer )
if newPeer != nil {
poolLogger . Debugf ( "peer %v with td %v promoted to best peer" , newPeer . id , newPeer . td )
} else {
poolLogger . Warnln ( "no peers" )
}
self . peer = newPeer
self . switchPeer ( peer , newPeer )
}
}
@ -299,25 +427,56 @@ func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string)
return
}
// peer is still the best
poolLogger . Debugf ( "adding hashes for best peer %s" , peerId )
var size , n int
var hash [ ] byte
var ok bool
var section , child , parent * section
var ok , headSection bool
var sec , child , parent * section
var entry * poolEntry
var nodes [ ] * poolNode
bestPeer := peer
hash , ok = next ( )
peer . lock . Lock ( )
if bytes . Compare ( peer . parentHash , hash ) == 0 {
if self . hasBlock ( peer . currentBlockHash ) {
return
}
poolLogger . Debugf ( "adding hashes at chain head for best peer %s starting from [%s]" , peerId , name ( peer . currentBlockHash ) )
headSection = true
if entry := self . get ( peer . currentBlockHash ) ; entry == nil {
node := & poolNode {
hash : peer . currentBlockHash ,
block : peer . currentBlock ,
peer : peerId ,
blockBy : peerId ,
}
if size == 0 {
sec = newSection ( )
}
nodes = append ( nodes , node )
size ++
n ++
} else {
child = entry . section
}
} else {
poolLogger . Debugf ( "adding hashes for best peer %s starting from [%s]" , peerId , name ( hash ) )
}
quitC := peer . quitC
peer . lock . Unlock ( )
LOOP :
// iterate using next (rlp stream lazy decoder) feeding hashesC
for hash , ok = next ( ) ; ok ; hash , ok = next ( ) {
for ; ok ; hash , ok = next ( ) {
n ++
select {
case <- self . quit :
return
case <- peer . quitC :
case <- quitC :
// if the peer is demoted, no more hashes taken
peer = nil
bestP eer = nil
break LOOP
default :
}
@ -325,8 +484,8 @@ LOOP:
// check if known block connecting the downloaded chain to our blockchain
poolLogger . DebugDetailf ( "[%s] known block" , name ( hash ) )
// mark child as absolute pool root with parent known to blockchain
if section != nil {
self . connectToBlockChain ( section )
if sec != nil {
self . connectToBlockChain ( sec )
} else {
if child != nil {
self . connectToBlockChain ( child )
@ -340,6 +499,7 @@ LOOP:
// reached a known chain in the pool
if entry . node == entry . section . bottom && n == 1 {
// the first block hash received is an orphan in the pool, so rejoice and continue
poolLogger . DebugDetailf ( "[%s] connecting child section" , sectionName ( entry . section ) )
child = entry . section
continue LOOP
}
@ -353,7 +513,7 @@ LOOP:
peer : peerId ,
}
if size == 0 {
section = newSection ( )
sec = newSection ( )
}
nodes = append ( nodes , node )
size ++
@ -379,10 +539,10 @@ LOOP:
}
if size > 0 {
self . processSection ( section , nodes )
poolLogger . DebugDetailf ( "[%s]->[%s](%v)->[%s] new chain section" , sectionName ( parent ) , sectionName ( section ) , size , sectionName ( child ) )
self . link ( parent , section )
self . link ( section , child )
self . processSection ( sec , nodes )
poolLogger . DebugDetailf ( "[%s]->[%s](%v)->[%s] new chain section" , sectionName ( parent ) , sectionName ( sec ) , size , sectionName ( child ) )
self . link ( parent , sec )
self . link ( sec , child )
} else {
poolLogger . DebugDetailf ( "[%s]->[%s] connecting known sections" , sectionName ( parent ) , sectionName ( child ) )
self . link ( parent , child )
@ -390,15 +550,31 @@ LOOP:
self . chainLock . Unlock ( )
if parent != nil && p eer != nil {
if parent != nil && bestP eer != nil {
self . activateChain ( parent , peer )
poolLogger . Debugf ( "[%s] activate parent section [%s]" , name ( parent . top . hash ) , sectionName ( parent ) )
}
if section != nil {
peer . addSection ( section . top . hash , section )
section . controlC <- peer
poolLogger . Debugf ( "[%s] activate new section" , sectionName ( section ) )
if sec != nil {
peer . addSection ( sec . top . hash , sec )
// request next section here once, only repeat if bottom block arrives,
// otherwise no way to check if it arrived
peer . requestBlockHashes ( sec . bottom . hash )
sec . controlC <- bestPeer
poolLogger . Debugf ( "[%s] activate new section" , sectionName ( sec ) )
}
if headSection {
var headSec * section
switch {
case sec != nil :
headSec = sec
case child != nil :
headSec = child
default :
headSec = parent
}
peer . headSectionC <- headSec
}
}
@ -426,14 +602,21 @@ func sectionName(section *section) (name string) {
// only the first PoW-valid block for a hash is considered legit
func ( self * BlockPool ) AddBlock ( block * types . Block , peerId string ) {
hash := block . Hash ( )
if self . hasBlock ( hash ) {
poolLogger . DebugDetailf ( "block [%s] already known" , name ( hash ) )
return
}
self . peersLock . Lock ( )
peer := self . peer
self . peersLock . Unlock ( )
entry := self . get ( hash )
if bytes . Compare ( hash , peer . currentBlockHash ) == 0 {
poolLogger . Debugf ( "add head block [%s] for peer %s" , name ( hash ) , peerId )
peer . currentBlockC <- block
} else {
if entry == nil {
poolLogger . Warnf ( "unrequested block [%s] by peer %s" , name ( hash ) , peerId )
self . peerError ( peerId , ErrUnrequestedBlock , "%x" , hash )
}
}
if entry == nil {
poolLogger . Warnf ( "unrequested block [%x] by peer %s" , hash , peerId )
self . peerError ( peerId , ErrUnrequestedBlock , "%x" , hash )
return
}
@ -443,17 +626,21 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
// check if block already present
if node . block != nil {
poolLogger . DebugDetailf ( "block [%x ] already sent by %s" , name ( hash ) , node . blockBy )
poolLogger . DebugDetailf ( "block [%s ] already sent by %s" , name ( hash ) , node . blockBy )
return
}
// validate block for PoW
if ! self . verifyPoW ( block ) {
poolLogger . Warnf ( "invalid pow on block [%x] by peer %s" , hash , peerId )
self . peerError ( peerId , ErrInvalidPoW , "%x" , hash )
return
}
if self . hasBlock ( hash ) {
poolLogger . DebugDetailf ( "block [%s] already known" , name ( hash ) )
} else {
// validate block for PoW
if ! self . verifyPoW ( block ) {
poolLogger . Warnf ( "invalid pow on block [%s] by peer %s" , name ( hash ) , peerId )
self . peerError ( peerId , ErrInvalidPoW , "%x" , hash )
return
}
}
poolLogger . Debugf ( "added block [%s] sent by peer %s" , name ( hash ) , peerId )
node . block = block
node . blockBy = peerId
@ -544,23 +731,23 @@ LOOP:
// - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
// - when turned back on it recursively calls itself on the root of the next chain section
// - when exits, signals to
func ( self * BlockPool ) processSection ( section * section , nodes [ ] * poolNode ) {
func ( self * BlockPool ) processSection ( sec * section , nodes [ ] * poolNode ) {
for i , node := range nodes {
entry := & poolEntry { node : node , section : section , index : i }
entry := & poolEntry { node : node , section : sec , index : i }
self . set ( node . hash , entry )
}
section . bottom = nodes [ len ( nodes ) - 1 ]
section . top = nodes [ 0 ]
section . nodes = nodes
poolLogger . DebugDetailf ( "[%s] setup section process" , sectionName ( section ) )
sec . bottom = nodes [ len ( nodes ) - 1 ]
sec . top = nodes [ 0 ]
sec . nodes = nodes
poolLogger . DebugDetailf ( "[%s] setup section process" , sectionName ( sec ) )
self . wg . Add ( 1 )
go func ( ) {
// absolute time after which sub-chain is killed if not complete (some blocks are missing)
suicideTimer := time . After ( blockTimeout * time . Minute )
suicideTimer := time . After ( blocks Timeout * time . Second )
var peer , newPeer * peerInfo
@ -580,21 +767,23 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
var insertChain bool
var quitC chan bool
var blockChainC = section . blockChainC
var blockChainC = sec . blockChainC
var parentHash [ ] byte
LOOP :
for {
if insertChain {
insertChain = false
rest , err := self . addSectionToBlockChain ( section )
rest , err := self . addSectionToBlockChain ( sec )
if err != nil {
close ( section . suicideC )
close ( sec . suicideC )
continue LOOP
}
if rest == 0 {
blocksRequestsComplete = true
child := self . getChild ( section )
child := self . getChild ( sec )
if child != nil {
self . connectToBlockChain ( child )
}
@ -603,7 +792,7 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
if blockHashesRequestsComplete && blocksRequestsComplete {
// not waiting for hashes any more
poolLogger . Debugf ( "[%s] section complete %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts)" , sectionName ( section ) , depth , blocksRequests , blockHashesRequests )
poolLogger . Debugf ( "[%s] section complete %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts)" , sectionName ( sec ) , depth , blocksRequests , blockHashesRequests )
break LOOP
} // otherwise suicide if no hashes coming
@ -611,11 +800,12 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
// went through all blocks in section
if missing == 0 {
// no missing blocks
poolLogger . DebugDetailf ( "[%s] got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v" , sectionName ( section ) , blocksRequests , missing , lastMissing , depth )
poolLogger . DebugDetailf ( "[%s] got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v" , sectionName ( sec ) , blocksRequests , missing , lastMissing , depth )
blocksRequestsComplete = true
blocksRequestTimer = nil
blocksRequestTime = false
} else {
poolLogger . DebugDetailf ( "[%s] section checked: missing %v/%v/%v" , sectionName ( sec ) , missing , lastMissing , depth )
// some missing blocks
blocksRequests ++
if len ( hashes ) > 0 {
@ -630,8 +820,8 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
idle ++
// too many idle rounds
if idle >= blocksRequestMaxIdleRounds {
poolLogger . DebugDetailf ( "[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up..." , sectionName ( section ) , idle , blocksRequests , missing , lastMissing , depth )
close ( section . suicideC )
poolLogger . DebugDetailf ( "[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up..." , sectionName ( sec ) , idle , blocksRequests , missing , lastMissing , depth )
close ( sec . suicideC )
}
} else {
idle = 0
@ -653,22 +843,39 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
//
if ready && blocksRequestTime && ! blocksRequestsComplete {
poolLogger . DebugDetailf ( "[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v" , sectionName ( section ) , blocksRequests , missing , lastMissing , depth )
poolLogger . DebugDetailf ( "[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v" , sectionName ( sec ) , blocksRequests , missing , lastMissing , depth )
blocksRequestTimer = time . After ( blocksRequestInterval * time . Millisecond )
blocksRequestTime = false
processC = offC
}
if blockHashesRequestTime {
if self . getParent ( section ) != nil {
var parentSection = self . getParent ( sec )
if parentSection == nil {
if parent := self . get ( parentHash ) ; parent != nil {
parentSection = parent . section
self . chainLock . Lock ( )
self . link ( parentSection , sec )
self . chainLock . Unlock ( )
} else {
if self . hasBlock ( parentHash ) {
insertChain = true
blockHashesRequestTime = false
blockHashesRequestTimer = nil
blockHashesRequestsComplete = true
continue LOOP
}
}
}
if parentSection != nil {
// if not root of chain, switch off
poolLogger . DebugDetailf ( "[%s] parent found, hash requests deactivated (after %v total attempts)\n" , sectionName ( section ) , blockHashesRequests )
poolLogger . DebugDetailf ( "[%s] parent found, hash requests deactivated (after %v total attempts)\n" , sectionName ( sec ) , blockHashesRequests )
blockHashesRequestTimer = nil
blockHashesRequestsComplete = true
} else {
blockHashesRequests ++
poolLogger . Debugf ( "[%s] hash request on root (%v total attempts)\n" , sectionName ( section ) , blockHashesRequests )
peer . requestBlockHashes ( section . bottom . hash )
poolLogger . Debugf ( "[%s] hash request on root (%v total attempts)\n" , sectionName ( sec ) , blockHashesRequests )
peer . requestBlockHashes ( sec . bottom . hash )
blockHashesRequestTimer = time . After ( blockHashesRequestInterval * time . Millisecond )
}
blockHashesRequestTime = false
@ -682,27 +889,27 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
// peer quit or demoted, put section in idle mode
quitC = nil
go func ( ) {
section . controlC <- nil
sec . controlC <- nil
} ( )
case <- self . purgeC :
suicideTimer = time . After ( 0 )
case <- suicideTimer :
close ( section . suicideC )
poolLogger . Debugf ( "[%s] timeout. (%v total attempts): missing %v/%v/%v" , sectionName ( section ) , blocksRequests , missing , lastMissing , depth )
close ( sec . suicideC )
poolLogger . Debugf ( "[%s] timeout. (%v total attempts): missing %v/%v/%v" , sectionName ( sec ) , blocksRequests , missing , lastMissing , depth )
case <- section . suicideC :
poolLogger . Debugf ( "[%s] suicide" , sectionName ( section ) )
case <- sec . suicideC :
poolLogger . Debugf ( "[%s] suicide" , sectionName ( sec ) )
// first delink from child and parent under chainlock
self . chainLock . Lock ( )
self . link ( nil , section )
self . link ( section , nil )
self . link ( nil , sec )
self . link ( sec , nil )
self . chainLock . Unlock ( )
// delete node entries from pool index under pool lock
self . lock . Lock ( )
for _ , node := range section . nodes {
for _ , node := range sec . nodes {
delete ( self . pool , string ( node . hash ) )
}
self . lock . Unlock ( )
@ -710,20 +917,20 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
break LOOP
case <- blocksRequestTimer :
poolLogger . DebugDetailf ( "[%s] block request time" , sectionName ( section ) )
poolLogger . DebugDetailf ( "[%s] block request time" , sectionName ( sec ) )
blocksRequestTime = true
case <- blockHashesRequestTimer :
poolLogger . DebugDetailf ( "[%s] hash request time" , sectionName ( section ) )
poolLogger . DebugDetailf ( "[%s] hash request time" , sectionName ( sec ) )
blockHashesRequestTime = true
case newPeer = <- section . controlC :
case newPeer = <- sec . controlC :
// active -> idle
if peer != nil && newPeer == nil {
self . procWg . Done ( )
if init {
poolLogger . Debugf ( "[%s] idle mode (%v total attempts): missing %v/%v/%v" , sectionName ( section ) , blocksRequests , missing , lastMissing , depth )
poolLogger . Debugf ( "[%s] idle mode (%v total attempts): missing %v/%v/%v" , sectionName ( sec ) , blocksRequests , missing , lastMissing , depth )
}
blocksRequestTime = false
blocksRequestTimer = nil
@ -739,11 +946,11 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
if peer == nil && newPeer != nil {
self . procWg . Add ( 1 )
poolLogger . Debugf ( "[%s] active mode" , sectionName ( section ) )
poolLogger . Debugf ( "[%s] active mode" , sectionName ( sec ) )
if ! blocksRequestsComplete {
blocksRequestTime = true
}
if ! blockHashesRequestsComplete {
if ! blockHashesRequestsComplete && parentHash != nil {
blockHashesRequestTime = true
}
if ! init {
@ -753,13 +960,13 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
missing = 0
self . wg . Add ( 1 )
self . procWg . Add ( 1 )
depth = len ( section . nodes )
depth = len ( sec . nodes )
lastMissing = depth
// if not run at least once fully, launch iterator
go func ( ) {
var node * poolNode
IT :
for _ , node = range section . nodes {
for _ , node = range sec . nodes {
select {
case processC <- node :
case <- self . quit :
@ -771,7 +978,7 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
self . procWg . Done ( )
} ( )
} else {
poolLogger . Debugf ( "[%s] restore earlier state" , sectionName ( section ) )
poolLogger . Debugf ( "[%s] restore earlier state" , sectionName ( sec ) )
processC = offC
}
}
@ -781,7 +988,7 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
}
peer = newPeer
case waiter := <- section . forkC :
case waiter := <- sec . forkC :
// this case just blocks the process until section is split at the fork
<- waiter
init = false
@ -794,7 +1001,7 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
init = true
done = true
processC = make ( chan * poolNode , missing )
poolLogger . DebugDetailf ( "[%s] section initalised: missing %v/%v/%v" , sectionName ( section ) , missing , lastMissing , depth )
poolLogger . DebugDetailf ( "[%s] section initalised: missing %v/%v/%v" , sectionName ( sec ) , missing , lastMissing , depth )
continue LOOP
}
if ready {
@ -811,17 +1018,24 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
missing ++
hashes = append ( hashes , node . hash )
if len ( hashes ) == blockBatchSize {
poolLogger . Debugf ( "[%s] request %v missing blocks" , sectionName ( section ) , len ( hashes ) )
poolLogger . Debugf ( "[%s] request %v missing blocks" , sectionName ( sec ) , len ( hashes ) )
self . requestBlocks ( blocksRequests , hashes )
hashes = nil
}
missingC <- node
} else {
if blockChainC == nil && i == lastMissing {
insertChain = true
if i == lastMissing {
if blockChainC == nil {
insertChain = true
} else {
if parentHash == nil {
parentHash = block . ParentHash ( )
poolLogger . Debugf ( "[%s] found root block [%s]" , sectionName ( sec ) , name ( parentHash ) )
blockHashesRequestTime = true
}
}
}
}
poolLogger . Debugf ( "[%s] %v/%v/%v/%v" , sectionName ( section ) , i , missing , lastMissing , depth )
if i == lastMissing && init {
done = true
}
@ -829,23 +1043,22 @@ func (self *BlockPool) processSection(section *section, nodes []*poolNode) {
case <- blockChainC :
// closed blockChain channel indicates that the blockpool is reached
// connected to the blockchain, insert the longest chain of blocks
poolLogger . Debugf ( "[%s] reached blockchain" , sectionName ( section ) )
poolLogger . Debugf ( "[%s] reached blockchain" , sectionName ( sec ) )
blockChainC = nil
// switch off hash requests in case they were on
blockHashesRequestTime = false
blockHashesRequestTimer = nil
blockHashesRequestsComplete = true
// section root has block
if len ( section . nodes ) > 0 && section . nodes [ len ( section . nodes ) - 1 ] . block != nil {
if len ( sec . nodes ) > 0 && sec . nodes [ len ( sec . nodes ) - 1 ] . block != nil {
insertChain = true
}
continue LOOP
} // select
} // for
poolLogger . Debugf ( "[%s] section complete: %v block hashes requests - %v block requests - missing %v/%v/%v" , sectionName ( section ) , blockHashesRequests , blocksRequests , missing , lastMissing , depth )
close ( section . offC )
close ( sec . offC )
self . wg . Done ( )
if peer != nil {
@ -917,22 +1130,28 @@ func (self *peerInfo) addSection(hash []byte, section *section) (found *section)
defer self . lock . Unlock ( )
key := string ( hash )
found = self . sections [ key ]
poolLogger . DebugDetailf ( "[%s] section process %s registered " , sectionName ( section ) , self . id )
poolLogger . DebugDetailf ( "[%s] section process stored for %s " , sectionName ( section ) , self . id )
self . sections [ key ] = section
return
}
func ( self * BlockPool ) switchPeer ( oldPeer , newPeer * peerInfo ) {
if newPeer != nil {
entry := self . get ( newPeer . currentBlock )
if entry == nil {
poolLogger . Debugf ( "[%s] head block [%s] not found, requesting hashes" , newPeer . id , name ( newPeer . currentBlock ) )
newPeer . requestBlockHashes ( newPeer . currentBlock )
} else {
poolLogger . Debugf ( "[%s] head block [%s] found, activate chain at section [%s]" , newPeer . id , name ( newPeer . currentBlock ) , sectionName ( entry . section ) )
self . activateChain ( entry . section , newPeer )
}
newPeer . quitC = make ( chan bool )
poolLogger . DebugDetailf ( "[%s] activate section processes" , newPeer . id )
var addSections [ ] * section
for hash , section := range newPeer . sections {
// split sections get reorganised here
if string ( section . top . hash ) != hash {
addSections = append ( addSections , section )
if entry := self . get ( [ ] byte ( hash ) ) ; entry != nil {
addSections = append ( addSections , entry . section )
}
}
}
for _ , section := range addSections {
newPeer . sections [ string ( section . top . hash ) ] = section
}
for hash , section := range newPeer . sections {
// this will block if section process is waiting for peer lock
select {
@ -940,12 +1159,26 @@ func (self *BlockPool) switchPeer(oldPeer, newPeer *peerInfo) {
poolLogger . DebugDetailf ( "[%s][%x] section process complete - remove" , newPeer . id , hash [ : 4 ] )
delete ( newPeer . sections , hash )
case section . controlC <- newPeer :
poolLogger . DebugDetailf ( "[%s][%x] registered peer with section " , newPeer . id , hash [ : 4 ] )
poolLogger . DebugDetailf ( "[%s][%x] activates section [%s] " , newPeer . id , hash [ : 4 ] , sectionName ( section ) )
}
}
newPeer . quitC = make ( chan bool )
newPeer . lock . Lock ( )
headSection := newPeer . headSection
currentBlockHash := newPeer . currentBlockHash
newPeer . lock . Unlock ( )
if headSection == nil {
poolLogger . DebugDetailf ( "[%s] head section for [%s] not created, requesting info" , newPeer . id , name ( currentBlockHash ) )
self . requestHeadSection ( newPeer )
} else {
if entry := self . get ( currentBlockHash ) ; entry != nil {
headSection = entry . section
}
poolLogger . DebugDetailf ( "[%s] activate chain at head section [%s] for current head [%s]" , newPeer . id , sectionName ( headSection ) , name ( currentBlockHash ) )
self . activateChain ( headSection , newPeer )
}
}
if oldPeer != nil {
poolLogger . DebugDetailf ( "[%s] quit section processes" , oldPeer . id )
close ( oldPeer . quitC )
}
}