|
|
|
@ -115,7 +115,7 @@ type queue struct { |
|
|
|
|
// Headers are "special", they download in batches, supported by a skeleton chain
|
|
|
|
|
headerHead common.Hash // Hash of the last queued header to verify order
|
|
|
|
|
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
|
|
|
|
|
headerTaskQueue *prque.Prque[int64, uint64] // Priority queue of the skeleton indexes to fetch the filling headers for
|
|
|
|
|
headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for
|
|
|
|
|
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
|
|
|
|
|
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
|
|
|
|
|
headerResults []*types.Header // Result cache accumulating the completed headers
|
|
|
|
@ -124,13 +124,13 @@ type queue struct { |
|
|
|
|
headerContCh chan bool // Channel to notify when header download finishes
|
|
|
|
|
|
|
|
|
|
// All data retrievals below are based on an already assembles header chain
|
|
|
|
|
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
|
|
|
|
|
blockTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the blocks (bodies) for
|
|
|
|
|
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
|
|
|
|
|
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
|
|
|
|
|
blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for
|
|
|
|
|
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
|
|
|
|
|
|
|
|
|
|
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
|
|
|
|
|
receiptTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the receipts for
|
|
|
|
|
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
|
|
|
|
|
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
|
|
|
|
|
receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for
|
|
|
|
|
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
|
|
|
|
|
|
|
|
|
|
resultCache *resultStore // Downloaded but not yet delivered fetch results
|
|
|
|
|
resultSize common.StorageSize // Approximate size of a block (exponential moving average)
|
|
|
|
@ -147,8 +147,8 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { |
|
|
|
|
lock := new(sync.RWMutex) |
|
|
|
|
q := &queue{ |
|
|
|
|
headerContCh: make(chan bool), |
|
|
|
|
blockTaskQueue: prque.New[int64, *types.Header](nil), |
|
|
|
|
receiptTaskQueue: prque.New[int64, *types.Header](nil), |
|
|
|
|
blockTaskQueue: prque.New(nil), |
|
|
|
|
receiptTaskQueue: prque.New(nil), |
|
|
|
|
active: sync.NewCond(lock), |
|
|
|
|
lock: lock, |
|
|
|
|
} |
|
|
|
@ -262,7 +262,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { |
|
|
|
|
} |
|
|
|
|
// Schedule all the header retrieval tasks for the skeleton assembly
|
|
|
|
|
q.headerTaskPool = make(map[uint64]*types.Header) |
|
|
|
|
q.headerTaskQueue = prque.New[int64, uint64](nil) |
|
|
|
|
q.headerTaskQueue = prque.New(nil) |
|
|
|
|
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
|
|
|
|
|
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) |
|
|
|
|
q.headerProced = 0 |
|
|
|
@ -424,12 +424,12 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { |
|
|
|
|
for send == 0 && !q.headerTaskQueue.Empty() { |
|
|
|
|
from, _ := q.headerTaskQueue.Pop() |
|
|
|
|
if q.headerPeerMiss[p.id] != nil { |
|
|
|
|
if _, ok := q.headerPeerMiss[p.id][from]; ok { |
|
|
|
|
skip = append(skip, from) |
|
|
|
|
if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { |
|
|
|
|
skip = append(skip, from.(uint64)) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
send = from |
|
|
|
|
send = from.(uint64) |
|
|
|
|
} |
|
|
|
|
// Merge all the skipped batches back
|
|
|
|
|
for _, from := range skip { |
|
|
|
@ -481,7 +481,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo |
|
|
|
|
// item - the fetchRequest
|
|
|
|
|
// progress - whether any progress was made
|
|
|
|
|
// throttle - if the caller should throttle for a while
|
|
|
|
|
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], |
|
|
|
|
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, |
|
|
|
|
pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) { |
|
|
|
|
// Short circuit if the pool has been depleted, or if the peer's already
|
|
|
|
|
// downloading something (sanity check not to corrupt state)
|
|
|
|
@ -499,8 +499,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common |
|
|
|
|
for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ { |
|
|
|
|
// the task queue will pop items in order, so the highest prio block
|
|
|
|
|
// is also the lowest block number.
|
|
|
|
|
header, _ := taskQueue.Peek() |
|
|
|
|
|
|
|
|
|
h, _ := taskQueue.Peek() |
|
|
|
|
header := h.(*types.Header) |
|
|
|
|
// we can ask the resultcache if this header is within the
|
|
|
|
|
// "prioritized" segment of blocks. If it is not, we need to throttle
|
|
|
|
|
|
|
|
|
@ -591,12 +591,12 @@ func (q *queue) CancelReceipts(request *fetchRequest) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
|
|
|
|
|
func (q *queue) cancel(request *fetchRequest, taskQueue interface{}, pendPool map[string]*fetchRequest) { |
|
|
|
|
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { |
|
|
|
|
if request.From > 0 { |
|
|
|
|
taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From)) |
|
|
|
|
taskQueue.Push(request.From, -int64(request.From)) |
|
|
|
|
} |
|
|
|
|
for _, header := range request.Headers { |
|
|
|
|
taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) |
|
|
|
|
taskQueue.Push(header, -int64(header.Number.Uint64())) |
|
|
|
|
} |
|
|
|
|
delete(pendPool, request.Peer.id) |
|
|
|
|
} |
|
|
|
@ -655,7 +655,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { |
|
|
|
|
// Note, this method expects the queue lock to be already held. The
|
|
|
|
|
// reason the lock is not obtained in here is because the parameters already need
|
|
|
|
|
// to access the queue, so they already need a lock anyway.
|
|
|
|
|
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue interface{}, timeoutMeter metrics.Meter) map[string]int { |
|
|
|
|
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int { |
|
|
|
|
// Iterate over the expired requests and return each to the queue
|
|
|
|
|
expiries := make(map[string]int) |
|
|
|
|
for id, request := range pendPool { |
|
|
|
@ -665,10 +665,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, |
|
|
|
|
|
|
|
|
|
// Return any non satisfied requests to the pool
|
|
|
|
|
if request.From > 0 { |
|
|
|
|
taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From)) |
|
|
|
|
taskQueue.Push(request.From, -int64(request.From)) |
|
|
|
|
} |
|
|
|
|
for _, header := range request.Headers { |
|
|
|
|
taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) |
|
|
|
|
taskQueue.Push(header, -int64(header.Number.Uint64())) |
|
|
|
|
} |
|
|
|
|
// Add the peer to the expiry report along the number of failed requests
|
|
|
|
|
expiries[id] = len(request.Headers) |
|
|
|
@ -831,7 +831,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, |
|
|
|
|
// reason this lock is not obtained in here is because the parameters already need
|
|
|
|
|
// to access the queue, so they already need a lock anyway.
|
|
|
|
|
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, |
|
|
|
|
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, reqTimer metrics.Timer, |
|
|
|
|
taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer, |
|
|
|
|
results int, validate func(index int, header *types.Header) error, |
|
|
|
|
reconstruct func(index int, result *fetchResult)) (int, error) { |
|
|
|
|
// Short circuit if the data was never requested
|
|
|
|
|