From bf1798e04ee52eda35a908b9a67d52f881d71401 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 9 Feb 2023 13:03:54 +0200 Subject: [PATCH] common/prque: generic priority queue (#26290) * common, core, eth, les, trie: make prque generic * les/vflux/server: fixed issues in priorityPool * common, core, eth, les, trie: make priority also generic in prque * les/flowcontrol: add test case for priority accumulator overflow * les/flowcontrol: avoid priority value overflow * common/prque: use int priority in some tests No need to convert to int64 when we can just change the type used by the queue. * common/prque: remove comment about int64 range --------- Co-authored-by: Zsolt Felfoldi Co-authored-by: Felix Lange --- common/prque/lazyqueue.go | 78 +++++++++++++-------------- common/prque/prque.go | 44 +++++++-------- common/prque/prque_test.go | 27 +++++----- common/prque/sstack.go | 65 ++++++++++------------ common/prque/sstack_test.go | 30 +++++------ core/blockchain.go | 22 ++++---- core/rawdb/chain_iterator.go | 8 +-- core/txpool/txpool.go | 6 +-- eth/downloader/fetchers_concurrent.go | 8 ++- eth/downloader/queue.go | 48 +++++++++-------- eth/fetcher/block_fetcher.go | 10 ++-- go.mod | 6 +-- go.sum | 14 ++--- les/downloader/queue.go | 46 ++++++++-------- les/fetcher/block_fetcher.go | 10 ++-- les/flowcontrol/manager.go | 25 +++++++-- les/flowcontrol/manager_test.go | 22 +++++--- les/servingqueue.go | 16 +++--- les/vflux/server/prioritypool.go | 52 +++++++++--------- trie/sync.go | 4 +- 20 files changed, 277 insertions(+), 264 deletions(-) diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go index 13ef3ed2cd..59bda72fa7 100644 --- a/common/prque/lazyqueue.go +++ b/common/prque/lazyqueue.go @@ -21,6 +21,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "golang.org/x/exp/constraints" ) // LazyQueue is a priority queue data structure where priorities can change over @@ -32,31 +33,31 @@ import ( // // If the upper estimate is exceeded then Update should be called for that item. // A global Refresh function should also be called periodically. -type LazyQueue struct { +type LazyQueue[P constraints.Ordered, V any] struct { clock mclock.Clock // Items are stored in one of two internal queues ordered by estimated max // priority until the next and the next-after-next refresh. Update and Refresh // always places items in queue[1]. 
- queue [2]*sstack - popQueue *sstack + queue [2]*sstack[P, V] + popQueue *sstack[P, V] period time.Duration maxUntil mclock.AbsTime indexOffset int - setIndex SetIndexCallback - priority PriorityCallback - maxPriority MaxPriorityCallback + setIndex SetIndexCallback[V] + priority PriorityCallback[P, V] + maxPriority MaxPriorityCallback[P, V] lastRefresh1, lastRefresh2 mclock.AbsTime } type ( - PriorityCallback func(data interface{}) int64 // actual priority callback - MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback + PriorityCallback[P constraints.Ordered, V any] func(data V) P // actual priority callback + MaxPriorityCallback[P constraints.Ordered, V any] func(data V, until mclock.AbsTime) P // estimated maximum priority callback ) // NewLazyQueue creates a new lazy queue -func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue { - q := &LazyQueue{ - popQueue: newSstack(nil, false), +func NewLazyQueue[P constraints.Ordered, V any](setIndex SetIndexCallback[V], priority PriorityCallback[P, V], maxPriority MaxPriorityCallback[P, V], clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue[P, V] { + q := &LazyQueue[P, V]{ + popQueue: newSstack[P, V](nil), setIndex: setIndex, priority: priority, maxPriority: maxPriority, @@ -71,13 +72,13 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior } // Reset clears the contents of the queue -func (q *LazyQueue) Reset() { - q.queue[0] = newSstack(q.setIndex0, false) - q.queue[1] = newSstack(q.setIndex1, false) +func (q *LazyQueue[P, V]) Reset() { + q.queue[0] = newSstack[P, V](q.setIndex0) + q.queue[1] = newSstack[P, V](q.setIndex1) } // Refresh performs queue re-evaluation if necessary -func (q *LazyQueue) Refresh() { +func (q *LazyQueue[P, V]) Refresh() { now := q.clock.Now() for time.Duration(now-q.lastRefresh2) >= q.period*2 { q.refresh(now) @@ -87,10 +88,10 @@ func (q *LazyQueue) Refresh() { } // refresh re-evaluates items in the older queue and swaps the two queues -func (q *LazyQueue) refresh(now mclock.AbsTime) { +func (q *LazyQueue[P, V]) refresh(now mclock.AbsTime) { q.maxUntil = now.Add(q.period) for q.queue[0].Len() != 0 { - q.Push(heap.Pop(q.queue[0]).(*item).value) + q.Push(heap.Pop(q.queue[0]).(*item[P, V]).value) } q.queue[0], q.queue[1] = q.queue[1], q.queue[0] q.indexOffset = 1 - q.indexOffset @@ -98,22 +99,22 @@ func (q *LazyQueue) refresh(now mclock.AbsTime) { } // Push adds an item to the queue -func (q *LazyQueue) Push(data interface{}) { - heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)}) +func (q *LazyQueue[P, V]) Push(data V) { + heap.Push(q.queue[1], &item[P, V]{data, q.maxPriority(data, q.maxUntil)}) } // Update updates the upper priority estimate for the item with the given queue index -func (q *LazyQueue) Update(index int) { +func (q *LazyQueue[P, V]) Update(index int) { q.Push(q.Remove(index)) } // Pop removes and returns the item with the greatest actual priority -func (q *LazyQueue) Pop() (interface{}, int64) { +func (q *LazyQueue[P, V]) Pop() (V, P) { var ( - resData interface{} - resPri int64 + resData V + resPri P ) - q.MultiPop(func(data interface{}, priority int64) bool { + q.MultiPop(func(data V, priority P) bool { resData = data resPri = priority return false @@ -123,7 +124,7 @@ func (q *LazyQueue) Pop() (interface{}, int64) { // peekIndex returns the index of the internal queue where the item 
with the // highest estimated priority is or -1 if both are empty -func (q *LazyQueue) peekIndex() int { +func (q *LazyQueue[P, V]) peekIndex() int { if q.queue[0].Len() != 0 { if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority { return 1 @@ -139,17 +140,17 @@ func (q *LazyQueue) peekIndex() int { // MultiPop pops multiple items from the queue and is more efficient than calling // Pop multiple times. Popped items are passed to the callback. MultiPop returns // when the callback returns false or there are no more items to pop. -func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) { +func (q *LazyQueue[P, V]) MultiPop(callback func(data V, priority P) bool) { nextIndex := q.peekIndex() for nextIndex != -1 { - data := heap.Pop(q.queue[nextIndex]).(*item).value - heap.Push(q.popQueue, &item{data, q.priority(data)}) + data := heap.Pop(q.queue[nextIndex]).(*item[P, V]).value + heap.Push(q.popQueue, &item[P, V]{data, q.priority(data)}) nextIndex = q.peekIndex() for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) { - i := heap.Pop(q.popQueue).(*item) + i := heap.Pop(q.popQueue).(*item[P, V]) if !callback(i.value, i.priority) { for q.popQueue.Len() != 0 { - q.Push(heap.Pop(q.popQueue).(*item).value) + q.Push(heap.Pop(q.popQueue).(*item[P, V]).value) } return } @@ -159,31 +160,28 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo } // PopItem pops the item from the queue only, dropping the associated priority value. -func (q *LazyQueue) PopItem() interface{} { +func (q *LazyQueue[P, V]) PopItem() V { i, _ := q.Pop() return i } // Remove removes the item with the given index. -func (q *LazyQueue) Remove(index int) interface{} { - if index < 0 { - return nil - } - return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value +func (q *LazyQueue[P, V]) Remove(index int) V { + return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item[P, V]).value } // Empty checks whether the priority queue is empty. -func (q *LazyQueue) Empty() bool { +func (q *LazyQueue[P, V]) Empty() bool { return q.queue[0].Len() == 0 && q.queue[1].Len() == 0 } // Size returns the number of items in the priority queue. -func (q *LazyQueue) Size() int { +func (q *LazyQueue[P, V]) Size() int { return q.queue[0].Len() + q.queue[1].Len() } // setIndex0 translates internal queue item index to the virtual index space of LazyQueue -func (q *LazyQueue) setIndex0(data interface{}, index int) { +func (q *LazyQueue[P, V]) setIndex0(data V, index int) { if index == -1 { q.setIndex(data, -1) } else { @@ -192,6 +190,6 @@ func (q *LazyQueue) setIndex0(data interface{}, index int) { } // setIndex1 translates internal queue item index to the virtual index space of LazyQueue -func (q *LazyQueue) setIndex1(data interface{}, index int) { +func (q *LazyQueue[P, V]) setIndex1(data V, index int) { q.setIndex(data, index+index+1) } diff --git a/common/prque/prque.go b/common/prque/prque.go index fb02e3418c..0e8c9f897f 100755 --- a/common/prque/prque.go +++ b/common/prque/prque.go @@ -19,65 +19,59 @@ package prque import ( "container/heap" + + "golang.org/x/exp/constraints" ) // Priority queue data structure. -type Prque struct { - cont *sstack +type Prque[P constraints.Ordered, V any] struct { + cont *sstack[P, V] } // New creates a new priority queue. 
-func New(setIndex SetIndexCallback) *Prque { - return &Prque{newSstack(setIndex, false)} -} - -// NewWrapAround creates a new priority queue with wrap-around priority handling. -func NewWrapAround(setIndex SetIndexCallback) *Prque { - return &Prque{newSstack(setIndex, true)} +func New[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *Prque[P, V] { + return &Prque[P, V]{newSstack[P, V](setIndex)} } // Pushes a value with a given priority into the queue, expanding if necessary. -func (p *Prque) Push(data interface{}, priority int64) { - heap.Push(p.cont, &item{data, priority}) +func (p *Prque[P, V]) Push(data V, priority P) { + heap.Push(p.cont, &item[P, V]{data, priority}) } // Peek returns the value with the greatest priority but does not pop it off. -func (p *Prque) Peek() (interface{}, int64) { +func (p *Prque[P, V]) Peek() (V, P) { item := p.cont.blocks[0][0] return item.value, item.priority } // Pops the value with the greatest priority off the stack and returns it. // Currently no shrinking is done. -func (p *Prque) Pop() (interface{}, int64) { - item := heap.Pop(p.cont).(*item) +func (p *Prque[P, V]) Pop() (V, P) { + item := heap.Pop(p.cont).(*item[P, V]) return item.value, item.priority } // Pops only the item from the queue, dropping the associated priority value. -func (p *Prque) PopItem() interface{} { - return heap.Pop(p.cont).(*item).value +func (p *Prque[P, V]) PopItem() V { + return heap.Pop(p.cont).(*item[P, V]).value } // Remove removes the element with the given index. -func (p *Prque) Remove(i int) interface{} { - if i < 0 { - return nil - } - return heap.Remove(p.cont, i) +func (p *Prque[P, V]) Remove(i int) V { + return heap.Remove(p.cont, i).(*item[P, V]).value } // Checks whether the priority queue is empty. -func (p *Prque) Empty() bool { +func (p *Prque[P, V]) Empty() bool { return p.cont.Len() == 0 } // Returns the number of element in the priority queue. -func (p *Prque) Size() int { +func (p *Prque[P, V]) Size() int { return p.cont.Len() } // Clears the contents of the priority queue. 
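For reference, a minimal sketch of how the queue reads after this change, with the priority and value types fixed at instantiation instead of the old int64/interface{} pair (illustrative only, not part of the patch):

	package main

	import (
		"fmt"

		"github.com/ethereum/go-ethereum/common/prque"
	)

	func main() {
		// P = int64 (priority), V = string (payload). A nil SetIndexCallback is
		// fine because nothing other than the top element is ever removed here.
		q := prque.New[int64, string](nil)
		q.Push("low", 1)
		q.Push("high", 3)
		q.Push("mid", 2)
		for !q.Empty() {
			value, priority := q.Pop() // highest priority first: high, mid, low
			fmt.Println(priority, value)
		}
	}

No type assertions are needed on Pop anymore; the compiler enforces that only string values and int64 priorities enter this particular queue.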
-func (p *Prque) Reset() { - *p = *New(p.cont.setIndex) +func (p *Prque[P, V]) Reset() { + *p = *New[P, V](p.cont.setIndex) } diff --git a/common/prque/prque_test.go b/common/prque/prque_test.go index 1cffcebad4..c4910f205a 100644 --- a/common/prque/prque_test.go +++ b/common/prque/prque_test.go @@ -21,22 +21,24 @@ func TestPrque(t *testing.T) { for i := 0; i < size; i++ { data[i] = rand.Int() } - queue := New(nil) + queue := New[int, int](nil) + for rep := 0; rep < 2; rep++ { // Fill a priority queue with the above data for i := 0; i < size; i++ { - queue.Push(data[i], int64(prio[i])) + queue.Push(data[i], prio[i]) if queue.Size() != i+1 { t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1) } } // Create a map the values to the priorities for easier verification - dict := make(map[int64]int) + dict := make(map[int]int) for i := 0; i < size; i++ { - dict[int64(prio[i])] = data[i] + dict[prio[i]] = data[i] } + // Pop out the elements in priority order and verify them - prevPrio := int64(size + 1) + prevPrio := size + 1 for !queue.Empty() { val, prio := queue.Pop() if prio > prevPrio { @@ -59,22 +61,23 @@ func TestReset(t *testing.T) { for i := 0; i < size; i++ { data[i] = rand.Int() } - queue := New(nil) + queue := New[int, int](nil) + for rep := 0; rep < 2; rep++ { // Fill a priority queue with the above data for i := 0; i < size; i++ { - queue.Push(data[i], int64(prio[i])) + queue.Push(data[i], prio[i]) if queue.Size() != i+1 { t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1) } } // Create a map the values to the priorities for easier verification - dict := make(map[int64]int) + dict := make(map[int]int) for i := 0; i < size; i++ { - dict[int64(prio[i])] = data[i] + dict[prio[i]] = data[i] } // Pop out half the elements in priority order and verify them - prevPrio := int64(size + 1) + prevPrio := size + 1 for i := 0; i < size/2; i++ { val, prio := queue.Pop() if prio > prevPrio { @@ -104,7 +107,7 @@ func BenchmarkPush(b *testing.B) { } // Execute the benchmark b.ResetTimer() - queue := New(nil) + queue := New[int64, int](nil) for i := 0; i < len(data); i++ { queue.Push(data[i], prio[i]) } @@ -118,7 +121,7 @@ func BenchmarkPop(b *testing.B) { data[i] = rand.Int() prio[i] = rand.Int63() } - queue := New(nil) + queue := New[int64, int](nil) for i := 0; i < len(data); i++ { queue.Push(data[i], prio[i]) } diff --git a/common/prque/sstack.go b/common/prque/sstack.go index b06a95413d..5dcd1d9dd0 100755 --- a/common/prque/sstack.go +++ b/common/prque/sstack.go @@ -10,53 +10,50 @@ package prque +import "golang.org/x/exp/constraints" + // The size of a block of data const blockSize = 4096 // A prioritized item in the sorted stack. -// -// Note: priorities can "wrap around" the int64 range, a comes before b if (a.priority - b.priority) > 0. -// The difference between the lowest and highest priorities in the queue at any point should be less than 2^63. -type item struct { - value interface{} - priority int64 +type item[P constraints.Ordered, V any] struct { + value V + priority P } // SetIndexCallback is called when the element is moved to a new index. // Providing SetIndexCallback is optional, it is needed only if the application needs // to delete elements other than the top one. -type SetIndexCallback func(data interface{}, index int) +type SetIndexCallback[V any] func(data V, index int) // Internal sortable stack data structure. 
Implements the Push and Pop ops for // the stack (heap) functionality and the Len, Less and Swap methods for the // sortability requirements of the heaps. -type sstack struct { - setIndex SetIndexCallback - size int - capacity int - offset int - wrapAround bool +type sstack[P constraints.Ordered, V any] struct { + setIndex SetIndexCallback[V] + size int + capacity int + offset int - blocks [][]*item - active []*item + blocks [][]*item[P, V] + active []*item[P, V] } // Creates a new, empty stack. -func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack { - result := new(sstack) +func newSstack[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *sstack[P, V] { + result := new(sstack[P, V]) result.setIndex = setIndex - result.active = make([]*item, blockSize) - result.blocks = [][]*item{result.active} + result.active = make([]*item[P, V], blockSize) + result.blocks = [][]*item[P, V]{result.active} result.capacity = blockSize - result.wrapAround = wrapAround return result } // Pushes a value onto the stack, expanding it if necessary. Required by // heap.Interface. -func (s *sstack) Push(data interface{}) { +func (s *sstack[P, V]) Push(data any) { if s.size == s.capacity { - s.active = make([]*item, blockSize) + s.active = make([]*item[P, V], blockSize) s.blocks = append(s.blocks, s.active) s.capacity += blockSize s.offset = 0 @@ -65,16 +62,16 @@ func (s *sstack) Push(data interface{}) { s.offset = 0 } if s.setIndex != nil { - s.setIndex(data.(*item).value, s.size) + s.setIndex(data.(*item[P, V]).value, s.size) } - s.active[s.offset] = data.(*item) + s.active[s.offset] = data.(*item[P, V]) s.offset++ s.size++ } // Pops a value off the stack and returns it. Currently no shrinking is done. // Required by heap.Interface. -func (s *sstack) Pop() (res interface{}) { +func (s *sstack[P, V]) Pop() (res any) { s.size-- s.offset-- if s.offset < 0 { @@ -83,28 +80,24 @@ func (s *sstack) Pop() (res interface{}) { } res, s.active[s.offset] = s.active[s.offset], nil if s.setIndex != nil { - s.setIndex(res.(*item).value, -1) + s.setIndex(res.(*item[P, V]).value, -1) } return } // Returns the length of the stack. Required by sort.Interface. -func (s *sstack) Len() int { +func (s *sstack[P, V]) Len() int { return s.size } // Compares the priority of two elements of the stack (higher is first). // Required by sort.Interface. -func (s *sstack) Less(i, j int) bool { - a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority - if s.wrapAround { - return a-b > 0 - } - return a > b +func (s *sstack[P, V]) Less(i, j int) bool { + return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority } // Swaps two elements in the stack. Required by sort.Interface. -func (s *sstack) Swap(i, j int) { +func (s *sstack[P, V]) Swap(i, j int) { ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize a, b := s.blocks[jb][jo], s.blocks[ib][io] if s.setIndex != nil { @@ -115,6 +108,6 @@ func (s *sstack) Swap(i, j int) { } // Resets the stack, effectively clearing its contents. 
-func (s *sstack) Reset() { - *s = *newSstack(s.setIndex, false) +func (s *sstack[P, V]) Reset() { + *s = *newSstack[P, V](s.setIndex) } diff --git a/common/prque/sstack_test.go b/common/prque/sstack_test.go index bc6298979c..edc99955e8 100644 --- a/common/prque/sstack_test.go +++ b/common/prque/sstack_test.go @@ -17,23 +17,23 @@ import ( func TestSstack(t *testing.T) { // Create some initial data size := 16 * blockSize - data := make([]*item, size) + data := make([]*item[int64, int], size) for i := 0; i < size; i++ { - data[i] = &item{rand.Int(), rand.Int63()} + data[i] = &item[int64, int]{rand.Int(), rand.Int63()} } - stack := newSstack(nil, false) + stack := newSstack[int64, int](nil) for rep := 0; rep < 2; rep++ { // Push all the data into the stack, pop out every second - secs := []*item{} + secs := []*item[int64, int]{} for i := 0; i < size; i++ { stack.Push(data[i]) if i%2 == 0 { - secs = append(secs, stack.Pop().(*item)) + secs = append(secs, stack.Pop().(*item[int64, int])) } } - rest := []*item{} + rest := []*item[int64, int]{} for stack.Len() > 0 { - rest = append(rest, stack.Pop().(*item)) + rest = append(rest, stack.Pop().(*item[int64, int])) } // Make sure the contents of the resulting slices are ok for i := 0; i < size; i++ { @@ -50,12 +50,12 @@ func TestSstack(t *testing.T) { func TestSstackSort(t *testing.T) { // Create some initial data size := 16 * blockSize - data := make([]*item, size) + data := make([]*item[int64, int], size) for i := 0; i < size; i++ { - data[i] = &item{rand.Int(), int64(i)} + data[i] = &item[int64, int]{rand.Int(), int64(i)} } // Push all the data into the stack - stack := newSstack(nil, false) + stack := newSstack[int64, int](nil) for _, val := range data { stack.Push(val) } @@ -72,18 +72,18 @@ func TestSstackSort(t *testing.T) { func TestSstackReset(t *testing.T) { // Create some initial data size := 16 * blockSize - data := make([]*item, size) + data := make([]*item[int64, int], size) for i := 0; i < size; i++ { - data[i] = &item{rand.Int(), rand.Int63()} + data[i] = &item[int64, int]{rand.Int(), rand.Int63()} } - stack := newSstack(nil, false) + stack := newSstack[int64, int](nil) for rep := 0; rep < 2; rep++ { // Push all the data into the stack, pop out every second - secs := []*item{} + secs := []*item[int64, int]{} for i := 0; i < size; i++ { stack.Push(data[i]) if i%2 == 0 { - secs = append(secs, stack.Pop().(*item)) + secs = append(secs, stack.Pop().(*item[int64, int])) } } // Reset and verify both pulled and stack contents diff --git a/core/blockchain.go b/core/blockchain.go index c049f8955a..98d2e7a774 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -169,14 +169,14 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping - lastWrite uint64 // Last block when the state was flushed - flushInterval int64 // Time interval (processing time) after which to flush a state - triedb *trie.Database // The database handler for maintaining trie nodes. 
- stateCache state.Database // State database to reuse between imports (contains state cache) + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + lastWrite uint64 // Last block when the state was flushed + flushInterval int64 // Time interval (processing time) after which to flush a state + triedb *trie.Database // The database handler for maintaining trie nodes. + stateCache state.Database // State database to reuse between imports (contains state cache) // txLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: @@ -261,7 +261,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis db: db, triedb: triedb, flushInterval: int64(cacheConfig.TrieTimeLimit), - triegc: prque.New(nil), + triegc: prque.New[int64, common.Hash](nil), quit: make(chan struct{}), chainmu: syncx.NewClosableMutex(), bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), @@ -957,7 +957,7 @@ func (bc *BlockChain) Stop() { } } for !bc.triegc.Empty() { - triedb.Dereference(bc.triegc.PopItem().(common.Hash)) + triedb.Dereference(bc.triegc.PopItem()) } if size, _ := triedb.Size(); size != 0 { log.Error("Dangling trie nodes after full cleanup") @@ -1391,7 +1391,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. bc.triegc.Push(root, number) break } - bc.triedb.Dereference(root.(common.Hash)) + bc.triedb.Dereference(root) } return nil } diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 121f6d39dd..85ad88e291 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -191,7 +191,7 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan // in to be [to-1]. Therefore, setting lastNum to means that the // prqueue gap-evaluation will work correctly lastNum = to - queue = prque.New(nil) + queue = prque.New[int64, *blockTxHashes](nil) // for stats reporting blocks, txs = 0, 0 ) @@ -210,7 +210,7 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan break } // Next block available, pop it off and index it - delivery := queue.PopItem().(*blockTxHashes) + delivery := queue.PopItem() lastNum = delivery.number WriteTxLookupEntries(batch, delivery.number, delivery.hashes) blocks++ @@ -282,7 +282,7 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch // we expect the first number to come in to be [from]. 
Therefore, setting // nextNum to from means that the prqueue gap-evaluation will work correctly nextNum = from - queue = prque.New(nil) + queue = prque.New[int64, *blockTxHashes](nil) // for stats reporting blocks, txs = 0, 0 ) @@ -299,7 +299,7 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch if hook != nil && !hook(nextNum) { break } - delivery := queue.PopItem().(*blockTxHashes) + delivery := queue.PopItem() nextNum = delivery.number + 1 DeleteTxLookupEntries(batch, delivery.hashes) txs += len(delivery.hashes) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 6b878ed586..c805201866 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -1395,7 +1395,7 @@ func (pool *TxPool) truncatePending() { pendingBeforeCap := pending // Assemble a spam order to penalize large transactors first - spammers := prque.New(nil) + spammers := prque.New[int64, common.Address](nil) for addr, list := range pool.pending { // Only evict transactions from high rollers if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { @@ -1407,12 +1407,12 @@ func (pool *TxPool) truncatePending() { for pending > pool.config.GlobalSlots && !spammers.Empty() { // Retrieve the next offender if not local address offender, _ := spammers.Pop() - offenders = append(offenders, offender.(common.Address)) + offenders = append(offenders, offender) // Equalize balances until all the same or below threshold if len(offenders) > 1 { // Calculate the equalization threshold for all current offenders - threshold := pool.pending[offender.(common.Address)].Len() + threshold := pool.pending[offender].Len() // Iteratively reduce all offenders until below limit or threshold reached for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 44e6aa8f8d..ceec06de4f 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -91,8 +91,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { } }() ordering := make(map[*eth.Request]int) - timeouts := prque.New(func(data interface{}, index int) { - ordering[data.(*eth.Request)] = index + timeouts := prque.New[int64, *eth.Request](func(data *eth.Request, index int) { + ordering[data] = index }) timeout := time.NewTimer(0) @@ -268,14 +268,12 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // below is purely for to catch programming errors, given the correct // code, there's no possible order of events that should result in a // timeout firing for a non-existent event. 
- item, exp := timeouts.Peek() + req, exp := timeouts.Peek() if now, at := time.Now(), time.Unix(0, -exp); now.Before(at) { log.Error("Timeout triggered but not reached", "left", at.Sub(now)) timeout.Reset(at.Sub(now)) continue } - req := item.(*eth.Request) - // Stop tracking the timed out request from a timing perspective, // cancel it, so it's not considered in-flight anymore, but keep // the peer marked busy to prevent assigning a second request and diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c71b36466d..13b3021b24 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -115,7 +115,7 @@ type queue struct { // Headers are "special", they download in batches, supported by a skeleton chain headerHead common.Hash // Hash of the last queued header to verify order headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for + headerTaskQueue *prque.Prque[int64, uint64] // Priority queue of the skeleton indexes to fetch the filling headers for headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations headerResults []*types.Header // Result cache accumulating the completed headers @@ -125,15 +125,15 @@ type queue struct { headerContCh chan bool // Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain - blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers - blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for - blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations - blockWakeCh chan bool // Channel to notify the block fetcher of new tasks + blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers + blockTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the blocks (bodies) for + blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations + blockWakeCh chan bool // Channel to notify the block fetcher of new tasks - receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers - receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for - receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations - receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks + receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers + receiptTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the receipts for + receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations + receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks resultCache *resultStore // Downloaded but not yet delivered fetch results resultSize common.StorageSize // Approximate size of a block (exponential moving average) @@ -150,9 +150,9 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { lock := new(sync.RWMutex) q := &queue{ headerContCh: make(chan bool, 1), - blockTaskQueue: prque.New(nil), + 
blockTaskQueue: prque.New[int64, *types.Header](nil), blockWakeCh: make(chan bool, 1), - receiptTaskQueue: prque.New(nil), + receiptTaskQueue: prque.New[int64, *types.Header](nil), receiptWakeCh: make(chan bool, 1), active: sync.NewCond(lock), lock: lock, @@ -258,7 +258,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { } // Schedule all the header retrieval tasks for the skeleton assembly q.headerTaskPool = make(map[uint64]*types.Header) - q.headerTaskQueue = prque.New(nil) + q.headerTaskQueue = prque.New[int64, uint64](nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) q.headerHashes = make([]common.Hash, len(skeleton)*MaxHeaderFetch) @@ -428,12 +428,12 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { for send == 0 && !q.headerTaskQueue.Empty() { from, _ := q.headerTaskQueue.Pop() if q.headerPeerMiss[p.id] != nil { - if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { - skip = append(skip, from.(uint64)) + if _, ok := q.headerPeerMiss[p.id][from]; ok { + skip = append(skip, from) continue } } - send = from.(uint64) + send = from } // Merge all the skipped batches back for _, from := range skip { @@ -485,7 +485,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo // item - the fetchRequest // progress - whether any progress was made // throttle - if the caller should throttle for a while -func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) { // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) @@ -503,8 +503,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ { // the task queue will pop items in order, so the highest prio block // is also the lowest block number. - h, _ := taskQueue.Peek() - header := h.(*types.Header) + header, _ := taskQueue.Peek() + // we can ask the resultcache if this header is within the // "prioritized" segment of blocks. If it is not, we need to throttle @@ -627,12 +627,14 @@ func (q *queue) ExpireReceipts(peer string) int { } // expire is the generic check that moves a specific expired task from a pending -// pool back into a task pool. +// pool back into a task pool. The syntax on the passed taskQueue is a bit weird +// as we would need a generic expire method to handle both types, but that is not +// supported at the moment at least (Go 1.19). // // Note, this method expects the queue lock to be already held. The reason the // lock is not obtained in here is that the parameters already need to access // the queue, so they already need a lock anyway. -func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) int { +func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue interface{}) int { // Retrieve the request being expired and log an error if it's non-existent, // as there's no order of events that should lead to such expirations. 
req := pendPool[peer] @@ -644,10 +646,10 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue // Return any non-satisfied requests to the pool if req.From > 0 { - taskQueue.Push(req.From, -int64(req.From)) + taskQueue.(*prque.Prque[int64, uint64]).Push(req.From, -int64(req.From)) } for _, header := range req.Headers { - taskQueue.Push(header, -int64(header.Number.Uint64())) + taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) } return len(req.Headers) } @@ -824,7 +826,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, recei // reason this lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, - taskQueue *prque.Prque, pendPool map[string]*fetchRequest, + taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter, results int, validate func(index int, header *types.Header) error, reconstruct func(index int, result *fetchResult)) (int, error) { diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 156d07e913..50081d2e54 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -175,9 +175,9 @@ type BlockFetcher struct { completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing // Block cache - queue *prque.Prque // Queue containing the import operations (block number sorted) - queues map[string]int // Per peer block counts to prevent memory exhaustion - queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + queue *prque.Prque[int64, *blockOrHeaderInject] // Queue containing the import operations (block number sorted) + queues map[string]int // Per peer block counts to prevent memory exhaustion + queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) // Callbacks getHeader HeaderRetrievalFn // Retrieves a header from the local chain @@ -212,7 +212,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr fetching: make(map[common.Hash]*blockAnnounce), fetched: make(map[common.Hash][]*blockAnnounce), completing: make(map[common.Hash]*blockAnnounce), - queue: prque.New(nil), + queue: prque.New[int64, *blockOrHeaderInject](nil), queues: make(map[string]int), queued: make(map[common.Hash]*blockOrHeaderInject), getHeader: getHeader, @@ -351,7 +351,7 @@ func (f *BlockFetcher) loop() { // Import any queued blocks that could potentially fit height := f.chainHeight() for !f.queue.Empty() { - op := f.queue.PopItem().(*blockOrHeaderInject) + op := f.queue.PopItem() hash := op.hash() if f.queueChangeHook != nil { f.queueChangeHook(hash, false) diff --git a/go.mod b/go.mod index 5f185e628e..2945dcbda9 100644 --- a/go.mod +++ b/go.mod @@ -59,11 +59,12 @@ require ( github.com/tyler-smith/go-bip39 v1.1.0 github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa golang.org/x/crypto v0.1.0 + golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/sys v0.3.0 golang.org/x/text v0.4.0 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba - golang.org/x/tools v0.1.12 + golang.org/x/tools v0.2.0 gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce ) @@ -116,8 +117,7 @@ require ( github.com/tklauser/go-sysconf 
v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect - golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 // indirect - golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect + golang.org/x/mod v0.6.0 // indirect golang.org/x/net v0.1.0 // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect google.golang.org/protobuf v1.27.1 // indirect diff --git a/go.sum b/go.sum index b96fec4487..23b798cc36 100644 --- a/go.sum +++ b/go.sum @@ -298,7 +298,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa h1:Q75Upo5UN4JbPFURXZ8nLKYUvF85dyFRop/vQ0Rv+64= @@ -657,8 +657,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 h1:rxKZ2gOnYxjfmakvUUqh9Gyb6KXfrj7JWTxORTYqb0E= -golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= +golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 h1:yZNXmy+j/JpX19vZkVktWqAo7Gny4PBWYYK3zskGpx4= +golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -680,8 +680,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I= +golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -872,8 +872,8 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= +golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/les/downloader/queue.go b/les/downloader/queue.go index 5b7054cf35..6896b09b38 100644 --- a/les/downloader/queue.go +++ b/les/downloader/queue.go @@ -115,7 +115,7 @@ type queue struct { // Headers are "special", they download in batches, supported by a skeleton chain headerHead common.Hash // Hash of the last queued header to verify order headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for + headerTaskQueue *prque.Prque[int64, uint64] // Priority queue of the skeleton indexes to fetch the filling headers for headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations headerResults []*types.Header // Result cache accumulating the completed headers @@ -124,13 +124,13 @@ type queue struct { headerContCh chan bool // Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain - blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers - blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for - blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations + blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers + blockTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the blocks (bodies) for + blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations - receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers - receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for - receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations + receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers + receiptTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the receipts for + receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations resultCache 
*resultStore // Downloaded but not yet delivered fetch results resultSize common.StorageSize // Approximate size of a block (exponential moving average) @@ -147,8 +147,8 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { lock := new(sync.RWMutex) q := &queue{ headerContCh: make(chan bool), - blockTaskQueue: prque.New(nil), - receiptTaskQueue: prque.New(nil), + blockTaskQueue: prque.New[int64, *types.Header](nil), + receiptTaskQueue: prque.New[int64, *types.Header](nil), active: sync.NewCond(lock), lock: lock, } @@ -262,7 +262,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { } // Schedule all the header retrieval tasks for the skeleton assembly q.headerTaskPool = make(map[uint64]*types.Header) - q.headerTaskQueue = prque.New(nil) + q.headerTaskQueue = prque.New[int64, uint64](nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 @@ -424,12 +424,12 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { for send == 0 && !q.headerTaskQueue.Empty() { from, _ := q.headerTaskQueue.Pop() if q.headerPeerMiss[p.id] != nil { - if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { - skip = append(skip, from.(uint64)) + if _, ok := q.headerPeerMiss[p.id][from]; ok { + skip = append(skip, from) continue } } - send = from.(uint64) + send = from } // Merge all the skipped batches back for _, from := range skip { @@ -481,7 +481,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo // item - the fetchRequest // progress - whether any progress was made // throttle - if the caller should throttle for a while -func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) { // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) @@ -499,8 +499,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ { // the task queue will pop items in order, so the highest prio block // is also the lowest block number. - h, _ := taskQueue.Peek() - header := h.(*types.Header) + header, _ := taskQueue.Peek() + // we can ask the resultcache if this header is within the // "prioritized" segment of blocks. If it is not, we need to throttle @@ -591,12 +591,12 @@ func (q *queue) CancelReceipts(request *fetchRequest) { } // Cancel aborts a fetch request, returning all pending hashes to the task queue. 
-func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { +func (q *queue) cancel(request *fetchRequest, taskQueue interface{}, pendPool map[string]*fetchRequest) { if request.From > 0 { - taskQueue.Push(request.From, -int64(request.From)) + taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From)) } for _, header := range request.Headers { - taskQueue.Push(header, -int64(header.Number.Uint64())) + taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) } delete(pendPool, request.Peer.id) } @@ -655,7 +655,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { // Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue interface{}, timeoutMeter metrics.Meter) map[string]int { // Iterate over the expired requests and return each to the queue expiries := make(map[string]int) for id, request := range pendPool { @@ -665,10 +665,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, // Return any non satisfied requests to the pool if request.From > 0 { - taskQueue.Push(request.From, -int64(request.From)) + taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From)) } for _, header := range request.Headers { - taskQueue.Push(header, -int64(header.Number.Uint64())) + taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) } // Add the peer to the expiry report along the number of failed requests expiries[id] = len(request.Headers) @@ -831,7 +831,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, // reason this lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. 
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, - taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer, + taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, reqTimer metrics.Timer, results int, validate func(index int, header *types.Header) error, reconstruct func(index int, result *fetchResult)) (int, error) { // Short circuit if the data was never requested diff --git a/les/fetcher/block_fetcher.go b/les/fetcher/block_fetcher.go index 42cf9500a2..c76f20ced3 100644 --- a/les/fetcher/block_fetcher.go +++ b/les/fetcher/block_fetcher.go @@ -177,9 +177,9 @@ type BlockFetcher struct { completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing // Block cache - queue *prque.Prque // Queue containing the import operations (block number sorted) - queues map[string]int // Per peer block counts to prevent memory exhaustion - queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + queue *prque.Prque[int64, *blockOrHeaderInject] // Queue containing the import operations (block number sorted) + queues map[string]int // Per peer block counts to prevent memory exhaustion + queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) // Callbacks getHeader HeaderRetrievalFn // Retrieves a header from the local chain @@ -214,7 +214,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr fetching: make(map[common.Hash]*blockAnnounce), fetched: make(map[common.Hash][]*blockAnnounce), completing: make(map[common.Hash]*blockAnnounce), - queue: prque.New(nil), + queue: prque.New[int64, *blockOrHeaderInject](nil), queues: make(map[string]int), queued: make(map[common.Hash]*blockOrHeaderInject), getHeader: getHeader, @@ -353,7 +353,7 @@ func (f *BlockFetcher) loop() { // Import any queued blocks that could potentially fit height := f.chainHeight() for !f.queue.Empty() { - op := f.queue.PopItem().(*blockOrHeaderInject) + op := f.queue.PopItem() hash := op.hash() if f.queueChangeHook != nil { f.queueChangeHook(hash, false) diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go index 497f91eeda..b7cc9bd903 100644 --- a/les/flowcontrol/manager.go +++ b/les/flowcontrol/manager.go @@ -75,10 +75,11 @@ type ClientManager struct { // (totalRecharge / sumRecharge)*FixedPointMultiplier or 0 if sumRecharge==0 rcLastUpdate mclock.AbsTime // last time the recharge integrator was updated rcLastIntValue int64 // last updated value of the recharge integrator + priorityOffset int64 // offset for prque priority values ensures that all priorities stay in the int64 range // recharge queue is a priority queue with currently recharging client nodes // as elements. The priority value is rcFullIntValue which allows to quickly // determine which client will first finish recharge. - rcQueue *prque.Prque + rcQueue *prque.Prque[int64, *ClientNode] } // NewClientManager returns a new client manager. 
@@ -107,7 +108,7 @@ type ClientManager struct { func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager { cm := &ClientManager{ clock: clock, - rcQueue: prque.NewWrapAround(func(a interface{}, i int) { a.(*ClientNode).queueIndex = i }), + rcQueue: prque.New[int64, *ClientNode](func(a *ClientNode, i int) { a.queueIndex = i }), capLastUpdate: clock.Now(), stop: make(chan chan struct{}), } @@ -288,13 +289,13 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) { } dt := now - lastUpdate // fetch the client that finishes first - rcqNode := cm.rcQueue.PopItem().(*ClientNode) // if sumRecharge > 0 then the queue cannot be empty + rcqNode := cm.rcQueue.PopItem() // if sumRecharge > 0 then the queue cannot be empty // check whether it has already finished dtNext := mclock.AbsTime(float64(rcqNode.rcFullIntValue-cm.rcLastIntValue) / bonusRatio) if dt < dtNext { // not finished yet, put it back, update integrator according // to current bonusRatio and return - cm.rcQueue.Push(rcqNode, -rcqNode.rcFullIntValue) + cm.addToQueue(rcqNode) cm.rcLastIntValue += int64(bonusRatio * float64(dt)) return } @@ -308,6 +309,20 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) { } } +func (cm *ClientManager) addToQueue(node *ClientNode) { + if cm.priorityOffset-node.rcFullIntValue < -0x4000000000000000 { + cm.priorityOffset += 0x4000000000000000 + // recreate priority queue with new offset to avoid overflow; should happen very rarely + newRcQueue := prque.New[int64, *ClientNode](func(a *ClientNode, i int) { a.queueIndex = i }) + for cm.rcQueue.Size() > 0 { + n := cm.rcQueue.PopItem() + newRcQueue.Push(n, cm.priorityOffset-n.rcFullIntValue) + } + cm.rcQueue = newRcQueue + } + cm.rcQueue.Push(node, cm.priorityOffset-node.rcFullIntValue) +} + // updateNodeRc updates a node's corrBufValue and adds an external correction value. // It also adds or removes the rcQueue entry and updates ServerParams and sumRecharge if necessary. func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *ServerParams, now mclock.AbsTime) { @@ -344,7 +359,7 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve } node.rcLastIntValue = cm.rcLastIntValue node.rcFullIntValue = cm.rcLastIntValue + (int64(node.params.BufLimit)-node.corrBufValue)*FixedPointMultiplier/int64(node.params.MinRecharge) - cm.rcQueue.Push(node, -node.rcFullIntValue) + cm.addToQueue(node) } } diff --git a/les/flowcontrol/manager_test.go b/les/flowcontrol/manager_test.go index 564d813f15..3afc31272f 100644 --- a/les/flowcontrol/manager_test.go +++ b/les/flowcontrol/manager_test.go @@ -17,6 +17,7 @@ package flowcontrol import ( + "math" "math/rand" "testing" "time" @@ -44,16 +45,17 @@ const ( // maximum permitted rate. The max capacity nodes are changed multiple times during // a single test. 
func TestConstantTotalCapacity(t *testing.T) { - testConstantTotalCapacity(t, 10, 1, 0) - testConstantTotalCapacity(t, 10, 1, 1) - testConstantTotalCapacity(t, 30, 1, 0) - testConstantTotalCapacity(t, 30, 2, 3) - testConstantTotalCapacity(t, 100, 1, 0) - testConstantTotalCapacity(t, 100, 3, 5) - testConstantTotalCapacity(t, 100, 5, 10) + testConstantTotalCapacity(t, 10, 1, 0, false) + testConstantTotalCapacity(t, 10, 1, 1, false) + testConstantTotalCapacity(t, 30, 1, 0, false) + testConstantTotalCapacity(t, 30, 2, 3, false) + testConstantTotalCapacity(t, 100, 1, 0, false) + testConstantTotalCapacity(t, 100, 3, 5, false) + testConstantTotalCapacity(t, 100, 5, 10, false) + testConstantTotalCapacity(t, 100, 3, 5, true) } -func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, randomSend int) { +func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, randomSend int, priorityOverflow bool) { clock := &mclock.Simulated{} nodes := make([]*testNode, nodeCount) var totalCapacity uint64 @@ -62,6 +64,10 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random totalCapacity += nodes[i].capacity } m := NewClientManager(PieceWiseLinear{{0, totalCapacity}}, clock) + if priorityOverflow { + // provoke a situation where rcLastUpdate overflow needs to be handled + m.rcLastIntValue = math.MaxInt64 - 10000000000 + } for _, n := range nodes { n.bufLimit = n.capacity * 6000 n.node = NewClientNode(m, ServerParams{BufLimit: n.bufLimit, MinRecharge: n.capacity}) diff --git a/les/servingqueue.go b/les/servingqueue.go index 10c7e6f48c..b4b53d8df5 100644 --- a/les/servingqueue.go +++ b/les/servingqueue.go @@ -38,10 +38,10 @@ type servingQueue struct { setThreadsCh chan int wg sync.WaitGroup - threadCount int // number of currently running threads - queue *prque.Prque // priority queue for waiting or suspended tasks - best *servingTask // the highest priority task (not included in the queue) - suspendBias int64 // priority bias against suspending an already running task + threadCount int // number of currently running threads + queue *prque.Prque[int64, *servingTask] // priority queue for waiting or suspended tasks + best *servingTask // the highest priority task (not included in the queue) + suspendBias int64 // priority bias against suspending an already running task } // servingTask represents a request serving task. 
diff --git a/les/servingqueue.go b/les/servingqueue.go
index 10c7e6f48c..b4b53d8df5 100644
--- a/les/servingqueue.go
+++ b/les/servingqueue.go
@@ -38,10 +38,10 @@ type servingQueue struct {
 	setThreadsCh chan int
 	wg           sync.WaitGroup
 
-	threadCount int          // number of currently running threads
-	queue       *prque.Prque // priority queue for waiting or suspended tasks
-	best        *servingTask // the highest priority task (not included in the queue)
-	suspendBias int64        // priority bias against suspending an already running task
+	threadCount int                               // number of currently running threads
+	queue       *prque.Prque[int64, *servingTask] // priority queue for waiting or suspended tasks
+	best        *servingTask                      // the highest priority task (not included in the queue)
+	suspendBias int64                             // priority bias against suspending an already running task
 }
 
 // servingTask represents a request serving task. Tasks can be implemented to
@@ -123,7 +123,7 @@ func (t *servingTask) waitOrStop() bool {
 // newServingQueue returns a new servingQueue
 func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
 	sq := &servingQueue{
-		queue:       prque.NewWrapAround(nil),
+		queue:       prque.New[int64, *servingTask](nil),
 		suspendBias: suspendBias,
 		queueAddCh:  make(chan *servingTask, 100),
 		queueBestCh: make(chan *servingTask),
@@ -214,7 +214,7 @@ func (sq *servingQueue) freezePeers() {
 	}
 	sq.best = nil
 	for sq.queue.Size() > 0 {
-		task := sq.queue.PopItem().(*servingTask)
+		task := sq.queue.PopItem()
 		tasks := peerMap[task.peer]
 		if tasks == nil {
 			bufValue, bufLimit := task.peer.fcClient.BufferStatus()
@@ -251,7 +251,7 @@ func (sq *servingQueue) freezePeers() {
 		}
 	}
 	if sq.queue.Size() > 0 {
-		sq.best = sq.queue.PopItem().(*servingTask)
+		sq.best = sq.queue.PopItem()
 	}
 }
 
@@ -310,7 +310,7 @@ func (sq *servingQueue) queueLoop() {
 			if sq.queue.Size() == 0 {
 				sq.best = nil
 			} else {
-				sq.best, _ = sq.queue.PopItem().(*servingTask)
+				sq.best = sq.queue.PopItem()
 			}
 		case <-sq.quit:
 			return
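With the element type carried in the queue's type parameters, every .(*servingTask) assertion at the servingQueue call sites disappears. A small usage sketch of the typed API, assuming (as newServingQueue does above) that a nil setIndex callback is acceptable when indices are never needed for Remove; the task type here is illustrative:

	package main

	import (
		"fmt"

		"github.com/ethereum/go-ethereum/common/prque"
	)

	// task is a throwaway stand-in for *servingTask.
	type task struct {
		name string
	}

	func main() {
		q := prque.New[int64, *task](nil) // nil callback: queue positions are not tracked
		q.Push(&task{name: "low"}, 1)
		q.Push(&task{name: "high"}, 5)

		for q.Size() > 0 {
			t := q.PopItem() // statically typed *task, no type assertion required
			fmt.Println(t.name) // prints "high" then "low": greatest priority first
		}
	}
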
diff --git a/les/vflux/server/prioritypool.go b/les/vflux/server/prioritypool.go
index 059dac0d46..766026a808 100644
--- a/les/vflux/server/prioritypool.go
+++ b/les/vflux/server/prioritypool.go
@@ -77,8 +77,8 @@ type priorityPool struct {
 	// temporary state if tempState is not empty
 	tempState              []*ppNodeInfo
 	activeCount, activeCap uint64
-	activeQueue            *prque.LazyQueue
-	inactiveQueue          *prque.Prque
+	activeQueue            *prque.LazyQueue[int64, *ppNodeInfo]
+	inactiveQueue          *prque.Prque[int64, *ppNodeInfo]
 }
 
 // ppNodeInfo is the internal node descriptor of priorityPool
@@ -104,7 +104,7 @@ func newPriorityPool(ns *nodestate.NodeStateMachine, setup *serverSetup, clock m
 		setup:           setup,
 		ns:              ns,
 		clock:           clock,
-		inactiveQueue:   prque.New(inactiveSetIndex),
+		inactiveQueue:   prque.New[int64, *ppNodeInfo](inactiveSetIndex),
 		minCap:          minCap,
 		activeBias:      activeBias,
 		capacityStepDiv: capacityStepDiv,
@@ -183,8 +183,7 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
 	}
 	pp.setTempCapacity(c, maxTarget)
 	c.minTarget = minTarget
-	pp.activeQueue.Remove(c.activeIndex)
-	pp.inactiveQueue.Remove(c.inactiveIndex)
+	pp.removeFromQueues(c)
 	pp.activeQueue.Push(c)
 	pp.enforceLimits()
 	updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
@@ -250,13 +249,13 @@ func (pp *priorityPool) Limits() (uint64, uint64) {
 }
 
 // inactiveSetIndex callback updates ppNodeInfo item index in inactiveQueue
-func inactiveSetIndex(a interface{}, index int) {
-	a.(*ppNodeInfo).inactiveIndex = index
+func inactiveSetIndex(a *ppNodeInfo, index int) {
+	a.inactiveIndex = index
 }
 
 // activeSetIndex callback updates ppNodeInfo item index in activeQueue
-func activeSetIndex(a interface{}, index int) {
-	a.(*ppNodeInfo).activeIndex = index
+func activeSetIndex(a *ppNodeInfo, index int) {
+	a.activeIndex = index
 }
 
 // invertPriority inverts a priority value. The active queue uses inverted priorities
@@ -269,8 +268,7 @@ func invertPriority(p int64) int64 {
 }
 
 // activePriority callback returns actual priority of ppNodeInfo item in activeQueue
-func activePriority(a interface{}) int64 {
-	c := a.(*ppNodeInfo)
+func activePriority(c *ppNodeInfo) int64 {
 	if c.bias == 0 {
 		return invertPriority(c.nodePriority.priority(c.tempCapacity))
 	} else {
@@ -279,8 +277,7 @@
 }
 
 // activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue
-func (pp *priorityPool) activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
-	c := a.(*ppNodeInfo)
+func (pp *priorityPool) activeMaxPriority(c *ppNodeInfo, until mclock.AbsTime) int64 {
 	future := time.Duration(until - pp.clock.Now())
 	if future < 0 {
 		future = 0
@@ -293,6 +290,16 @@ func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 {
 	return p.nodePriority.priority(pp.minCap)
 }
 
+// removeFromQueues removes the node from the active/inactive queues
+func (pp *priorityPool) removeFromQueues(c *ppNodeInfo) {
+	if c.activeIndex >= 0 {
+		pp.activeQueue.Remove(c.activeIndex)
+	}
+	if c.inactiveIndex >= 0 {
+		pp.inactiveQueue.Remove(c.inactiveIndex)
+	}
+}
+
 // connectNode is called when a new node has been added to the pool (inactiveFlag set)
 // Note: this function should run inside a NodeStateMachine operation
 func (pp *priorityPool) connectNode(c *ppNodeInfo) {
@@ -320,8 +327,7 @@ func (pp *priorityPool) disconnectNode(c *ppNodeInfo) {
 		return
 	}
 	c.connected = false
-	pp.activeQueue.Remove(c.activeIndex)
-	pp.inactiveQueue.Remove(c.inactiveIndex)
+	pp.removeFromQueues(c)
 
 	var updates []capUpdate
 	if c.capacity != 0 {
@@ -411,11 +417,11 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
 		return nil, math.MinInt64
 	}
 	var (
-		c                 *ppNodeInfo
+		lastNode          *ppNodeInfo
 		maxActivePriority int64
 	)
-	pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool {
-		c = data.(*ppNodeInfo)
+	pp.activeQueue.MultiPop(func(c *ppNodeInfo, priority int64) bool {
+		lastNode = c
 		pp.setTempState(c)
 		maxActivePriority = priority
 		if c.tempCapacity == c.minTarget || pp.activeCount > pp.maxCount {
@@ -433,7 +439,7 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
 		}
 		return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
 	})
-	return c, invertPriority(maxActivePriority)
+	return lastNode, invertPriority(maxActivePriority)
 }
 
 // finalizeChanges either commits or reverts temporary changes. The necessary capacity
@@ -442,8 +448,7 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
 func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
 	for _, c := range pp.tempState {
 		// always remove and push back in order to update biased priority
-		pp.activeQueue.Remove(c.activeIndex)
-		pp.inactiveQueue.Remove(c.inactiveIndex)
+		pp.removeFromQueues(c)
 		oldCapacity := c.capacity
 		if commit {
 			c.capacity = c.tempCapacity
@@ -496,7 +501,7 @@ func (pp *priorityPool) updateFlags(updates []capUpdate) {
 // tryActivate tries to activate inactive nodes if possible
 func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
 	for pp.inactiveQueue.Size() > 0 {
-		c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
+		c := pp.inactiveQueue.PopItem()
 		pp.setTempState(c)
 		pp.setTempBias(c, pp.activeBias)
 		pp.setTempCapacity(c, pp.minCap)
@@ -524,8 +529,7 @@ func (pp *priorityPool) updatePriority(node *enode.Node) {
 		pp.lock.Unlock()
 		return
 	}
-	pp.activeQueue.Remove(c.activeIndex)
-	pp.inactiveQueue.Remove(c.inactiveIndex)
+	pp.removeFromQueues(c)
 	if c.capacity != 0 {
 		pp.activeQueue.Push(c)
 	} else {
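The priority pool keeps pushing inverted (negated) values into the active queue via invertPriority, so a queue that pops the greatest priority first surfaces the node with the lowest actual priority, which is the order enforceLimits needs when shedding capacity. A toy illustration of that inversion with plain strings in place of *ppNodeInfo and a Prque in place of the LazyQueue, not code from the pool itself:

	package main

	import (
		"fmt"

		"github.com/ethereum/go-ethereum/common/prque"
	)

	func main() {
		q := prque.New[int64, string](nil)
		for name, pri := range map[string]int64{"low": 1, "mid": 5, "high": 9} {
			q.Push(name, -pri) // invertPriority is essentially negation
		}
		fmt.Println(q.PopItem()) // "low": the least important node comes out first
	}
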
diff --git a/trie/sync.go b/trie/sync.go
index 46478b033a..4bf735c02f 100644
--- a/trie/sync.go
+++ b/trie/sync.go
@@ -160,7 +160,7 @@ type Sync struct {
 	membatch *syncMemBatch                // Memory buffer to avoid frequent database writes
 	nodeReqs map[string]*nodeRequest      // Pending requests pertaining to a trie node path
 	codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash
-	queue    *prque.Prque                 // Priority queue with the pending requests
+	queue    *prque.Prque[int64, any]     // Priority queue with the pending requests
 	fetches  map[int]int                  // Number of active fetches per trie node depth
 }
 
@@ -172,7 +172,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
 		membatch: newSyncMemBatch(),
 		nodeReqs: make(map[string]*nodeRequest),
 		codeReqs: make(map[common.Hash]*codeRequest),
-		queue:    prque.New(nil),
+		queue:    prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy
 		fetches:  make(map[int]int),
 	}
 	ts.AddSubTrie(root, nil, common.Hash{}, nil, callback)
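Because the sync queue mixes trie node paths (the string keys of nodeReqs) and contract code hashes (the common.Hash keys of codeReqs), its value type stays any and consumers still have to type-switch on what PopItem returns. An illustrative sketch of that pattern, not code from trie/sync.go; the payloads and priorities are made up:

	package main

	import (
		"fmt"

		"github.com/ethereum/go-ethereum/common"
		"github.com/ethereum/go-ethereum/common/prque"
	)

	func main() {
		q := prque.New[int64, any](nil)
		q.Push("0a0b", 2)                         // a trie node path, higher priority
		q.Push(common.HexToHash("0xdeadbeef"), 1) // a contract code hash

		for q.Size() > 0 {
			switch it := q.PopItem().(type) {
			case string:
				fmt.Println("node request, path", it)
			case common.Hash:
				fmt.Println("code request, hash", it)
			}
		}
	}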