From e3f3e015042504d0d304ff8979f229a1752ca026 Mon Sep 17 00:00:00 2001 From: ucwong Date: Wed, 23 Aug 2023 20:19:04 +0100 Subject: [PATCH] les: use new atomic types (#27856) Co-authored-by: Felix Lange --- les/api_test.go | 13 ++++++------- les/costtracker.go | 10 +++++----- les/servingqueue.go | 13 +++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/les/api_test.go b/les/api_test.go index 88a950afce..484c95504c 100644 --- a/les/api_test.go +++ b/les/api_test.go @@ -147,7 +147,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { var wg sync.WaitGroup stop := make(chan struct{}) - reqCount := make([]uint64, len(clientRpcClients)) + reqCount := make([]atomic.Uint64, len(clientRpcClients)) // Send light request like crazy. for i, c := range clientRpcClients { @@ -157,7 +157,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { defer wg.Done() queue := make(chan struct{}, 100) - reqCount[i] = 0 + reqCount[i].Store(0) for { select { case queue <- struct{}{}: @@ -173,8 +173,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { wg.Done() <-queue if ok { - count := atomic.AddUint64(&reqCount[i], 1) - if count%10000 == 0 { + if reqCount[i].Add(1)%10000 == 0 { freezeClient(ctx, t, serverRpcClient, clients[i].ID()) } } @@ -192,7 +191,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { processedSince := func(start []uint64) []uint64 { res := make([]uint64, len(reqCount)) for i := range reqCount { - res[i] = atomic.LoadUint64(&reqCount[i]) + res[i] = reqCount[i].Load() if start != nil { res[i] -= start[i] } @@ -292,8 +291,8 @@ func testCapacityAPI(t *testing.T, clientCount int) { close(stop) wg.Wait() - for i, count := range reqCount { - t.Log("client", i, "processed", count) + for i := range reqCount { + t.Log("client", i, "processed", reqCount[i].Load()) } return true }) { diff --git a/les/costtracker.go b/les/costtracker.go index 43e32a5b2d..695d54e141 100644 --- a/les/costtracker.go +++ b/les/costtracker.go @@ -128,7 +128,7 @@ type costTracker struct { reqInfoCh chan reqInfo totalRechargeCh chan uint64 - stats map[uint64][]uint64 // Used for testing purpose. + stats map[uint64][]atomic.Uint64 // Used for testing purpose. // TestHooks testing bool // Disable real cost evaluation for testing purpose. @@ -152,9 +152,9 @@ func newCostTracker(db ethdb.Database, config *ethconfig.Config) (*costTracker, ct.outSizeFactor = utilTarget / float64(config.LightEgress) } if makeCostStats { - ct.stats = make(map[uint64][]uint64) + ct.stats = make(map[uint64][]atomic.Uint64) for code := range reqAvgTimeCost { - ct.stats[code] = make([]uint64, 10) + ct.stats[code] = make([]atomic.Uint64, 10) } } ct.gfLoop() @@ -423,7 +423,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) { l++ realCost >>= 1 } - atomic.AddUint64(&ct.stats[code][l], 1) + ct.stats[code][l].Add(1) } } @@ -454,7 +454,7 @@ func (ct *costTracker) printStats() { return } for code, arr := range ct.stats { - log.Info("Request cost statistics", "code", code, "1/16", arr[0], "1/8", arr[1], "1/4", arr[2], "1/2", arr[3], "1", arr[4], "2", arr[5], "4", arr[6], "8", arr[7], "16", arr[8], ">16", arr[9]) + log.Info("Request cost statistics", "code", code, "1/16", arr[0].Load(), "1/8", arr[1].Load(), "1/4", arr[2].Load(), "1/2", arr[3].Load(), "1", arr[4].Load(), "2", arr[5].Load(), "4", arr[6].Load(), "8", arr[7].Load(), "16", arr[8].Load(), ">16", arr[9].Load()) } } diff --git a/les/servingqueue.go b/les/servingqueue.go index 68cad9cb5a..b258fc3caf 100644 --- a/les/servingqueue.go +++ b/les/servingqueue.go @@ -28,10 +28,11 @@ import ( // servingQueue allows running tasks in a limited number of threads and puts the // waiting tasks in a priority queue type servingQueue struct { - recentTime, queuedTime, servingTimeDiff uint64 - burstLimit, burstDropLimit uint64 - burstDecRate float64 - lastUpdate mclock.AbsTime + recentTime, queuedTime uint64 + servingTimeDiff atomic.Uint64 + burstLimit, burstDropLimit uint64 + burstDecRate float64 + lastUpdate mclock.AbsTime queueAddCh, queueBestCh chan *servingTask stopThreadCh, quit chan struct{} @@ -100,7 +101,7 @@ func (t *servingTask) done() uint64 { t.timeAdded = t.servingTime if t.expTime > diff { t.expTime -= diff - atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime) + t.sq.servingTimeDiff.Add(t.expTime) } else { t.expTime = 0 } @@ -249,7 +250,7 @@ func (sq *servingQueue) freezePeers() { // updateRecentTime recalculates the recent serving time value func (sq *servingQueue) updateRecentTime() { - subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0) + subTime := sq.servingTimeDiff.Swap(0) now := mclock.Now() dt := now - sq.lastUpdate sq.lastUpdate = now