eth, eth/downloader: remove references to LightChain, LightSync (#29711)

Co-authored-by: Gary Rong <garyrong0905@gmail.com>
pull/29603/head
jwasinger 4 months ago committed by GitHub
parent af0a3274be
commit e517183719
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      eth/backend.go
  2. 6
      eth/downloader/beaconsync.go
  3. 65
      eth/downloader/downloader.go
  4. 44
      eth/downloader/downloader_test.go
  5. 15
      eth/downloader/modes.go
  6. 2
      eth/handler.go

@ -19,7 +19,6 @@ package eth
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"math/big" "math/big"
"runtime" "runtime"
@ -105,9 +104,6 @@ type Ethereum struct {
// whose lifecycle will be managed by the provided node. // whose lifecycle will be managed by the provided node.
func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Ensure configuration values are compatible and sane // Ensure configuration values are compatible and sane
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, light mode has been deprecated")
}
if !config.SyncMode.IsValid() { if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
} }

@ -202,7 +202,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
case SnapSync: case SnapSync:
chainHead = d.blockchain.CurrentSnapBlock() chainHead = d.blockchain.CurrentSnapBlock()
default: default:
chainHead = d.lightchain.CurrentHeader() panic("unknown sync mode")
} }
number := chainHead.Number.Uint64() number := chainHead.Number.Uint64()
@ -222,7 +222,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
case SnapSync: case SnapSync:
linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1) linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
default: default:
linked = d.blockchain.HasHeader(beaconTail.ParentHash, beaconTail.Number.Uint64()-1) panic("unknown sync mode")
} }
if !linked { if !linked {
// This is a programming error. The chain backfiller was called with a // This is a programming error. The chain backfiller was called with a
@ -257,7 +257,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
case SnapSync: case SnapSync:
known = d.blockchain.HasFastBlock(h.Hash(), n) known = d.blockchain.HasFastBlock(h.Hash(), n)
default: default:
known = d.lightchain.HasHeader(h.Hash(), n) panic("unknown sync mode")
} }
if !known { if !known {
end = check end = check

@ -67,7 +67,6 @@ var (
errCancelContentProcessing = errors.New("content processing canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)") errCanceled = errors.New("syncing canceled (requested)")
errNoPivotHeader = errors.New("pivot header is not found") errNoPivotHeader = errors.New("pivot header is not found")
ErrMergeTransition = errors.New("legacy sync reached the merge")
) )
// peerDropFn is a callback type for dropping a peer detected as malicious. // peerDropFn is a callback type for dropping a peer detected as malicious.
@ -98,7 +97,6 @@ type Downloader struct {
syncStatsChainHeight uint64 // Highest block number known when syncing started syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
lightchain LightChain
blockchain BlockChain blockchain BlockChain
// Callbacks // Callbacks
@ -143,8 +141,8 @@ type Downloader struct {
syncLogTime time.Time // Time instance when status was last reported syncLogTime time.Time // Time instance when status was last reported
} }
// LightChain encapsulates functions required to synchronise a light chain. // BlockChain encapsulates functions required to sync a (full or snap) blockchain.
type LightChain interface { type BlockChain interface {
// HasHeader verifies a header's presence in the local chain. // HasHeader verifies a header's presence in the local chain.
HasHeader(common.Hash, uint64) bool HasHeader(common.Hash, uint64) bool
@ -162,11 +160,6 @@ type LightChain interface {
// SetHead rewinds the local chain to a new head. // SetHead rewinds the local chain to a new head.
SetHead(uint64) error SetHead(uint64) error
}
// BlockChain encapsulates functions required to sync a (full or snap) blockchain.
type BlockChain interface {
LightChain
// HasBlock verifies a block's presence in the local chain. // HasBlock verifies a block's presence in the local chain.
HasBlock(common.Hash, uint64) bool HasBlock(common.Hash, uint64) bool
@ -201,17 +194,13 @@ type BlockChain interface {
} }
// New creates a new downloader to fetch hashes and blocks from remote peers. // New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader { func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{ dl := &Downloader{
stateDB: stateDb, stateDB: stateDb,
mux: mux, mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(), peers: newPeerSet(),
blockchain: chain, blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer, dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1), headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}), quitCh: make(chan struct{}),
@ -240,15 +229,13 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
current := uint64(0) current := uint64(0)
mode := d.getMode() mode := d.getMode()
switch { switch mode {
case d.blockchain != nil && mode == FullSync: case FullSync:
current = d.blockchain.CurrentBlock().Number.Uint64() current = d.blockchain.CurrentBlock().Number.Uint64()
case d.blockchain != nil && mode == SnapSync: case SnapSync:
current = d.blockchain.CurrentSnapBlock().Number.Uint64() current = d.blockchain.CurrentSnapBlock().Number.Uint64()
case d.lightchain != nil:
current = d.lightchain.CurrentHeader().Number.Uint64()
default: default:
log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode) log.Error("Unknown downloader mode", "mode", mode)
} }
progress, pending := d.SnapSyncer.Progress() progress, pending := d.SnapSyncer.Progress()
@ -402,7 +389,7 @@ func (d *Downloader) syncToHead() (err error) {
if err != nil { if err != nil {
d.mux.Post(FailedEvent{err}) d.mux.Post(FailedEvent{err})
} else { } else {
latest := d.lightchain.CurrentHeader() latest := d.blockchain.CurrentHeader()
d.mux.Post(DoneEvent{latest}) d.mux.Post(DoneEvent{latest})
} }
}() }()
@ -520,7 +507,7 @@ func (d *Downloader) syncToHead() (err error) {
} }
// Rewind the ancient store and blockchain if reorg happens. // Rewind the ancient store and blockchain if reorg happens.
if origin+1 < frozen { if origin+1 < frozen {
if err := d.lightchain.SetHead(origin); err != nil { if err := d.blockchain.SetHead(origin); err != nil {
return err return err
} }
log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin) log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin)
@ -690,34 +677,32 @@ func (d *Downloader) processHeaders(origin uint64) error {
chunkHashes := hashes[:limit] chunkHashes := hashes[:limit]
// In case of header only syncing, validate the chunk immediately // In case of header only syncing, validate the chunk immediately
if mode == SnapSync || mode == LightSync { if mode == SnapSync {
// Although the received headers might be all valid, a legacy // Although the received headers might be all valid, a legacy
// PoW/PoA sync must not accept post-merge headers. Make sure // PoW/PoA sync must not accept post-merge headers. Make sure
// that any transition is rejected at this point. // that any transition is rejected at this point.
if len(chunkHeaders) > 0 { if len(chunkHeaders) > 0 {
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil { if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil {
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err) log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err) return fmt.Errorf("%w: %v", errInvalidChain, err)
} }
} }
} }
// Unless we're doing light chains, schedule the headers for associated content retrieval // If we've reached the allowed number of pending headers, stall a bit
if mode == FullSync || mode == SnapSync { for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
// If we've reached the allowed number of pending headers, stall a bit timer.Reset(time.Second)
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { select {
timer.Reset(time.Second) case <-d.cancelCh:
select { return errCanceled
case <-d.cancelCh: case <-timer.C:
return errCanceled
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
return fmt.Errorf("%w: stale headers", errBadPeer)
} }
} }
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
return fmt.Errorf("%w: stale headers", errBadPeer)
}
headers = headers[limit:] headers = headers[limit:]
hashes = hashes[limit:] hashes = hashes[limit:]
origin += uint64(limit) origin += uint64(limit)
@ -1056,7 +1041,7 @@ func (d *Downloader) readHeaderRange(last *types.Header, count int) []*types.Hea
headers []*types.Header headers []*types.Header
) )
for { for {
parent := d.lightchain.GetHeaderByHash(current.ParentHash) parent := d.blockchain.GetHeaderByHash(current.ParentHash)
if parent == nil { if parent == nil {
break // The chain is not continuous, or the chain is exhausted break // The chain is not continuous, or the chain is exhausted
} }

@ -76,7 +76,7 @@ func newTesterWithNotification(t *testing.T, success func()) *downloadTester {
chain: chain, chain: chain,
peers: make(map[string]*downloadTesterPeer), peers: make(map[string]*downloadTesterPeer),
} }
tester.downloader = New(db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success) tester.downloader = New(db, new(event.TypeMux), tester.chain, tester.dropPeer, success)
return tester return tester
} }
@ -384,9 +384,6 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
t.Helper() t.Helper()
headers, blocks, receipts := length, length, length headers, blocks, receipts := length, length, length
if tester.downloader.getMode() == LightSync {
blocks, receipts = 1, 1
}
if hs := int(tester.chain.CurrentHeader().Number.Uint64()) + 1; hs != headers { if hs := int(tester.chain.CurrentHeader().Number.Uint64()) + 1; hs != headers {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
} }
@ -398,9 +395,8 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
} }
} }
func TestCanonicalSynchronisation68Full(t *testing.T) { testCanonSync(t, eth.ETH68, FullSync) } func TestCanonicalSynchronisation68Full(t *testing.T) { testCanonSync(t, eth.ETH68, FullSync) }
func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH68, SnapSync) } func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH68, SnapSync) }
func TestCanonicalSynchronisation68Light(t *testing.T) { testCanonSync(t, eth.ETH68, LightSync) }
func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{}) success := make(chan struct{})
@ -505,9 +501,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
} }
// Tests that a canceled download wipes all previously accumulated state. // Tests that a canceled download wipes all previously accumulated state.
func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) } func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) }
func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) } func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) }
func TestCancel68Light(t *testing.T) { testCancel(t, eth.ETH68, LightSync) }
func testCancel(t *testing.T, protocol uint, mode SyncMode) { func testCancel(t *testing.T, protocol uint, mode SyncMode) {
complete := make(chan struct{}) complete := make(chan struct{})
@ -538,9 +533,8 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) {
// Tests that synchronisations behave well in multi-version protocol environments // Tests that synchronisations behave well in multi-version protocol environments
// and not wreak havoc on other nodes in the network. // and not wreak havoc on other nodes in the network.
func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) } func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) }
func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH68, SnapSync) } func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH68, SnapSync) }
func TestMultiProtoSynchronisation68Light(t *testing.T) { testMultiProtoSync(t, eth.ETH68, LightSync) }
func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
complete := make(chan struct{}) complete := make(chan struct{})
@ -578,9 +572,8 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
// Tests that if a block is empty (e.g. header only), no body request should be // Tests that if a block is empty (e.g. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself. // made, and instead the header should be assembled into a whole block in itself.
func TestEmptyShortCircuit68Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, FullSync) } func TestEmptyShortCircuit68Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, FullSync) }
func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, SnapSync) } func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, SnapSync) }
func TestEmptyShortCircuit68Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{}) success := make(chan struct{})
@ -619,7 +612,7 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
// Validate the number of block bodies that should have been requested // Validate the number of block bodies that should have been requested
bodiesNeeded, receiptsNeeded := 0, 0 bodiesNeeded, receiptsNeeded := 0, 0
for _, block := range chain.blocks[1:] { for _, block := range chain.blocks[1:] {
if mode != LightSync && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { if len(block.Transactions()) > 0 || len(block.Uncles()) > 0 {
bodiesNeeded++ bodiesNeeded++
} }
} }
@ -694,9 +687,8 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
// Tests that synchronisation progress (origin block number, current block number // Tests that synchronisation progress (origin block number, current block number
// and highest block number) is tracked and updated correctly. // and highest block number) is tracked and updated correctly.
func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) } func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) }
func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) } func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) }
func TestSyncProgress68Light(t *testing.T) { testSyncProgress(t, eth.ETH68, LightSync) }
func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{}) success := make(chan struct{})
@ -734,17 +726,7 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
t.Fatalf("failed to beacon-sync chain: %v", err) t.Fatalf("failed to beacon-sync chain: %v", err)
} }
var startingBlock uint64 startingBlock := uint64(len(chain.blocks)/2 - 1)
if mode == LightSync {
// in light-sync mode:
// * the starting block is 0 on the second sync cycle because blocks
// are never downloaded.
// * The current/highest blocks reported in the progress reflect the
// current/highest header.
startingBlock = 0
} else {
startingBlock = uint64(len(chain.blocks)/2 - 1)
}
select { select {
case <-success: case <-success:

@ -23,13 +23,12 @@ import "fmt"
type SyncMode uint32 type SyncMode uint32
const ( const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
SnapSync // Download the chain and the state via compact snapshots SnapSync // Download the chain and the state via compact snapshots
LightSync // Download only the headers and terminate afterwards
) )
func (mode SyncMode) IsValid() bool { func (mode SyncMode) IsValid() bool {
return mode >= FullSync && mode <= LightSync return mode == FullSync || mode == SnapSync
} }
// String implements the stringer interface. // String implements the stringer interface.
@ -39,8 +38,6 @@ func (mode SyncMode) String() string {
return "full" return "full"
case SnapSync: case SnapSync:
return "snap" return "snap"
case LightSync:
return "light"
default: default:
return "unknown" return "unknown"
} }
@ -52,8 +49,6 @@ func (mode SyncMode) MarshalText() ([]byte, error) {
return []byte("full"), nil return []byte("full"), nil
case SnapSync: case SnapSync:
return []byte("snap"), nil return []byte("snap"), nil
case LightSync:
return []byte("light"), nil
default: default:
return nil, fmt.Errorf("unknown sync mode %d", mode) return nil, fmt.Errorf("unknown sync mode %d", mode)
} }
@ -65,10 +60,8 @@ func (mode *SyncMode) UnmarshalText(text []byte) error {
*mode = FullSync *mode = FullSync
case "snap": case "snap":
*mode = SnapSync *mode = SnapSync
case "light":
*mode = LightSync
default: default:
return fmt.Errorf(`unknown sync mode %q, want "full", "snap" or "light"`, text) return fmt.Errorf(`unknown sync mode %q, want "full" or "snap"`, text)
} }
return nil return nil
} }

@ -180,7 +180,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
return nil, errors.New("snap sync not supported with snapshots disabled") return nil, errors.New("snap sync not supported with snapshots disabled")
} }
// Construct the downloader (long sync) // Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, h.enableSyncedFeatures) h.downloader = downloader.New(config.Database, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures)
fetchTx := func(peer string, hashes []common.Hash) error { fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer) p := h.peers.peer(peer)

Loading…
Cancel
Save