eth, eth/downloader: remove parent verification from the downlaoder

pull/974/head
Péter Szilágyi 10 years ago
parent a4246c2da6
commit 3eda70c64c
  1. 14
      eth/downloader/downloader.go
  2. 26
      eth/downloader/downloader_test.go
  3. 10
      eth/downloader/queue.go
  4. 13
      eth/sync.go

@ -144,18 +144,8 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
} }
// TakeBlocks takes blocks from the queue and yields them to the caller. // TakeBlocks takes blocks from the queue and yields them to the caller.
func (d *Downloader) TakeBlocks() (types.Blocks, error) { func (d *Downloader) TakeBlocks() types.Blocks {
// If the head block is missing, no blocks are ready return d.queue.TakeBlocks()
head := d.queue.GetHeadBlock()
if head == nil {
return nil, nil
}
// If the parent hash of the head is unknown, notify the caller
if !d.hasBlock(head.ParentHash()) {
return nil, ErrUnknownParent
}
// Otherwise retrieve a full batch of blocks
return d.queue.TakeBlocks(head), nil
} }
func (d *Downloader) Has(hash common.Hash) bool { func (d *Downloader) Has(hash common.Hash) bool {

@ -197,10 +197,7 @@ func TestTaking(t *testing.T) {
if err != nil { if err != nil {
t.Error("download error", err) t.Error("download error", err)
} }
bs, err := tester.downloader.TakeBlocks() bs := tester.downloader.TakeBlocks()
if err != nil {
t.Fatalf("failed to take blocks: %v", err)
}
if len(bs) != targetBlocks { if len(bs) != targetBlocks {
t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks) t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks)
} }
@ -280,8 +277,7 @@ func TestThrottling(t *testing.T) {
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
// Take a batch of blocks and accumulate // Take a batch of blocks and accumulate
blocks, _ := tester.downloader.TakeBlocks() took = append(took, tester.downloader.TakeBlocks()...)
took = append(took, blocks...)
} }
done <- struct{}{} done <- struct{}{}
}() }()
@ -315,14 +311,13 @@ func TestNonExistingParentAttack(t *testing.T) {
if err := tester.sync("attack", hashes[0]); err != nil { if err := tester.sync("attack", hashes[0]); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err) t.Fatalf("failed to synchronise blocks: %v", err)
} }
bs, err := tester.downloader.TakeBlocks() bs := tester.downloader.TakeBlocks()
if err != ErrUnknownParent { if len(bs) != 1 {
t.Fatalf("take error mismatch: have %v, want %v", err, ErrUnknownParent) t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
} }
if len(bs) != 0 { if tester.hasBlock(bs[0].ParentHash()) {
t.Error("retrieved block mismatch: have %v, want %v", len(bs), 0) t.Fatalf("tester knows about the unknown hash")
} }
// Cancel the download due to the parent attack
tester.downloader.Cancel() tester.downloader.Cancel()
// Reconstruct a valid chain, and try to synchronize with it // Reconstruct a valid chain, and try to synchronize with it
@ -331,11 +326,8 @@ func TestNonExistingParentAttack(t *testing.T) {
if err := tester.sync("valid", hashes[0]); err != nil { if err := tester.sync("valid", hashes[0]); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err) t.Fatalf("failed to synchronise blocks: %v", err)
} }
bs, err = tester.downloader.TakeBlocks() bs = tester.downloader.TakeBlocks()
if err != nil {
t.Fatalf("failed to retrieve blocks: %v", err)
}
if len(bs) != 1 { if len(bs) != 1 {
t.Error("retrieved block mismatch: have %v, want %v", len(bs), 1) t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
} }
} }

@ -172,17 +172,11 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block {
} }
// TakeBlocks retrieves and permanently removes a batch of blocks from the cache. // TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
// The head parameter is required to prevent a race condition where concurrent func (q *queue) TakeBlocks() types.Blocks {
// takes may fail parent verifications.
func (q *queue) TakeBlocks(head *types.Block) types.Blocks {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
// Short circuit if the head block's different // Accumulate all available blocks
if len(q.blockCache) == 0 || q.blockCache[0] != head {
return nil
}
// Otherwise accumulate all available blocks
var blocks types.Blocks var blocks types.Blocks
for _, block := range q.blockCache { for _, block := range q.blockCache {
if block == nil { if block == nil {

@ -40,9 +40,7 @@ func (pm *ProtocolManager) update() {
// Try to pull some blocks from the downloaded // Try to pull some blocks from the downloaded
if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) {
go func() { go func() {
if err := pm.processBlocks(); err != nil { pm.processBlocks()
pm.downloader.Cancel()
}
atomic.StoreInt32(&blockProcPend, 0) atomic.StoreInt32(&blockProcPend, 0)
}() }()
} }
@ -61,12 +59,8 @@ func (pm *ProtocolManager) processBlocks() error {
pm.wg.Add(1) pm.wg.Add(1)
defer pm.wg.Done() defer pm.wg.Done()
// Take a batch of blocks, but abort if there's an invalid head or if the chain's empty // Short circuit if no blocks are available for insertion
blocks, err := pm.downloader.TakeBlocks() blocks := pm.downloader.TakeBlocks()
if err != nil {
glog.V(logger.Warn).Infof("Block processing failed: %v", err)
return err
}
if len(blocks) == 0 { if len(blocks) == 0 {
return nil return nil
} }
@ -77,6 +71,7 @@ func (pm *ProtocolManager) processBlocks() error {
_, err := pm.chainman.InsertChain(blocks[:max]) _, err := pm.chainman.InsertChain(blocks[:max])
if err != nil { if err != nil {
glog.V(logger.Warn).Infof("Block insertion failed: %v", err) glog.V(logger.Warn).Infof("Block insertion failed: %v", err)
pm.downloader.Cancel()
return err return err
} }
blocks = blocks[max:] blocks = blocks[max:]

Loading…
Cancel
Save