// Copyright 2021 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package downloader import ( "encoding/json" "errors" "fmt" "math/big" "os" "sync/atomic" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" ) // hookedBackfiller is a tester backfiller with all interface methods mocked and // hooked so tests can implement only the things they need. type hookedBackfiller struct { // suspendHook is an optional hook to be called when the filler is requested // to be suspended. suspendHook func() // resumeHook is an optional hook to be called when the filler is requested // to be resumed. resumeHook func() } // newHookedBackfiller creates a hooked backfiller with all callbacks disabled, // essentially acting as a noop. func newHookedBackfiller() backfiller { return new(hookedBackfiller) } // suspend requests the backfiller to abort any running full or snap sync // based on the skeleton chain as it might be invalid. The backfiller should // gracefully handle multiple consecutive suspends without a resume, even // on initial sartup. func (hf *hookedBackfiller) suspend() { if hf.suspendHook != nil { hf.suspendHook() } } // resume requests the backfiller to start running fill or snap sync based on // the skeleton chain as it has successfully been linked. Appending new heads // to the end of the chain will not result in suspend/resume cycles. func (hf *hookedBackfiller) resume() { if hf.resumeHook != nil { hf.resumeHook() } } // skeletonTestPeer is a mock peer that can only serve header requests from a // pre-perated header chain (which may be arbitrarily wrong for testing). // // Requesting anything else from these peers will hard panic. Note, do *not* // implement any other methods. We actually want to make sure that the skeleton // syncer only depends on - and will only ever do so - on header requests. type skeletonTestPeer struct { id string // Unique identifier of the mock peer headers []*types.Header // Headers to serve when requested serve func(origin uint64) []*types.Header // Hook to allow custom responses served uint64 // Number of headers served by this peer dropped uint64 // Flag whether the peer was dropped (stop responding) } // newSkeletonTestPeer creates a new mock peer to test the skeleton sync with. func newSkeletonTestPeer(id string, headers []*types.Header) *skeletonTestPeer { return &skeletonTestPeer{ id: id, headers: headers, } } // newSkeletonTestPeer creates a new mock peer to test the skeleton sync with, // and sets an optional serve hook that can return headers for delivery instead // of the predefined chain. Useful for emulating malicious behavior that would // otherwise require dedicated peer types. func newSkeletonTestPeerWithHook(id string, headers []*types.Header, serve func(origin uint64) []*types.Header) *skeletonTestPeer { return &skeletonTestPeer{ id: id, headers: headers, serve: serve, } } // RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) { // Since skeleton test peer are in-memory mocks, dropping the does not make // them inaccepssible. As such, check a local `dropped` field to see if the // peer has been dropped and should not respond any more. if atomic.LoadUint64(&p.dropped) != 0 { return nil, errors.New("peer already dropped") } // Skeleton sync retrieves batches of headers going backward without gaps. // This ensures we can follow a clean parent progression without any reorg // hiccups. There is no need for any other type of header retrieval, so do // panic if there's such a request. if !reverse || skip != 0 { // Note, if other clients want to do these kinds of requests, it's their // problem, it will still work. We just don't want *us* making complicated // requests without a very strong reason to. panic(fmt.Sprintf("invalid header retrieval: reverse %v, want true; skip %d, want 0", reverse, skip)) } // If the skeleton syncer requests the genesis block, panic. Whilst it could // be considered a valid request, our code specifically should not request it // ever since we want to link up headers to an existing local chain, which at // worse will be the genesis. if int64(origin)-int64(amount) < 0 { panic(fmt.Sprintf("headers requested before (or at) genesis: origin %d, amount %d", origin, amount)) } // To make concurrency easier, the skeleton syncer always requests fixed size // batches of headers. Panic if the peer is requested an amount other than the // configured batch size (apart from the request leading to the genesis). if amount > requestHeaders || (amount < requestHeaders && origin > uint64(amount)) { panic(fmt.Sprintf("non-chunk size header batch requested: requested %d, want %d, origin %d", amount, requestHeaders, origin)) } // Simple reverse header retrieval. Fill from the peer's chain and return. // If the tester has a serve hook set, try to use that before falling back // to the default behavior. var headers []*types.Header if p.serve != nil { headers = p.serve(origin) } if headers == nil { headers = make([]*types.Header, 0, amount) if len(p.headers) > int(origin) { // Don't serve headers if we're missing the origin for i := 0; i < amount; i++ { // Consider nil headers as a form of attack and withhold them. Nil // cannot be decoded from RLP, so it's not possible to produce an // attack by sending/receiving those over eth. header := p.headers[int(origin)-i] if header == nil { continue } headers = append(headers, header) } } } atomic.AddUint64(&p.served, uint64(len(headers))) hashes := make([]common.Hash, len(headers)) for i, header := range headers { hashes[i] = header.Hash() } // Deliver the headers to the downloader req := ð.Request{ Peer: p.id, } res := ð.Response{ Req: req, Res: (*eth.BlockHeadersPacket)(&headers), Meta: hashes, Time: 1, Done: make(chan error), } go func() { sink <- res if err := <-res.Done; err != nil { log.Warn("Skeleton test peer response rejected", "err", err) atomic.AddUint64(&p.dropped, 1) } }() return req, nil } func (p *skeletonTestPeer) Head() (common.Hash, *big.Int) { panic("skeleton sync must not request the remote head") } func (p *skeletonTestPeer) RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error) { panic("skeleton sync must not request headers by hash") } func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error) { panic("skeleton sync must not request block bodies") } func (p *skeletonTestPeer) RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error) { panic("skeleton sync must not request receipts") } // Tests various sync initialzations based on previous leftovers in the database // and announced heads. func TestSkeletonSyncInit(t *testing.T) { // Create a few key headers var ( genesis = &types.Header{Number: big.NewInt(0)} block49 = &types.Header{Number: big.NewInt(49)} block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")} block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()} ) tests := []struct { headers []*types.Header // Database content (beside the genesis) oldstate []*subchain // Old sync state with various interrupted subchains head *types.Header // New head header to announce to reorg to newstate []*subchain // Expected sync state after the reorg }{ // Completely empty database with only the genesis set. The sync is expected // to create a single subchain with the requested head. { head: block50, newstate: []*subchain{{Head: 50, Tail: 50}}, }, // Empty database with only the genesis set with a leftover empty sync // progess. This is a synthetic case, just for the sake of covering things. { oldstate: []*subchain{}, head: block50, newstate: []*subchain{{Head: 50, Tail: 50}}, }, // A single leftover subchain is present, older than the new head. The // old subchain should be left as is and a new one appended to the sync // status. { oldstate: []*subchain{{Head: 10, Tail: 5}}, head: block50, newstate: []*subchain{ {Head: 50, Tail: 50}, {Head: 10, Tail: 5}, }, }, // Multiple leftover subchains are present, older than the new head. The // old subchains should be left as is and a new one appended to the sync // status. { oldstate: []*subchain{ {Head: 20, Tail: 15}, {Head: 10, Tail: 5}, }, head: block50, newstate: []*subchain{ {Head: 50, Tail: 50}, {Head: 20, Tail: 15}, {Head: 10, Tail: 5}, }, }, // A single leftover subchain is present, newer than the new head. The // newer subchain should be deleted and a fresh one created for the head. { oldstate: []*subchain{{Head: 65, Tail: 60}}, head: block50, newstate: []*subchain{{Head: 50, Tail: 50}}, }, // Multiple leftover subchain is present, newer than the new head. The // newer subchains should be deleted and a fresh one created for the head. { oldstate: []*subchain{ {Head: 75, Tail: 70}, {Head: 65, Tail: 60}, }, head: block50, newstate: []*subchain{{Head: 50, Tail: 50}}, }, // Two leftover subchains are present, one fully older and one fully // newer than the announced head. The head should delete the newer one, // keeping the older one. { oldstate: []*subchain{ {Head: 65, Tail: 60}, {Head: 10, Tail: 5}, }, head: block50, newstate: []*subchain{ {Head: 50, Tail: 50}, {Head: 10, Tail: 5}, }, }, // Multiple leftover subchains are present, some fully older and some // fully newer than the announced head. The head should delete the newer // ones, keeping the older ones. { oldstate: []*subchain{ {Head: 75, Tail: 70}, {Head: 65, Tail: 60}, {Head: 20, Tail: 15}, {Head: 10, Tail: 5}, }, head: block50, newstate: []*subchain{ {Head: 50, Tail: 50}, {Head: 20, Tail: 15}, {Head: 10, Tail: 5}, }, }, // A single leftover subchain is present and the new head is extending // it with one more header. We expect the subchain head to be pushed // forward. { headers: []*types.Header{block49}, oldstate: []*subchain{{Head: 49, Tail: 5}}, head: block50, newstate: []*subchain{{Head: 50, Tail: 5}}, }, // A single leftover subchain is present and although the new head does // extend it number wise, the hash chain does not link up. We expect a // new subchain to be created for the dangling head. { headers: []*types.Header{block49B}, oldstate: []*subchain{{Head: 49, Tail: 5}}, head: block50, newstate: []*subchain{ {Head: 50, Tail: 50}, {Head: 49, Tail: 5}, }, }, // A single leftover subchain is present. A new head is announced that // links into the middle of it, correctly anchoring into an existing // header. We expect the old subchain to be truncated and extended with // the new head. { headers: []*types.Header{block49}, oldstate: []*subchain{{Head: 100, Tail: 5}}, head: block50, newstate: []*subchain{{Head: 50, Tail: 5}}, }, // A single leftover subchain is present. A new head is announced that // links into the middle of it, but does not anchor into an existing // header. We expect the old subchain to be truncated and a new chain // be created for the dangling head. { headers: []*types.Header{block49B}, oldstate: []*subchain{{Head: 100, Tail: 5}}, head: block50, newstate: []*subchain{ {Head: 50, Tail: 50}, {Head: 49, Tail: 5}, }, }, } for i, tt := range tests { // Create a fresh database and initialize it with the starting state db := rawdb.NewMemoryDatabase() rawdb.WriteHeader(db, genesis) for _, header := range tt.headers { rawdb.WriteSkeletonHeader(db, header) } if tt.oldstate != nil { blob, _ := json.Marshal(&skeletonProgress{Subchains: tt.oldstate}) rawdb.WriteSkeletonSyncStatus(db, blob) } // Create a skeleton sync and run a cycle wait := make(chan struct{}) skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, true) <-wait skeleton.Terminate() // Ensure the correct resulting sync status var progress skeletonProgress json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) if len(progress.Subchains) != len(tt.newstate) { t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate)) continue } for j := 0; j < len(progress.Subchains); j++ { if progress.Subchains[j].Head != tt.newstate[j].Head { t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head) } if progress.Subchains[j].Tail != tt.newstate[j].Tail { t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail) } } } } // Tests that a running skeleton sync can be extended with properly linked up // headers but not with side chains. func TestSkeletonSyncExtend(t *testing.T) { // Create a few key headers var ( genesis = &types.Header{Number: big.NewInt(0)} block49 = &types.Header{Number: big.NewInt(49)} block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")} block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()} block51 = &types.Header{Number: big.NewInt(51), ParentHash: block50.Hash()} ) tests := []struct { head *types.Header // New head header to announce to reorg to extend *types.Header // New head header to announce to extend with newstate []*subchain // Expected sync state after the reorg err error // Whether extension succeeds or not }{ // Initialize a sync and try to extend it with a subsequent block. { head: block49, extend: block50, newstate: []*subchain{ {Head: 50, Tail: 49}, }, }, // Initialize a sync and try to extend it with the existing head block. { head: block49, extend: block49, newstate: []*subchain{ {Head: 49, Tail: 49}, }, err: errReorgDenied, }, // Initialize a sync and try to extend it with a sibling block. { head: block49, extend: block49B, newstate: []*subchain{ {Head: 49, Tail: 49}, }, err: errReorgDenied, }, // Initialize a sync and try to extend it with a number-wise sequential // header, but a hash wise non-linking one. { head: block49B, extend: block50, newstate: []*subchain{ {Head: 49, Tail: 49}, }, err: errReorgDenied, }, // Initialize a sync and try to extend it with a non-linking future block. { head: block49, extend: block51, newstate: []*subchain{ {Head: 49, Tail: 49}, }, err: errReorgDenied, }, // Initialize a sync and try to extend it with a past canonical block. { head: block50, extend: block49, newstate: []*subchain{ {Head: 50, Tail: 50}, }, err: errReorgDenied, }, // Initialize a sync and try to extend it with a past sidechain block. { head: block50, extend: block49B, newstate: []*subchain{ {Head: 50, Tail: 50}, }, err: errReorgDenied, }, } for i, tt := range tests { // Create a fresh database and initialize it with the starting state db := rawdb.NewMemoryDatabase() rawdb.WriteHeader(db, genesis) // Create a skeleton sync and run a cycle wait := make(chan struct{}) skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, true) <-wait if err := skeleton.Sync(tt.extend, false); err != tt.err { t.Errorf("extension failure mismatch: have %v, want %v", err, tt.err) } skeleton.Terminate() // Ensure the correct resulting sync status var progress skeletonProgress json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) if len(progress.Subchains) != len(tt.newstate) { t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate)) continue } for j := 0; j < len(progress.Subchains); j++ { if progress.Subchains[j].Head != tt.newstate[j].Head { t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head) } if progress.Subchains[j].Tail != tt.newstate[j].Tail { t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail) } } } } // Tests that the skeleton sync correctly retrieves headers from one or more // peers without duplicates or other strange side effects. func TestSkeletonSyncRetrievals(t *testing.T) { log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) // Since skeleton headers don't need to be meaningful, beyond a parent hash // progression, create a long fake chain to test with. chain := []*types.Header{{Number: big.NewInt(0)}} for i := 1; i < 10000; i++ { chain = append(chain, &types.Header{ ParentHash: chain[i-1].Hash(), Number: big.NewInt(int64(i)), }) } tests := []struct { headers []*types.Header // Database content (beside the genesis) oldstate []*subchain // Old sync state with various interrupted subchains head *types.Header // New head header to announce to reorg to peers []*skeletonTestPeer // Initial peer set to start the sync with midstate []*subchain // Expected sync state after initial cycle midserve uint64 // Expected number of header retrievals after initial cycle middrop uint64 // Expectd number of peers dropped after initial cycle newHead *types.Header // New header to annount on top of the old one newPeer *skeletonTestPeer // New peer to join the skeleton syncer endstate []*subchain // Expected sync state after the post-init event endserve uint64 // Expected number of header retrievals after the post-init event enddrop uint64 // Expectd number of peers dropped after the post-init event }{ // Completely empty database with only the genesis set. The sync is expected // to create a single subchain with the requested head. No peers however, so // the sync should be stuck without any progression. // // When a new peer is added, it should detect the join and fill the headers // to the genesis block. { head: chain[len(chain)-1], midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: uint64(len(chain) - 1)}}, newPeer: newSkeletonTestPeer("test-peer", chain), endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, endserve: uint64(len(chain) - 2), // len - head - genesis }, // Completely empty database with only the genesis set. The sync is expected // to create a single subchain with the requested head. With one valid peer, // the sync is expected to complete already in the initial round. // // Adding a second peer should not have any effect. { head: chain[len(chain)-1], peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-1", chain)}, midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, midserve: uint64(len(chain) - 2), // len - head - genesis newPeer: newSkeletonTestPeer("test-peer-2", chain), endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, endserve: uint64(len(chain) - 2), // len - head - genesis }, // Completely empty database with only the genesis set. The sync is expected // to create a single subchain with the requested head. With many valid peers, // the sync is expected to complete already in the initial round. // // Adding a new peer should not have any effect. { head: chain[len(chain)-1], peers: []*skeletonTestPeer{ newSkeletonTestPeer("test-peer-1", chain), newSkeletonTestPeer("test-peer-2", chain), newSkeletonTestPeer("test-peer-3", chain), }, midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, midserve: uint64(len(chain) - 2), // len - head - genesis newPeer: newSkeletonTestPeer("test-peer-4", chain), endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, endserve: uint64(len(chain) - 2), // len - head - genesis }, // This test checks if a peer tries to withhold a header - *on* the sync // boundary - instead of sending the requested amount. The malicious short // package should not be accepted. // // Joining with a new peer should however unblock the sync. { head: chain[requestHeaders+100], peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:99]...), nil), chain[100:]...)), }, midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, midserve: requestHeaders + 101 - 3, // len - head - genesis - missing middrop: 1, // penalize shortened header deliveries newPeer: newSkeletonTestPeer("good-peer", chain), endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis enddrop: 1, // no new drops }, // This test checks if a peer tries to withhold a header - *off* the sync // boundary - instead of sending the requested amount. The malicious short // package should not be accepted. // // Joining with a new peer should however unblock the sync. { head: chain[requestHeaders+100], peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:50]...), nil), chain[51:]...)), }, midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, midserve: requestHeaders + 101 - 3, // len - head - genesis - missing middrop: 1, // penalize shortened header deliveries newPeer: newSkeletonTestPeer("good-peer", chain), endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis enddrop: 1, // no new drops }, // This test checks if a peer tries to duplicate a header - *on* the sync // boundary - instead of sending the correct sequence. The malicious duped // package should not be accepted. // // Joining with a new peer should however unblock the sync. { head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:99]...), chain[98]), chain[100:]...)), }, midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, midserve: requestHeaders + 101 - 2, // len - head - genesis middrop: 1, // penalize invalid header sequences newPeer: newSkeletonTestPeer("good-peer", chain), endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis enddrop: 1, // no new drops }, // This test checks if a peer tries to duplicate a header - *off* the sync // boundary - instead of sending the correct sequence. The malicious duped // package should not be accepted. // // Joining with a new peer should however unblock the sync. { head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:50]...), chain[49]), chain[51:]...)), }, midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, midserve: requestHeaders + 101 - 2, // len - head - genesis middrop: 1, // penalize invalid header sequences newPeer: newSkeletonTestPeer("good-peer", chain), endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis enddrop: 1, // no new drops }, // This test checks if a peer tries to inject a different header - *on* // the sync boundary - instead of sending the correct sequence. The bad // package should not be accepted. // // Joining with a new peer should however unblock the sync. { head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-changer", append( append( append([]*types.Header{}, chain[:99]...), &types.Header{ ParentHash: chain[98].Hash(), Number: big.NewInt(int64(99)), GasLimit: 1, }, ), chain[100:]..., ), ), }, midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, midserve: requestHeaders + 101 - 2, // len - head - genesis middrop: 1, // different set of headers, drop // TODO(karalabe): maybe just diff sync? newPeer: newSkeletonTestPeer("good-peer", chain), endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis enddrop: 1, // no new drops }, // This test checks if a peer tries to inject a different header - *off* // the sync boundary - instead of sending the correct sequence. The bad // package should not be accepted. // // Joining with a new peer should however unblock the sync. { head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-changer", append( append( append([]*types.Header{}, chain[:50]...), &types.Header{ ParentHash: chain[49].Hash(), Number: big.NewInt(int64(50)), GasLimit: 1, }, ), chain[51:]..., ), ), }, midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, midserve: requestHeaders + 101 - 2, // len - head - genesis middrop: 1, // different set of headers, drop newPeer: newSkeletonTestPeer("good-peer", chain), endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis enddrop: 1, // no new drops }, // This test reproduces a bug caught during review (kudos to @holiman) // where a subchain is merged with a previously interrupted one, causing // pending data in the scratch space to become "invalid" (since we jump // ahead during subchain merge). In that case it is expected to ignore // the queued up data instead of trying to process on top of a shifted // task set. // // The test is a bit convoluted since it needs to trigger a concurrency // issue. First we sync up an initial chain of 2x512 items. Then announce // 2x512+2 as head and delay delivering the head batch to fill the scratch // space first. The delivery head should merge with the previous download // and the scratch space must not be consumed further. { head: chain[2*requestHeaders], peers: []*skeletonTestPeer{ newSkeletonTestPeerWithHook("peer-1", chain, func(origin uint64) []*types.Header { if origin == chain[2*requestHeaders+1].Number.Uint64() { time.Sleep(100 * time.Millisecond) } return nil // Fallback to default behavior, just delayed }), newSkeletonTestPeerWithHook("peer-2", chain, func(origin uint64) []*types.Header { if origin == chain[2*requestHeaders+1].Number.Uint64() { time.Sleep(100 * time.Millisecond) } return nil // Fallback to default behavior, just delayed }), }, midstate: []*subchain{{Head: 2 * requestHeaders, Tail: 1}}, midserve: 2*requestHeaders - 1, // len - head - genesis newHead: chain[2*requestHeaders+2], endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}}, endserve: 4 * requestHeaders, }, } for i, tt := range tests { // Create a fresh database and initialize it with the starting state db := rawdb.NewMemoryDatabase() rawdb.WriteHeader(db, chain[0]) // Create a peer set to feed headers through peerset := newPeerSet() for _, peer := range tt.peers { peerset.Register(newPeerConnection(peer.id, eth.ETH66, peer, log.New("id", peer.id))) } // Create a peer dropper to track malicious peers dropped := make(map[string]int) drop := func(peer string) { if p := peerset.Peer(peer); p != nil { atomic.AddUint64(&p.peer.(*skeletonTestPeer).dropped, 1) } peerset.Unregister(peer) dropped[peer]++ } // Create a skeleton sync and run a cycle skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller()) skeleton.Sync(tt.head, true) var progress skeletonProgress // Wait a bit (bleah) for the initial sync loop to go to idle. This might // be either a finish or a never-start hence why there's no event to hook. check := func() error { if len(progress.Subchains) != len(tt.midstate) { return fmt.Errorf("test %d, mid state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.midstate)) } for j := 0; j < len(progress.Subchains); j++ { if progress.Subchains[j].Head != tt.midstate[j].Head { return fmt.Errorf("test %d, mid state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.midstate[j].Head) } if progress.Subchains[j].Tail != tt.midstate[j].Tail { return fmt.Errorf("test %d, mid state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.midstate[j].Tail) } } return nil } waitStart := time.Now() for waitTime := 20 * time.Millisecond; time.Since(waitStart) < time.Second; waitTime = waitTime * 2 { time.Sleep(waitTime) // Check the post-init end state if it matches the required results json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) if err := check(); err == nil { break } } if err := check(); err != nil { t.Error(err) continue } var served uint64 for _, peer := range tt.peers { served += atomic.LoadUint64(&peer.served) } if served != tt.midserve { t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve) } var drops uint64 for _, peer := range tt.peers { drops += atomic.LoadUint64(&peer.dropped) } if drops != tt.middrop { t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) } // Apply the post-init events if there's any if tt.newHead != nil { skeleton.Sync(tt.newHead, true) } if tt.newPeer != nil { if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH66, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil { t.Errorf("test %d: failed to register new peer: %v", i, err) } } // Wait a bit (bleah) for the second sync loop to go to idle. This might // be either a finish or a never-start hence why there's no event to hook. check = func() error { if len(progress.Subchains) != len(tt.endstate) { return fmt.Errorf("test %d, end state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.endstate)) } for j := 0; j < len(progress.Subchains); j++ { if progress.Subchains[j].Head != tt.endstate[j].Head { return fmt.Errorf("test %d, end state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.endstate[j].Head) } if progress.Subchains[j].Tail != tt.endstate[j].Tail { return fmt.Errorf("test %d, end state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.endstate[j].Tail) } } return nil } waitStart = time.Now() for waitTime := 20 * time.Millisecond; time.Since(waitStart) < time.Second; waitTime = waitTime * 2 { time.Sleep(waitTime) // Check the post-init end state if it matches the required results json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) if err := check(); err == nil { break } } if err := check(); err != nil { t.Error(err) continue } // Check that the peers served no more headers than we actually needed served = 0 for _, peer := range tt.peers { served += atomic.LoadUint64(&peer.served) } if tt.newPeer != nil { served += atomic.LoadUint64(&tt.newPeer.served) } if served != tt.endserve { t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve) } drops = 0 for _, peer := range tt.peers { drops += atomic.LoadUint64(&peer.dropped) } if tt.newPeer != nil { drops += atomic.LoadUint64(&tt.newPeer.dropped) } if drops != tt.middrop { t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) } // Clean up any leftover skeleton sync resources skeleton.Terminate() } }