|
|
|
@ -3,7 +3,6 @@ package fetcher |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"errors" |
|
|
|
|
"math" |
|
|
|
|
"math/rand" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
@ -57,8 +56,9 @@ type inject struct { |
|
|
|
|
type Fetcher struct { |
|
|
|
|
// Various event channels
|
|
|
|
|
notify chan *announce |
|
|
|
|
insert chan *inject |
|
|
|
|
inject chan *inject |
|
|
|
|
filter chan chan []*types.Block |
|
|
|
|
done chan common.Hash |
|
|
|
|
quit chan struct{} |
|
|
|
|
|
|
|
|
|
// Announce states
|
|
|
|
@ -79,8 +79,9 @@ type Fetcher struct { |
|
|
|
|
func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher { |
|
|
|
|
return &Fetcher{ |
|
|
|
|
notify: make(chan *announce), |
|
|
|
|
insert: make(chan *inject), |
|
|
|
|
inject: make(chan *inject), |
|
|
|
|
filter: make(chan chan []*types.Block), |
|
|
|
|
done: make(chan common.Hash), |
|
|
|
|
quit: make(chan struct{}), |
|
|
|
|
announced: make(map[common.Hash][]*announce), |
|
|
|
|
fetching: make(map[common.Hash]*announce), |
|
|
|
@ -128,7 +129,7 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error { |
|
|
|
|
block: block, |
|
|
|
|
} |
|
|
|
|
select { |
|
|
|
|
case f.insert <- op: |
|
|
|
|
case f.inject <- op: |
|
|
|
|
return nil |
|
|
|
|
case <-f.quit: |
|
|
|
|
return errTerminated |
|
|
|
@ -166,8 +167,6 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks { |
|
|
|
|
func (f *Fetcher) loop() { |
|
|
|
|
// Iterate the block fetching until a quit is requested
|
|
|
|
|
fetch := time.NewTimer(0) |
|
|
|
|
done := make(chan common.Hash) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
// Clean up any expired block fetches
|
|
|
|
|
for hash, announce := range f.fetching { |
|
|
|
@ -179,27 +178,19 @@ func (f *Fetcher) loop() { |
|
|
|
|
// Import any queued blocks that could potentially fit
|
|
|
|
|
height := f.chainHeight() |
|
|
|
|
for !f.queue.Empty() { |
|
|
|
|
// If too high up the chain, continue later
|
|
|
|
|
op := f.queue.PopItem().(*inject) |
|
|
|
|
if number := op.block.NumberU64(); number > height+1 { |
|
|
|
|
number := op.block.NumberU64() |
|
|
|
|
|
|
|
|
|
// If too high up the chain or phase, continue later
|
|
|
|
|
if number > height+1 { |
|
|
|
|
f.queue.Push(op, -float32(op.block.NumberU64())) |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
// Otherwise if not known yet, try and import
|
|
|
|
|
hash := op.block.Hash() |
|
|
|
|
if f.hasBlock(hash) { |
|
|
|
|
// Otherwise if fresh and still unknown, try and import
|
|
|
|
|
if number <= height || f.hasBlock(op.block.Hash()) { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Block may just fit, try to import it
|
|
|
|
|
glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", op.origin, op.block.NumberU64(), hash.Bytes()[:4]) |
|
|
|
|
go func() { |
|
|
|
|
defer func() { done <- hash }() |
|
|
|
|
|
|
|
|
|
if err := f.importBlock(op.origin, op.block); err != nil { |
|
|
|
|
glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", op.origin, op.block.NumberU64(), hash.Bytes()[:4], err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
f.insert(op.origin, op.block) |
|
|
|
|
} |
|
|
|
|
// Wait for an outside event to occur
|
|
|
|
|
select { |
|
|
|
@ -209,7 +200,6 @@ func (f *Fetcher) loop() { |
|
|
|
|
|
|
|
|
|
case notification := <-f.notify: |
|
|
|
|
// A block was announced, schedule if it's not yet downloading
|
|
|
|
|
glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4]) |
|
|
|
|
if _, ok := f.fetching[notification.hash]; ok { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
@ -218,11 +208,11 @@ func (f *Fetcher) loop() { |
|
|
|
|
f.reschedule(fetch) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case op := <-f.insert: |
|
|
|
|
case op := <-f.inject: |
|
|
|
|
// A direct block insertion was requested, try and fill any pending gaps
|
|
|
|
|
f.enqueue(op.origin, op.block) |
|
|
|
|
|
|
|
|
|
case hash := <-done: |
|
|
|
|
case hash := <-f.done: |
|
|
|
|
// A pending import finished, remove all traces of the notification
|
|
|
|
|
delete(f.announced, hash) |
|
|
|
|
delete(f.fetching, hash) |
|
|
|
@ -243,8 +233,7 @@ func (f *Fetcher) loop() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Send out all block requests
|
|
|
|
|
for peer, hashes := range request { |
|
|
|
|
glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes)) |
|
|
|
|
for _, hashes := range request { |
|
|
|
|
go f.fetching[hashes[0]].fetch(hashes) |
|
|
|
|
} |
|
|
|
|
// Schedule the next fetch if blocks are still pending
|
|
|
|
@ -304,7 +293,6 @@ func (f *Fetcher) reschedule(fetch *time.Timer) { |
|
|
|
|
earliest = announces[0].time |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
glog.V(logger.Detail).Infof("Scheduling next fetch in %v", arriveTimeout-time.Since(earliest)) |
|
|
|
|
fetch.Reset(arriveTimeout - time.Since(earliest)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -313,9 +301,9 @@ func (f *Fetcher) reschedule(fetch *time.Timer) { |
|
|
|
|
func (f *Fetcher) enqueue(peer string, block *types.Block) { |
|
|
|
|
hash := block.Hash() |
|
|
|
|
|
|
|
|
|
// Make sure the block isn't in some weird place
|
|
|
|
|
if math.Abs(float64(f.chainHeight())-float64(block.NumberU64())) > maxQueueDist { |
|
|
|
|
glog.Infof("Peer %s: discarded block #%d [%x] too far from head", peer, block.NumberU64(), hash.Bytes()[:4]) |
|
|
|
|
// Discard any past or too distant blocks
|
|
|
|
|
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist <= 0 || dist > maxQueueDist { |
|
|
|
|
glog.Infof("Peer %s: discarded block #%d [%x], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
// Schedule the block for future importing
|
|
|
|
@ -328,3 +316,22 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// insert spawns a new goroutine to run a block insertion into the chain. If the
|
|
|
|
|
// block's number is at the same height as the current import phase, if updates
|
|
|
|
|
// the phase states accordingly.
|
|
|
|
|
func (f *Fetcher) insert(peer string, block *types.Block) { |
|
|
|
|
hash := block.Hash() |
|
|
|
|
|
|
|
|
|
// Run the import on a new thread
|
|
|
|
|
glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", peer, block.NumberU64(), hash[:4]) |
|
|
|
|
go func() { |
|
|
|
|
defer func() { f.done <- hash }() |
|
|
|
|
|
|
|
|
|
// Run the actual import and log any issues
|
|
|
|
|
if err := f.importBlock(peer, block); err != nil { |
|
|
|
|
glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", peer, block.NumberU64(), hash[:4], err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|