diff --git a/eth/handler.go b/eth/handler.go index 7e9ec593a..acc16812a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,15 +18,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -const ( - forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process - notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching - notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockProcAmount = 256 -) - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -57,9 +48,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription - newPeerCh chan *peer - newHashCh chan []*blockAnnounce - quitSync chan struct{} + newPeerCh chan *peer + newHashCh chan []*blockAnnounce + newBlockCh chan chan []*types.Block + quitSync chan struct{} + // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -77,6 +70,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo peers: newPeerSet(), newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), + newBlockCh: make(chan chan []*types.Block), quitSync: make(chan struct{}), } @@ -274,21 +268,26 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return p.sendBlocks(blocks) case BlocksMsg: - var blocks []*types.Block - + // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + + var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - - // Either deliver to the downloader or the importer - if self.downloader.Synchronising() { - self.downloader.DeliverBlocks(p.id, blocks) - } else { - for _, block := range blocks { - if err := self.importBlock(p, block, nil); err != nil { - return err + // Filter out any explicitly requested blocks (cascading select to get blocking back to peer) + filter := make(chan []*types.Block) + select { + case <-self.quitSync: + case self.newBlockCh <- filter: + select { + case <-self.quitSync: + case filter <- blocks: + select { + case <-self.quitSync: + case blocks := <-filter: + self.downloader.DeliverBlocks(p.id, blocks) } } } @@ -322,7 +321,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } } if len(announces) > 0 { - self.newHashCh <- announces + select { + case self.newHashCh <- announces: + case <-self.quitSync: + } } case NewBlockMsg: diff --git a/eth/sync.go b/eth/sync.go index 1a1cbdb47..f761f3cd1 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -12,6 +12,16 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching + notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcAmount = 256 +) + // blockAnnounce is the hash notification of the availability of a new block in // the network. type blockAnnounce struct { @@ -25,6 +35,7 @@ type blockAnnounce struct { func (pm *ProtocolManager) fetcher() { announces := make(map[common.Hash]*blockAnnounce) request := make(map[*peer][]common.Hash) + pending := make(map[common.Hash]*blockAnnounce) cycle := time.Tick(notifyCheckCycle) // Iterate the block fetching until a quit is requested @@ -38,11 +49,18 @@ func (pm *ProtocolManager) fetcher() { } case <-cycle: + // Clean up any expired block fetches + for hash, announce := range pending { + if time.Since(announce.time) > notifyFetchTimeout { + delete(pending, hash) + } + } // Check if any notified blocks failed to arrive for hash, announce := range announces { if time.Since(announce.time) > notifyArriveTimeout { if !pm.chainman.HasBlock(hash) { request[announce.peer] = append(request[announce.peer], hash) + pending[hash] = announce } delete(announces, hash) } @@ -57,6 +75,44 @@ func (pm *ProtocolManager) fetcher() { } request = make(map[*peer][]common.Hash) + case filter := <-pm.newBlockCh: + // Blocks arrived, extract any explicit requests, return all else + var blocks types.Blocks + select { + case blocks = <-filter: + case <-pm.quitSync: + return + } + + fetch, sync := []*types.Block{}, []*types.Block{} + for _, block := range blocks { + hash := block.Hash() + if _, ok := pending[hash]; ok { + fetch = append(fetch, block) + } else { + sync = append(sync, block) + } + } + + select { + case filter <- sync: + case <-pm.quitSync: + return + } + // If any explicit fetches were replied to, import them + if len(fetch) > 0 { + go func() { + for _, block := range fetch { + if announce := pending[block.Hash()]; announce != nil { + if err := pm.importBlock(announce.peer, block, nil); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return + } + } + } + }() + } + case <-pm.quitSync: return }