From 2f4cbe22f5207b830f2685caae175cce70bcd231 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 18 Jun 2015 00:04:57 +0300 Subject: [PATCH 1/3] eth, eth/downloader: fix processing interrupt caused by temp cancel --- eth/downloader/downloader.go | 40 +++++++++++++++---------------- eth/downloader/downloader_test.go | 4 ++-- eth/sync.go | 2 +- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c7a05eb35e..a79eabb3c9 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -87,6 +87,8 @@ type Downloader struct { checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain banned *set.Set // Set of hashes we've received and banned + interrupt int32 // Atomic boolean to signal termination + // Statistics importStart time.Time // Instance when the last blocks were taken from the cache importQueue []*Block // Previously taken blocks to check import progress @@ -245,12 +247,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error { if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") } - - // Create cancel channel for aborting mid-flight - d.cancelLock.Lock() - d.cancelCh = make(chan struct{}) - d.cancelLock.Unlock() - // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { return errPendingQueue @@ -260,12 +256,16 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error { d.peers.Reset() d.checks = make(map[common.Hash]*crossCheck) + // Create cancel channel for aborting mid-flight + d.cancelLock.Lock() + d.cancelCh = make(chan struct{}) + d.cancelLock.Unlock() + // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) if p == nil { return errUnknownPeer } - return d.syncWithPeer(p, hash) } @@ -282,7 +282,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { defer func() { // reset on error if err != nil { - d.Cancel() + d.cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -301,9 +301,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { return nil } -// Cancel cancels all of the operations and resets the queue. It returns true +// cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. -func (d *Downloader) Cancel() { +func (d *Downloader) cancel() { // Close the current cancel channel d.cancelLock.Lock() if d.cancelCh != nil { @@ -320,6 +320,12 @@ func (d *Downloader) Cancel() { d.queue.Reset() } +// Terminate interrupts the downloader, canceling all pending operations. +func (d *Downloader) Terminate() { + atomic.StoreInt32(&d.interrupt, 1) + d.cancel() +} + // fetchHahes starts retrieving hashes backwards from a specific peer and hash, // up until it finds a common ancestor. If the source peer times out, alternative // ones are tried for continuation. @@ -737,12 +743,6 @@ func (d *Downloader) process() (err error) { atomic.StoreInt32(&d.processing, 0) }() - - // Fetch the current cancel channel to allow termination - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - // Repeat the processing as long as there are blocks to import for { // Fetch the next batch of blocks @@ -759,12 +759,10 @@ func (d *Downloader) process() (err error) { // Actually import the blocks glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) - for len(blocks) != 0 { // TODO: quit + for len(blocks) != 0 { // Check for any termination requests - select { - case <-cancel: + if atomic.LoadInt32(&d.interrupt) == 1 { return errCancelChainImport - default: } // Retrieve the first batch of blocks to insert max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) @@ -777,7 +775,7 @@ func (d *Downloader) process() (err error) { if err != nil { glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) d.dropPeer(blocks[index].OriginPeer) - d.Cancel() + d.cancel() return errCancelChainImport } blocks = blocks[max:] diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 53eb5f81d5..f97e6077b1 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -247,7 +247,7 @@ func TestCancel(t *testing.T) { tester.newPeer("peer", hashes, blocks) // Make sure canceling works with a pristine downloader - tester.downloader.Cancel() + tester.downloader.cancel() hashCount, blockCount := tester.downloader.queue.Size() if hashCount > 0 || blockCount > 0 { t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) @@ -256,7 +256,7 @@ func TestCancel(t *testing.T) { if err := tester.sync("peer"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - tester.downloader.Cancel() + tester.downloader.cancel() hashCount, blockCount = tester.downloader.queue.Size() if hashCount > 0 || blockCount > 0 { t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) diff --git a/eth/sync.go b/eth/sync.go index a3b177a4d8..751bc1a2af 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -251,7 +251,7 @@ func (pm *ProtocolManager) fetcher() { // downloading hashes and blocks as well as retrieving cached ones. func (pm *ProtocolManager) syncer() { // Abort any pending syncs if we terminate - defer pm.downloader.Cancel() + defer pm.downloader.Terminate() forceSync := time.Tick(forceSyncCycle) for { From 55dd8fd6216e8880f441975f32eb070be5d401a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 18 Jun 2015 00:26:54 +0300 Subject: [PATCH 2/3] eth/downloader: always reenter processing if not exiting --- eth/downloader/downloader.go | 45 +++++++++++++++---------------- eth/downloader/downloader_test.go | 31 +++++++++++---------- 2 files changed, 37 insertions(+), 39 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a79eabb3c9..9866a5b46d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -33,23 +33,22 @@ var ( ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errBannedHead = errors.New("peer head hash already banned") - errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errCrossCheckFailed = errors.New("block cross-check failed") - errCancelHashFetch = errors.New("hash fetching canceled (requested)") - errCancelBlockFetch = errors.New("block downloading canceled (requested)") - errCancelChainImport = errors.New("chain importing canceled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errBannedHead = errors.New("peer head hash already banned") + errNoPeers = errors.New("no peers to keep download active") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errCrossCheckFailed = errors.New("block cross-check failed") + errCancelHashFetch = errors.New("hash fetching canceled (requested)") + errCancelBlockFetch = errors.New("block downloading canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) // hashCheckFn is a callback type for verifying a hash's presence in the local chain. @@ -719,7 +718,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { // between these state changes, a block may have arrived, but a processing // attempt denied, so we need to re-enter to ensure the block isn't left // to idle in the cache. -func (d *Downloader) process() (err error) { +func (d *Downloader) process() { // Make sure only one goroutine is ever allowed to process blocks at once if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { return @@ -729,8 +728,8 @@ func (d *Downloader) process() (err error) { // the fresh blocks might have been rejected entry to to this present thread // not yet releasing the `processing` state. defer func() { - if err == nil && d.queue.GetHeadBlock() != nil { - err = d.process() + if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil { + d.process() } }() // Release the lock upon exit (note, before checking for reentry!), and set @@ -748,7 +747,7 @@ func (d *Downloader) process() (err error) { // Fetch the next batch of blocks blocks := d.queue.TakeBlocks() if len(blocks) == 0 { - return nil + return } // Reset the import statistics d.importLock.Lock() @@ -762,7 +761,7 @@ func (d *Downloader) process() (err error) { for len(blocks) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { - return errCancelChainImport + return } // Retrieve the first batch of blocks to insert max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) @@ -776,7 +775,7 @@ func (d *Downloader) process() (err error) { glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) d.dropPeer(blocks[index].OriginPeer) d.cancel() - return errCancelChainImport + return } blocks = blocks[max:] } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index f97e6077b1..40f77e7dba 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -749,22 +749,21 @@ func TestHashAttackerDropping(t *testing.T) { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() From 4365668462a1f3b1f637388ff33b714529536f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 18 Jun 2015 00:42:02 +0300 Subject: [PATCH 3/3] eth/downloader: extend slow test to fix even slower CI server... --- eth/downloader/downloader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 40f77e7dba..484cc32186 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -359,7 +359,7 @@ func TestSlowSynchronisation(t *testing.T) { // Create a batch of blocks, with a slow and a full speed peer targetCycles := 2 targetBlocks := targetCycles*blockCacheLimit - 15 - targetIODelay := 500 * time.Millisecond + targetIODelay := time.Second hashes := createHashes(targetBlocks, knownHash) blocks := createBlocksFromHashes(hashes)