From e8b22b9253da400cc350dbc673d07789f93f57bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 21 May 2015 08:07:58 +0300 Subject: [PATCH 1/3] eth/downloader: prevent a peer from dripping bad hashes --- eth/downloader/downloader.go | 20 ++++++++++++-------- eth/downloader/downloader_test.go | 25 ++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d817b223cc..f3a8664414 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -28,10 +28,11 @@ var ( ) var ( - errLowTd = errors.New("peer's TD is too low") + errLowTd = errors.New("peers TD is too low") ErrBusy = errors.New("busy") - errUnknownPeer = errors.New("peer's unknown or unhealthy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") ErrBadPeer = errors.New("action from bad peer ignored") + ErrStallingPeer = errors.New("peer is stalling") errNoPeers = errors.New("no peers to keep download active") ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") @@ -283,15 +284,18 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { return ErrBadPeer } if !done { + // Check that the peer is not stalling the sync + if len(inserts) < maxHashFetch { + return ErrStallingPeer + } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch - if len(inserts) > 1 { - cross := inserts[rand.Intn(len(inserts)-1)] - glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + cross := inserts[rand.Intn(len(inserts)-1)] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + + d.checks[cross] = time.Now().Add(blockTTL) + active.getBlocks([]common.Hash{cross}) - d.checks[cross] = time.Now().Add(blockTTL) - active.getBlocks([]common.Hash{cross}) - } // Also fetch a fresh active.getHashes(head) continue diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 19d64ac67f..8ed3289c69 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -53,6 +53,8 @@ type downloadTester struct { blocks map[common.Hash]*types.Block // Blocks associated with the hashes chain []common.Hash // Block-chain being constructed + maxHashFetch int // Overrides the maximum number of retrieved hashes + t *testing.T pcount int done chan bool @@ -133,8 +135,12 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { // getHashes retrieves a batch of hashes for reconstructing the chain. func (dl *downloadTester) getHashes(head common.Hash) error { + limit := maxHashFetch + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } // Gather the next batch of hashes - hashes := make([]common.Hash, 0, maxHashFetch) + hashes := make([]common.Hash, 0, limit) for i, hash := range dl.hashes { if hash == head { i++ @@ -469,6 +475,23 @@ func TestMadeupHashChainAttack(t *testing.T) { } } +// Tests that if a malicious peer makes up a random hash chain, and tries to push +// indefinitely, one hash at a time, it actually gets caught with it. The reason +// this is separate from the classical made up chain attack is that sending hashes +// one by one prevents reliable block/parent verification. +func TestMadeupHashChainDrippingAttack(t *testing.T) { + // Create a random chain of hashes to drip + hashes := createHashes(0, 16*blockCacheLimit) + tester := newTester(t, hashes, nil) + + // Try and sync with the attacker, one hash at a time + tester.maxHashFetch = 1 + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer) + } +} + // Tests that if a malicious peer makes up a random block chain, and tried to // push indefinitely, it actually gets caught with it. func TestMadeupBlockChainAttack(t *testing.T) { From 52db6d8be577669bd5ba659ac223acf61956b05a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 21 May 2015 08:37:27 +0300 Subject: [PATCH 2/3] eth/downloader: circumvent a forged block chain with known parent attack --- eth/downloader/downloader.go | 33 +++++++++++++++++----------- eth/downloader/downloader_test.go | 36 ++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f3a8664414..f0629a551e 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -61,13 +61,18 @@ type hashPack struct { hashes []common.Hash } +type crossCheck struct { + expire time.Time + parent common.Hash +} + type Downloader struct { mux *event.TypeMux mu sync.RWMutex - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed - checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain // Callbacks hasBlock hashCheckFn @@ -158,7 +163,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() d.peers.Reset() - d.checks = make(map[common.Hash]time.Time) + d.checks = make(map[common.Hash]*crossCheck) // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) @@ -290,11 +295,15 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch - cross := inserts[rand.Intn(len(inserts)-1)] - glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + cross := rand.Intn(len(inserts) - 1) + origin, parent := inserts[cross], inserts[cross+1] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent) - d.checks[cross] = time.Now().Add(blockTTL) - active.getBlocks([]common.Hash{cross}) + d.checks[origin] = &crossCheck{ + expire: time.Now().Add(blockTTL), + parent: parent, + } + active.getBlocks([]common.Hash{origin}) // Also fetch a fresh active.getHashes(head) @@ -314,8 +323,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { continue } block := blockPack.blocks[0] - if _, ok := d.checks[block.Hash()]; ok { - if !d.queue.Has(block.ParentHash()) { + if check, ok := d.checks[block.Hash()]; ok { + if block.ParentHash() != check.parent { return ErrCrossCheckFailed } delete(d.checks, block.Hash()) @@ -323,8 +332,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { case <-crossTicker.C: // Iterate over all the cross checks and fail the hash chain if they're not verified - for hash, deadline := range d.checks { - if time.Now().After(deadline) { + for hash, check := range d.checks { + if time.Now().After(check.expire) { glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) return ErrCrossCheckFailed } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8ed3289c69..d623a7c767 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -502,7 +502,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { crossCheckCycle = 25 * time.Millisecond // Create a long chain of blocks and simulate an invalid chain by dropping every second - hashes := createHashes(0, 32*blockCacheLimit) + hashes := createHashes(0, 16*blockCacheLimit) blocks := createBlocksFromHashes(hashes) gapped := make([]common.Hash, len(hashes)/2) @@ -525,3 +525,37 @@ func TestMadeupBlockChainAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Advanced form of the above forged blockchain attack, where not only does the +// attacker make up a valid hashes for random blocks, but also forges the block +// parents to point to existing hashes. +func TestMadeupParentBlockChainAttack(t *testing.T) { + defaultBlockTTL := blockTTL + defaultCrossCheckCycle := crossCheckCycle + + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of blocks and simulate an invalid chain by dropping every second + hashes := createHashes(0, 16*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + forges := createBlocksFromHashes(hashes) + for hash, block := range forges { + block.ParentHeaderHash = hash // Simulate pointing to already known hash + } + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, forges) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } + // Ensure that a valid chain can still pass sync + blockTTL = defaultBlockTTL + crossCheckCycle = defaultCrossCheckCycle + + tester.blocks = blocks + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} From 06a041589f3c2d4b3e66a1ce51e3e03e209fdbff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 21 May 2015 18:16:04 +0300 Subject: [PATCH 3/3] eth, eth/downloader: remove duplicate consts, bump hash fetch to 2K --- eth/downloader/downloader.go | 10 ++++++---- eth/downloader/downloader_test.go | 2 +- eth/downloader/queue.go | 2 +- eth/handler.go | 6 +++--- eth/peer.go | 5 +++-- eth/protocol.go | 2 -- 6 files changed, 14 insertions(+), 13 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f0629a551e..fd588d2f30 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -15,8 +15,10 @@ import ( ) const ( - maxHashFetch = 512 // Amount of hashes to be fetched per chunk - maxBlockFetch = 128 // Amount of blocks to be fetched per chunk + MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling + MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount hashTTL = 5 * time.Second // Time it takes for a hash request to time out ) @@ -290,7 +292,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } if !done { // Check that the peer is not stalling the sync - if len(inserts) < maxHashFetch { + if len(inserts) < MinHashFetch { return ErrStallingPeer } // Try and fetch a random block to verify the hash batch @@ -451,7 +453,7 @@ out: } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. - request := d.queue.Reserve(peer, maxBlockFetch) + request := d.queue.Reserve(peer, MaxBlockFetch) if request == nil { continue } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index d623a7c767..6299ea1622 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -135,7 +135,7 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { // getHashes retrieves a batch of hashes for reconstructing the chain. func (dl *downloadTester) getHashes(head common.Hash) error { - limit := maxHashFetch + limit := MaxHashFetch if dl.maxHashFetch > 0 { limit = dl.maxHashFetch } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 13ec9a5209..591a37773d 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -17,7 +17,7 @@ import ( ) const ( - blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download + blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download ) // fetchRequest is a currently running block retrieval operation. diff --git a/eth/handler.go b/eth/handler.go index 835097d843..63b1d50f61 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -206,8 +206,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "->msg %v: %v", msg, err) } - if request.Amount > maxHashes { - request.Amount = maxHashes + if request.Amount > downloader.MaxHashFetch { + request.Amount = downloader.MaxHashFetch } hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) @@ -254,7 +254,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if block != nil { blocks = append(blocks, block) } - if i == maxBlocks { + if i == downloader.MaxBlockFetch { break } } diff --git a/eth/peer.go b/eth/peer.go index a23449acd3..1ef032d384 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -100,8 +101,8 @@ func (p *peer) sendTransaction(tx *types.Transaction) error { } func (p *peer) requestHashes(from common.Hash) error { - glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4]) - return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) + glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4]) + return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, downloader.MaxHashFetch}) } func (p *peer) requestBlocks(hashes []common.Hash) error { diff --git a/eth/protocol.go b/eth/protocol.go index 48f37b59c9..948051ed1f 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -12,8 +12,6 @@ const ( NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 - maxHashes = 512 - maxBlocks = 128 ) // eth protocol message codes