les: use new atomic types (#27856)

Co-authored-by: Felix Lange <fjl@twurst.com>
pull/27995/head
ucwong 1 year ago committed by GitHub
parent f0f8703bf2
commit e3f3e01504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      les/api_test.go
  2. 10
      les/costtracker.go
  3. 13
      les/servingqueue.go

@ -147,7 +147,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
var wg sync.WaitGroup var wg sync.WaitGroup
stop := make(chan struct{}) stop := make(chan struct{})
reqCount := make([]uint64, len(clientRpcClients)) reqCount := make([]atomic.Uint64, len(clientRpcClients))
// Send light request like crazy. // Send light request like crazy.
for i, c := range clientRpcClients { for i, c := range clientRpcClients {
@ -157,7 +157,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
defer wg.Done() defer wg.Done()
queue := make(chan struct{}, 100) queue := make(chan struct{}, 100)
reqCount[i] = 0 reqCount[i].Store(0)
for { for {
select { select {
case queue <- struct{}{}: case queue <- struct{}{}:
@ -173,8 +173,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
wg.Done() wg.Done()
<-queue <-queue
if ok { if ok {
count := atomic.AddUint64(&reqCount[i], 1) if reqCount[i].Add(1)%10000 == 0 {
if count%10000 == 0 {
freezeClient(ctx, t, serverRpcClient, clients[i].ID()) freezeClient(ctx, t, serverRpcClient, clients[i].ID())
} }
} }
@ -192,7 +191,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
processedSince := func(start []uint64) []uint64 { processedSince := func(start []uint64) []uint64 {
res := make([]uint64, len(reqCount)) res := make([]uint64, len(reqCount))
for i := range reqCount { for i := range reqCount {
res[i] = atomic.LoadUint64(&reqCount[i]) res[i] = reqCount[i].Load()
if start != nil { if start != nil {
res[i] -= start[i] res[i] -= start[i]
} }
@ -292,8 +291,8 @@ func testCapacityAPI(t *testing.T, clientCount int) {
close(stop) close(stop)
wg.Wait() wg.Wait()
for i, count := range reqCount { for i := range reqCount {
t.Log("client", i, "processed", count) t.Log("client", i, "processed", reqCount[i].Load())
} }
return true return true
}) { }) {

@ -128,7 +128,7 @@ type costTracker struct {
reqInfoCh chan reqInfo reqInfoCh chan reqInfo
totalRechargeCh chan uint64 totalRechargeCh chan uint64
stats map[uint64][]uint64 // Used for testing purpose. stats map[uint64][]atomic.Uint64 // Used for testing purpose.
// TestHooks // TestHooks
testing bool // Disable real cost evaluation for testing purpose. testing bool // Disable real cost evaluation for testing purpose.
@ -152,9 +152,9 @@ func newCostTracker(db ethdb.Database, config *ethconfig.Config) (*costTracker,
ct.outSizeFactor = utilTarget / float64(config.LightEgress) ct.outSizeFactor = utilTarget / float64(config.LightEgress)
} }
if makeCostStats { if makeCostStats {
ct.stats = make(map[uint64][]uint64) ct.stats = make(map[uint64][]atomic.Uint64)
for code := range reqAvgTimeCost { for code := range reqAvgTimeCost {
ct.stats[code] = make([]uint64, 10) ct.stats[code] = make([]atomic.Uint64, 10)
} }
} }
ct.gfLoop() ct.gfLoop()
@ -423,7 +423,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
l++ l++
realCost >>= 1 realCost >>= 1
} }
atomic.AddUint64(&ct.stats[code][l], 1) ct.stats[code][l].Add(1)
} }
} }
@ -454,7 +454,7 @@ func (ct *costTracker) printStats() {
return return
} }
for code, arr := range ct.stats { for code, arr := range ct.stats {
log.Info("Request cost statistics", "code", code, "1/16", arr[0], "1/8", arr[1], "1/4", arr[2], "1/2", arr[3], "1", arr[4], "2", arr[5], "4", arr[6], "8", arr[7], "16", arr[8], ">16", arr[9]) log.Info("Request cost statistics", "code", code, "1/16", arr[0].Load(), "1/8", arr[1].Load(), "1/4", arr[2].Load(), "1/2", arr[3].Load(), "1", arr[4].Load(), "2", arr[5].Load(), "4", arr[6].Load(), "8", arr[7].Load(), "16", arr[8].Load(), ">16", arr[9].Load())
} }
} }

@ -28,10 +28,11 @@ import (
// servingQueue allows running tasks in a limited number of threads and puts the // servingQueue allows running tasks in a limited number of threads and puts the
// waiting tasks in a priority queue // waiting tasks in a priority queue
type servingQueue struct { type servingQueue struct {
recentTime, queuedTime, servingTimeDiff uint64 recentTime, queuedTime uint64
burstLimit, burstDropLimit uint64 servingTimeDiff atomic.Uint64
burstDecRate float64 burstLimit, burstDropLimit uint64
lastUpdate mclock.AbsTime burstDecRate float64
lastUpdate mclock.AbsTime
queueAddCh, queueBestCh chan *servingTask queueAddCh, queueBestCh chan *servingTask
stopThreadCh, quit chan struct{} stopThreadCh, quit chan struct{}
@ -100,7 +101,7 @@ func (t *servingTask) done() uint64 {
t.timeAdded = t.servingTime t.timeAdded = t.servingTime
if t.expTime > diff { if t.expTime > diff {
t.expTime -= diff t.expTime -= diff
atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime) t.sq.servingTimeDiff.Add(t.expTime)
} else { } else {
t.expTime = 0 t.expTime = 0
} }
@ -249,7 +250,7 @@ func (sq *servingQueue) freezePeers() {
// updateRecentTime recalculates the recent serving time value // updateRecentTime recalculates the recent serving time value
func (sq *servingQueue) updateRecentTime() { func (sq *servingQueue) updateRecentTime() {
subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0) subTime := sq.servingTimeDiff.Swap(0)
now := mclock.Now() now := mclock.Now()
dt := now - sq.lastUpdate dt := now - sq.lastUpdate
sq.lastUpdate = now sq.lastUpdate = now

Loading…
Cancel
Save