|
|
@ -522,9 +522,7 @@ out: |
|
|
|
peer.Promote() |
|
|
|
peer.Promote() |
|
|
|
peer.SetIdle() |
|
|
|
peer.SetIdle() |
|
|
|
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) |
|
|
|
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) |
|
|
|
if atomic.LoadInt32(&d.processing) == 0 { |
|
|
|
go d.process() |
|
|
|
go d.process() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
case errInvalidChain: |
|
|
|
case errInvalidChain: |
|
|
|
// The hash chain is invalid (blocks are not ordered properly), abort
|
|
|
|
// The hash chain is invalid (blocks are not ordered properly), abort
|
|
|
@ -701,6 +699,19 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// process takes blocks from the queue and tries to import them into the chain.
|
|
|
|
// process takes blocks from the queue and tries to import them into the chain.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// The algorithmic flow is as follows:
|
|
|
|
|
|
|
|
// - The `processing` flag is swapped to 1 to ensure singleton access
|
|
|
|
|
|
|
|
// - The current `cancel` channel is retrieved to detect sync abortions
|
|
|
|
|
|
|
|
// - Blocks are iteratively taken from the cache and inserted into the chain
|
|
|
|
|
|
|
|
// - When the cache becomes empty, insertion stops
|
|
|
|
|
|
|
|
// - The `processing` flag is swapped back to 0
|
|
|
|
|
|
|
|
// - A post-exit check is made whether new blocks became available
|
|
|
|
|
|
|
|
// - This step is important: it handles a potential race condition between
|
|
|
|
|
|
|
|
// checking for no more work, and releasing the processing "mutex". In
|
|
|
|
|
|
|
|
// between these state changes, a block may have arrived, but a processing
|
|
|
|
|
|
|
|
// attempt denied, so we need to re-enter to ensure the block isn't left
|
|
|
|
|
|
|
|
// to idle in the cache.
|
|
|
|
func (d *Downloader) process() (err error) { |
|
|
|
func (d *Downloader) process() (err error) { |
|
|
|
// Make sure only one goroutine is ever allowed to process blocks at once
|
|
|
|
// Make sure only one goroutine is ever allowed to process blocks at once
|
|
|
|
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { |
|
|
|
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { |
|
|
@ -763,7 +774,7 @@ func (d *Downloader) process() (err error) { |
|
|
|
// Try to inset the blocks, drop the originating peer if there's an error
|
|
|
|
// Try to inset the blocks, drop the originating peer if there's an error
|
|
|
|
index, err := d.insertChain(raw) |
|
|
|
index, err := d.insertChain(raw) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err) |
|
|
|
glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) |
|
|
|
d.dropPeer(blocks[index].OriginPeer) |
|
|
|
d.dropPeer(blocks[index].OriginPeer) |
|
|
|
d.Cancel() |
|
|
|
d.Cancel() |
|
|
|
return errCancelChainImport |
|
|
|
return errCancelChainImport |
|
|
|