From a7de796840eeb267e298bcc98cdaa3a538234bef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Sat, 3 Aug 2019 14:36:10 +0200 Subject: [PATCH] les: implement new client pool (#19745) --- common/mclock/mclock.go | 19 ++ common/mclock/simclock.go | 59 +++- common/prque/lazyqueue.go | 182 ++++++++++ common/prque/lazyqueue_test.go | 119 +++++++ les/api_test.go | 94 +++--- les/balance.go | 381 +++++++++++++++++++++ les/clientpool.go | 599 +++++++++++++++++++++++++++++++++ les/clientpool_test.go | 180 ++++++++++ les/commons.go | 3 +- les/freeclient.go | 351 ------------------- les/freeclient_test.go | 145 -------- les/handler.go | 9 + les/metrics.go | 36 +- les/peer.go | 33 +- les/server.go | 27 +- 15 files changed, 1651 insertions(+), 586 deletions(-) create mode 100644 common/prque/lazyqueue.go create mode 100644 common/prque/lazyqueue_test.go create mode 100644 les/balance.go create mode 100644 les/clientpool.go create mode 100644 les/clientpool_test.go delete mode 100644 les/freeclient.go delete mode 100644 les/freeclient_test.go diff --git a/common/mclock/mclock.go b/common/mclock/mclock.go index dcac59c6ce..0c941082f3 100644 --- a/common/mclock/mclock.go +++ b/common/mclock/mclock.go @@ -42,6 +42,12 @@ type Clock interface { Now() AbsTime Sleep(time.Duration) After(time.Duration) <-chan time.Time + AfterFunc(d time.Duration, f func()) Event +} + +// Event represents a cancellable event returned by AfterFunc +type Event interface { + Cancel() bool } // System implements Clock using the system clock. @@ -61,3 +67,16 @@ func (System) Sleep(d time.Duration) { func (System) After(d time.Duration) <-chan time.Time { return time.After(d) } + +// AfterFunc implements Clock. +func (System) AfterFunc(d time.Duration, f func()) Event { + return (*SystemEvent)(time.AfterFunc(d, f)) +} + +// SystemEvent implements Event using time.Timer. +type SystemEvent time.Timer + +// Cancel implements Event. +func (e *SystemEvent) Cancel() bool { + return (*time.Timer)(e).Stop() +} diff --git a/common/mclock/simclock.go b/common/mclock/simclock.go index e014f56150..af0f71c430 100644 --- a/common/mclock/simclock.go +++ b/common/mclock/simclock.go @@ -35,30 +35,44 @@ type Simulated struct { scheduled []event mu sync.RWMutex cond *sync.Cond + lastId uint64 } type event struct { do func() at AbsTime + id uint64 +} + +// SimulatedEvent implements Event for a virtual clock. +type SimulatedEvent struct { + at AbsTime + id uint64 + s *Simulated } // Run moves the clock by the given duration, executing all timers before that duration. func (s *Simulated) Run(d time.Duration) { s.mu.Lock() - defer s.mu.Unlock() s.init() end := s.now + AbsTime(d) + var do []func() for len(s.scheduled) > 0 { ev := s.scheduled[0] if ev.at > end { break } s.now = ev.at - ev.do() + do = append(do, ev.do) s.scheduled = s.scheduled[1:] } s.now = end + s.mu.Unlock() + + for _, fn := range do { + fn() + } } func (s *Simulated) ActiveTimers() int { @@ -94,23 +108,26 @@ func (s *Simulated) Sleep(d time.Duration) { // After implements Clock. func (s *Simulated) After(d time.Duration) <-chan time.Time { after := make(chan time.Time, 1) - s.insert(d, func() { + s.AfterFunc(d, func() { after <- (time.Time{}).Add(time.Duration(s.now)) }) return after } -func (s *Simulated) insert(d time.Duration, do func()) { +// AfterFunc implements Clock. +func (s *Simulated) AfterFunc(d time.Duration, do func()) Event { s.mu.Lock() defer s.mu.Unlock() s.init() at := s.now + AbsTime(d) + s.lastId++ + id := s.lastId l, h := 0, len(s.scheduled) ll := h for l != h { m := (l + h) / 2 - if at < s.scheduled[m].at { + if (at < s.scheduled[m].at) || ((at == s.scheduled[m].at) && (id < s.scheduled[m].id)) { h = m } else { l = m + 1 @@ -118,8 +135,10 @@ func (s *Simulated) insert(d time.Duration, do func()) { } s.scheduled = append(s.scheduled, event{}) copy(s.scheduled[l+1:], s.scheduled[l:ll]) - s.scheduled[l] = event{do: do, at: at} + e := event{do: do, at: at, id: id} + s.scheduled[l] = e s.cond.Broadcast() + return &SimulatedEvent{at: at, id: id, s: s} } func (s *Simulated) init() { @@ -127,3 +146,31 @@ func (s *Simulated) init() { s.cond = sync.NewCond(&s.mu) } } + +// Cancel implements Event. +func (e *SimulatedEvent) Cancel() bool { + s := e.s + s.mu.Lock() + defer s.mu.Unlock() + + l, h := 0, len(s.scheduled) + ll := h + for l != h { + m := (l + h) / 2 + if e.id == s.scheduled[m].id { + l = m + break + } + if (e.at < s.scheduled[m].at) || ((e.at == s.scheduled[m].at) && (e.id < s.scheduled[m].id)) { + h = m + } else { + l = m + 1 + } + } + if l >= ll || s.scheduled[l].id != e.id { + return false + } + copy(s.scheduled[l:ll-1], s.scheduled[l+1:]) + s.scheduled = s.scheduled[:ll-1] + return true +} diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go new file mode 100644 index 0000000000..92ddd77f67 --- /dev/null +++ b/common/prque/lazyqueue.go @@ -0,0 +1,182 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package prque + +import ( + "container/heap" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +// LazyQueue is a priority queue data structure where priorities can change over +// time and are only evaluated on demand. +// Two callbacks are required: +// - priority evaluates the actual priority of an item +// - maxPriority gives an upper estimate for the priority in any moment between +// now and the given absolute time +// If the upper estimate is exceeded then Update should be called for that item. +// A global Refresh function should also be called periodically. +type LazyQueue struct { + clock mclock.Clock + // Items are stored in one of two internal queues ordered by estimated max + // priority until the next and the next-after-next refresh. Update and Refresh + // always places items in queue[1]. + queue [2]*sstack + popQueue *sstack + period time.Duration + maxUntil mclock.AbsTime + indexOffset int + setIndex SetIndexCallback + priority PriorityCallback + maxPriority MaxPriorityCallback +} + +type ( + PriorityCallback func(data interface{}, now mclock.AbsTime) int64 // actual priority callback + MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback +) + +// NewLazyQueue creates a new lazy queue +func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue { + q := &LazyQueue{ + popQueue: newSstack(nil), + setIndex: setIndex, + priority: priority, + maxPriority: maxPriority, + clock: clock, + period: refreshPeriod} + q.Reset() + q.Refresh() + return q +} + +// Reset clears the contents of the queue +func (q *LazyQueue) Reset() { + q.queue[0] = newSstack(q.setIndex0) + q.queue[1] = newSstack(q.setIndex1) +} + +// Refresh should be called at least with the frequency specified by the refreshPeriod parameter +func (q *LazyQueue) Refresh() { + q.maxUntil = q.clock.Now() + mclock.AbsTime(q.period) + for q.queue[0].Len() != 0 { + q.Push(heap.Pop(q.queue[0]).(*item).value) + } + q.queue[0], q.queue[1] = q.queue[1], q.queue[0] + q.indexOffset = 1 - q.indexOffset + q.maxUntil += mclock.AbsTime(q.period) +} + +// Push adds an item to the queue +func (q *LazyQueue) Push(data interface{}) { + heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)}) +} + +// Update updates the upper priority estimate for the item with the given queue index +func (q *LazyQueue) Update(index int) { + q.Push(q.Remove(index)) +} + +// Pop removes and returns the item with the greatest actual priority +func (q *LazyQueue) Pop() (interface{}, int64) { + var ( + resData interface{} + resPri int64 + ) + q.MultiPop(func(data interface{}, priority int64) bool { + resData = data + resPri = priority + return false + }) + return resData, resPri +} + +// peekIndex returns the index of the internal queue where the item with the +// highest estimated priority is or -1 if both are empty +func (q *LazyQueue) peekIndex() int { + if q.queue[0].Len() != 0 { + if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority { + return 1 + } + return 0 + } + if q.queue[1].Len() != 0 { + return 1 + } + return -1 +} + +// MultiPop pops multiple items from the queue and is more efficient than calling +// Pop multiple times. Popped items are passed to the callback. MultiPop returns +// when the callback returns false or there are no more items to pop. +func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) { + now := q.clock.Now() + nextIndex := q.peekIndex() + for nextIndex != -1 { + data := heap.Pop(q.queue[nextIndex]).(*item).value + heap.Push(q.popQueue, &item{data, q.priority(data, now)}) + nextIndex = q.peekIndex() + for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) { + i := heap.Pop(q.popQueue).(*item) + if !callback(i.value, i.priority) { + for q.popQueue.Len() != 0 { + q.Push(heap.Pop(q.popQueue).(*item).value) + } + return + } + } + } +} + +// PopItem pops the item from the queue only, dropping the associated priority value. +func (q *LazyQueue) PopItem() interface{} { + i, _ := q.Pop() + return i +} + +// Remove removes removes the item with the given index. +func (q *LazyQueue) Remove(index int) interface{} { + if index < 0 { + return nil + } + return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value +} + +// Empty checks whether the priority queue is empty. +func (q *LazyQueue) Empty() bool { + return q.queue[0].Len() == 0 && q.queue[1].Len() == 0 +} + +// Size returns the number of items in the priority queue. +func (q *LazyQueue) Size() int { + return q.queue[0].Len() + q.queue[1].Len() +} + +// setIndex0 translates internal queue item index to the virtual index space of LazyQueue +func (q *LazyQueue) setIndex0(data interface{}, index int) { + if index == -1 { + q.setIndex(data, -1) + } else { + q.setIndex(data, index+index) + } +} + +// setIndex1 translates internal queue item index to the virtual index space of LazyQueue +func (q *LazyQueue) setIndex1(data interface{}, index int) { + q.setIndex(data, index+index+1) +} diff --git a/common/prque/lazyqueue_test.go b/common/prque/lazyqueue_test.go new file mode 100644 index 0000000000..0bd4fc6597 --- /dev/null +++ b/common/prque/lazyqueue_test.go @@ -0,0 +1,119 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package prque + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +const ( + testItems = 1000 + testPriorityStep = 100 + testSteps = 1000000 + testStepPeriod = time.Millisecond + testQueueRefresh = time.Second + testAvgRate = float64(testPriorityStep) / float64(testItems) / float64(testStepPeriod) +) + +type lazyItem struct { + p, maxp int64 + last mclock.AbsTime + index int +} + +func testPriority(a interface{}, now mclock.AbsTime) int64 { + return a.(*lazyItem).p +} + +func testMaxPriority(a interface{}, until mclock.AbsTime) int64 { + i := a.(*lazyItem) + dt := until - i.last + i.maxp = i.p + int64(float64(dt)*testAvgRate) + return i.maxp +} + +func testSetIndex(a interface{}, i int) { + a.(*lazyItem).index = i +} + +func TestLazyQueue(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + clock := &mclock.Simulated{} + q := NewLazyQueue(testSetIndex, testPriority, testMaxPriority, clock, testQueueRefresh) + + var ( + items [testItems]lazyItem + maxPri int64 + ) + + for i := range items[:] { + items[i].p = rand.Int63n(testPriorityStep * 10) + if items[i].p > maxPri { + maxPri = items[i].p + } + items[i].index = -1 + q.Push(&items[i]) + } + + var lock sync.Mutex + stopCh := make(chan chan struct{}) + go func() { + for { + select { + case <-clock.After(testQueueRefresh): + lock.Lock() + q.Refresh() + lock.Unlock() + case stop := <-stopCh: + close(stop) + return + } + } + }() + + for c := 0; c < testSteps; c++ { + i := rand.Intn(testItems) + lock.Lock() + items[i].p += rand.Int63n(testPriorityStep*2-1) + 1 + if items[i].p > maxPri { + maxPri = items[i].p + } + items[i].last = clock.Now() + if items[i].p > items[i].maxp { + q.Update(items[i].index) + } + if rand.Intn(100) == 0 { + p := q.PopItem().(*lazyItem) + if p.p != maxPri { + t.Fatalf("incorrect item (best known priority %d, popped %d)", maxPri, p.p) + } + q.Push(p) + } + lock.Unlock() + clock.Run(testStepPeriod) + clock.WaitForTimers(1) + } + + stop := make(chan struct{}) + stopCh <- stop + <-stop +} diff --git a/les/api_test.go b/les/api_test.go index 8b39a4e176..6e622313ca 100644 --- a/les/api_test.go +++ b/les/api_test.go @@ -96,17 +96,14 @@ func testCapacityAPI(t *testing.T, clientCount int) { t.Fatalf("Failed to obtain rpc client: %v", err) } headNum, headHash := getHead(ctx, t, serverRpcClient) - totalCap := getTotalCap(ctx, t, serverRpcClient) - minCap := getMinCap(ctx, t, serverRpcClient) + minCap, freeCap, totalCap := getCapacityInfo(ctx, t, serverRpcClient) testCap := totalCap * 3 / 4 t.Logf("Server testCap: %d minCap: %d head number: %d head hash: %064x\n", testCap, minCap, headNum, headHash) reqMinCap := uint64(float64(testCap) * minRelCap / (minRelCap + float64(len(clients)-1))) if minCap > reqMinCap { t.Fatalf("Minimum client capacity (%d) bigger than required minimum for this test (%d)", minCap, reqMinCap) } - freeIdx := rand.Intn(len(clients)) - freeCap := getFreeCap(ctx, t, serverRpcClient) for i, client := range clients { var err error @@ -146,7 +143,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { i, c := i, c go func() { queue := make(chan struct{}, 100) - var count uint64 + reqCount[i] = 0 for { select { case queue <- struct{}{}: @@ -164,8 +161,10 @@ func testCapacityAPI(t *testing.T, clientCount int) { wg.Done() <-queue if ok { - count++ - atomic.StoreUint64(&reqCount[i], count) + count := atomic.AddUint64(&reqCount[i], 1) + if count%10000 == 0 { + freezeClient(ctx, t, serverRpcClient, clients[i].ID()) + } } }() } @@ -238,7 +237,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { default: } - totalCap = getTotalCap(ctx, t, serverRpcClient) + _, _, totalCap = getCapacityInfo(ctx, t, serverRpcClient) if totalCap < testCap { t.Log("Total capacity underrun") close(stop) @@ -327,58 +326,61 @@ func testRequest(ctx context.Context, t *testing.T, client *rpc.Client) bool { return err == nil } -func setCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID, cap uint64) { - if err := server.CallContext(ctx, nil, "les_setClientCapacity", clientID, cap); err != nil { - t.Fatalf("Failed to set client capacity: %v", err) +func freezeClient(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID) { + if err := server.CallContext(ctx, nil, "debug_freezeClient", clientID); err != nil { + t.Fatalf("Failed to freeze client: %v", err) } + } -func getCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID) uint64 { - var s string - if err := server.CallContext(ctx, &s, "les_getClientCapacity", clientID); err != nil { - t.Fatalf("Failed to get client capacity: %v", err) - } - cap, err := hexutil.DecodeUint64(s) - if err != nil { - t.Fatalf("Failed to decode client capacity: %v", err) +func setCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID, cap uint64) { + params := make(map[string]interface{}) + params["capacity"] = cap + if err := server.CallContext(ctx, nil, "les_setClientParams", []enode.ID{clientID}, []string{}, params); err != nil { + t.Fatalf("Failed to set client capacity: %v", err) } - return cap } -func getTotalCap(ctx context.Context, t *testing.T, server *rpc.Client) uint64 { - var s string - if err := server.CallContext(ctx, &s, "les_totalCapacity"); err != nil { - t.Fatalf("Failed to query total capacity: %v", err) +func getCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID) uint64 { + var res map[enode.ID]map[string]interface{} + if err := server.CallContext(ctx, &res, "les_clientInfo", []enode.ID{clientID}, []string{}); err != nil { + t.Fatalf("Failed to get client info: %v", err) } - total, err := hexutil.DecodeUint64(s) - if err != nil { - t.Fatalf("Failed to decode total capacity: %v", err) + info, ok := res[clientID] + if !ok { + t.Fatalf("Missing client info") } - return total -} - -func getMinCap(ctx context.Context, t *testing.T, server *rpc.Client) uint64 { - var s string - if err := server.CallContext(ctx, &s, "les_minimumCapacity"); err != nil { - t.Fatalf("Failed to query minimum capacity: %v", err) + v, ok := info["capacity"] + if !ok { + t.Fatalf("Missing field in client info: capacity") } - min, err := hexutil.DecodeUint64(s) - if err != nil { - t.Fatalf("Failed to decode minimum capacity: %v", err) + vv, ok := v.(float64) + if !ok { + t.Fatalf("Failed to decode capacity field") } - return min + return uint64(vv) } -func getFreeCap(ctx context.Context, t *testing.T, server *rpc.Client) uint64 { - var s string - if err := server.CallContext(ctx, &s, "les_freeClientCapacity"); err != nil { - t.Fatalf("Failed to query free client capacity: %v", err) +func getCapacityInfo(ctx context.Context, t *testing.T, server *rpc.Client) (minCap, freeCap, totalCap uint64) { + var res map[string]interface{} + if err := server.CallContext(ctx, &res, "les_serverInfo"); err != nil { + t.Fatalf("Failed to query server info: %v", err) } - free, err := hexutil.DecodeUint64(s) - if err != nil { - t.Fatalf("Failed to decode free client capacity: %v", err) + decode := func(s string) uint64 { + v, ok := res[s] + if !ok { + t.Fatalf("Missing field in server info: %s", s) + } + vv, ok := v.(float64) + if !ok { + t.Fatalf("Failed to decode server info field: %s", s) + } + return uint64(vv) } - return free + minCap = decode("minimumCapacity") + freeCap = decode("freeClientCapacity") + totalCap = decode("totalCapacity") + return } func init() { diff --git a/les/balance.go b/les/balance.go new file mode 100644 index 0000000000..4f08a304eb --- /dev/null +++ b/les/balance.go @@ -0,0 +1,381 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package les + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +const ( + balanceCallbackQueue = iota + balanceCallbackZero + balanceCallbackCount +) + +// balanceTracker keeps track of the positive and negative balances of a connected +// client and calculates actual and projected future priority values required by +// prque.LazyQueue. +type balanceTracker struct { + lock sync.Mutex + clock mclock.Clock + stopped bool + capacity uint64 + balance balance + timeFactor, requestFactor float64 + negTimeFactor, negRequestFactor float64 + sumReqCost uint64 + lastUpdate, nextUpdate, initTime mclock.AbsTime + updateEvent mclock.Event + // since only a limited and fixed number of callbacks are needed, they are + // stored in a fixed size array ordered by priority threshold. + callbacks [balanceCallbackCount]balanceCallback + // callbackIndex maps balanceCallback constants to callbacks array indexes (-1 if not active) + callbackIndex [balanceCallbackCount]int + callbackCount int // number of active callbacks +} + +// balance represents a pair of positive and negative balances +type balance struct { + pos, neg uint64 +} + +// balanceCallback represents a single callback that is activated when client priority +// reaches the given threshold +type balanceCallback struct { + id int + threshold int64 + callback func() +} + +// init initializes balanceTracker +func (bt *balanceTracker) init(clock mclock.Clock, capacity uint64) { + bt.clock = clock + bt.initTime = clock.Now() + for i := range bt.callbackIndex { + bt.callbackIndex[i] = -1 + } + bt.capacity = capacity +} + +// stop shuts down the balance tracker +func (bt *balanceTracker) stop(now mclock.AbsTime) { + bt.lock.Lock() + defer bt.lock.Unlock() + + bt.stopped = true + bt.updateBalance(now) + bt.negTimeFactor = 0 + bt.negRequestFactor = 0 + bt.timeFactor = 0 + bt.requestFactor = 0 + if bt.updateEvent != nil { + bt.updateEvent.Cancel() + bt.updateEvent = nil + } +} + +// balanceToPriority converts a balance to a priority value. Higher priority means +// first to disconnect. Positive balance translates to negative priority. If positive +// balance is zero then negative balance translates to a positive priority. +func (bt *balanceTracker) balanceToPriority(b balance) int64 { + if b.pos > 0 { + return ^int64(b.pos / bt.capacity) + } + return int64(b.neg) +} + +// reducedBalance estimates the reduced balance at a given time in the fututre based +// on the current balance, the time factor and an estimated average request cost per time ratio +func (bt *balanceTracker) reducedBalance(at mclock.AbsTime, avgReqCost float64) balance { + dt := float64(at - bt.lastUpdate) + b := bt.balance + if b.pos != 0 { + factor := bt.timeFactor + bt.requestFactor*avgReqCost + diff := uint64(dt * factor) + if diff <= b.pos { + b.pos -= diff + dt = 0 + } else { + dt -= float64(b.pos) / factor + b.pos = 0 + } + } + if dt != 0 { + factor := bt.negTimeFactor + bt.negRequestFactor*avgReqCost + b.neg += uint64(dt * factor) + } + return b +} + +// timeUntil calculates the remaining time needed to reach a given priority level +// assuming that no requests are processed until then. If the given level is never +// reached then (0, false) is returned. +// Note: the function assumes that the balance has been recently updated and +// calculates the time starting from the last update. +func (bt *balanceTracker) timeUntil(priority int64) (time.Duration, bool) { + var dt float64 + if bt.balance.pos != 0 { + if bt.timeFactor < 1e-100 { + return 0, false + } + if priority < 0 { + newBalance := uint64(^priority) * bt.capacity + if newBalance > bt.balance.pos { + return 0, false + } + dt = float64(bt.balance.pos-newBalance) / bt.timeFactor + return time.Duration(dt), true + } else { + dt = float64(bt.balance.pos) / bt.timeFactor + } + } else { + if priority < 0 { + return 0, false + } + } + // if we have a positive balance then dt equals the time needed to get it to zero + if uint64(priority) > bt.balance.neg { + if bt.negTimeFactor < 1e-100 { + return 0, false + } + dt += float64(uint64(priority)-bt.balance.neg) / bt.negTimeFactor + } + return time.Duration(dt), true +} + +// getPriority returns the actual priority based on the current balance +func (bt *balanceTracker) getPriority(now mclock.AbsTime) int64 { + bt.lock.Lock() + defer bt.lock.Unlock() + + bt.updateBalance(now) + return bt.balanceToPriority(bt.balance) +} + +// estimatedPriority gives an upper estimate for the priority at a given time in the future. +// If addReqCost is true then an average request cost per time is assumed that is twice the +// average cost per time in the current session. If false, zero request cost is assumed. +func (bt *balanceTracker) estimatedPriority(at mclock.AbsTime, addReqCost bool) int64 { + bt.lock.Lock() + defer bt.lock.Unlock() + + var avgReqCost float64 + if addReqCost { + dt := time.Duration(bt.lastUpdate - bt.initTime) + if dt > time.Second { + avgReqCost = float64(bt.sumReqCost) * 2 / float64(dt) + } + } + return bt.balanceToPriority(bt.reducedBalance(at, avgReqCost)) +} + +// updateBalance updates balance based on the time factor +func (bt *balanceTracker) updateBalance(now mclock.AbsTime) { + if now > bt.lastUpdate { + bt.balance = bt.reducedBalance(now, 0) + bt.lastUpdate = now + } +} + +// checkCallbacks checks whether the threshold of any of the active callbacks +// have been reached and calls them if necessary. It also sets up or updates +// a scheduled event to ensure that is will be called again just after the next +// threshold has been reached. +// Note: checkCallbacks assumes that the balance has been recently updated. +func (bt *balanceTracker) checkCallbacks(now mclock.AbsTime) { + if bt.callbackCount == 0 { + return + } + pri := bt.balanceToPriority(bt.balance) + for bt.callbackCount != 0 && bt.callbacks[bt.callbackCount-1].threshold <= pri { + bt.callbackCount-- + bt.callbackIndex[bt.callbacks[bt.callbackCount].id] = -1 + go bt.callbacks[bt.callbackCount].callback() + } + if bt.callbackCount != 0 { + d, ok := bt.timeUntil(bt.callbacks[bt.callbackCount-1].threshold) + if !ok { + bt.nextUpdate = 0 + bt.updateAfter(0) + return + } + if bt.nextUpdate == 0 || bt.nextUpdate > now+mclock.AbsTime(d) { + if d > time.Second { + // Note: if the scheduled update is not in the very near future then we + // schedule the update a bit earlier. This way we do need to update a few + // extra times but don't need to reschedule every time a processed request + // brings the expected firing time a little bit closer. + d = ((d - time.Second) * 7 / 8) + time.Second + } + bt.nextUpdate = now + mclock.AbsTime(d) + bt.updateAfter(d) + } + } else { + bt.nextUpdate = 0 + bt.updateAfter(0) + } +} + +// updateAfter schedules a balance update and callback check in the future +func (bt *balanceTracker) updateAfter(dt time.Duration) { + if bt.updateEvent == nil || bt.updateEvent.Cancel() { + if dt == 0 { + bt.updateEvent = nil + } else { + bt.updateEvent = bt.clock.AfterFunc(dt, func() { + bt.lock.Lock() + defer bt.lock.Unlock() + + if bt.callbackCount != 0 { + now := bt.clock.Now() + bt.updateBalance(now) + bt.checkCallbacks(now) + } + }) + } + } +} + +// requestCost should be called after serving a request for the given peer +func (bt *balanceTracker) requestCost(cost uint64) { + bt.lock.Lock() + defer bt.lock.Unlock() + + if bt.stopped { + return + } + now := bt.clock.Now() + bt.updateBalance(now) + fcost := float64(cost) + + if bt.balance.pos != 0 { + if bt.requestFactor != 0 { + c := uint64(fcost * bt.requestFactor) + if bt.balance.pos >= c { + bt.balance.pos -= c + fcost = 0 + } else { + fcost *= 1 - float64(bt.balance.pos)/float64(c) + bt.balance.pos = 0 + } + bt.checkCallbacks(now) + } else { + fcost = 0 + } + } + if fcost > 0 { + if bt.negRequestFactor != 0 { + bt.balance.neg += uint64(fcost * bt.negRequestFactor) + bt.checkCallbacks(now) + } + } + bt.sumReqCost += cost +} + +// getBalance returns the current positive and negative balance +func (bt *balanceTracker) getBalance(now mclock.AbsTime) (uint64, uint64) { + bt.lock.Lock() + defer bt.lock.Unlock() + + bt.updateBalance(now) + return bt.balance.pos, bt.balance.neg +} + +// setBalance sets the positive and negative balance to the given values +func (bt *balanceTracker) setBalance(pos, neg uint64) error { + bt.lock.Lock() + defer bt.lock.Unlock() + + now := bt.clock.Now() + bt.updateBalance(now) + bt.balance.pos = pos + bt.balance.neg = neg + bt.checkCallbacks(now) + return nil +} + +// setFactors sets the price factors. timeFactor is the price of a nanosecond of +// connection while requestFactor is the price of a "realCost" unit. +func (bt *balanceTracker) setFactors(neg bool, timeFactor, requestFactor float64) { + bt.lock.Lock() + defer bt.lock.Unlock() + + if bt.stopped { + return + } + now := bt.clock.Now() + bt.updateBalance(now) + if neg { + bt.negTimeFactor = timeFactor + bt.negRequestFactor = requestFactor + } else { + bt.timeFactor = timeFactor + bt.requestFactor = requestFactor + } + bt.checkCallbacks(now) +} + +// setCallback sets up a one-time callback to be called when priority reaches +// the threshold. If it has already reached the threshold the callback is called +// immediately. +func (bt *balanceTracker) addCallback(id int, threshold int64, callback func()) { + bt.lock.Lock() + defer bt.lock.Unlock() + + bt.removeCb(id) + idx := 0 + for idx < bt.callbackCount && threshold < bt.callbacks[idx].threshold { + idx++ + } + for i := bt.callbackCount - 1; i >= idx; i-- { + bt.callbackIndex[bt.callbacks[i].id]++ + bt.callbacks[i+1] = bt.callbacks[i] + } + bt.callbackCount++ + bt.callbackIndex[id] = idx + bt.callbacks[idx] = balanceCallback{id, threshold, callback} + now := bt.clock.Now() + bt.updateBalance(now) + bt.checkCallbacks(now) +} + +// removeCallback removes the given callback and returns true if it was active +func (bt *balanceTracker) removeCallback(id int) bool { + bt.lock.Lock() + defer bt.lock.Unlock() + + return bt.removeCb(id) +} + +// removeCb removes the given callback and returns true if it was active +// Note: should be called while bt.lock is held +func (bt *balanceTracker) removeCb(id int) bool { + idx := bt.callbackIndex[id] + if idx == -1 { + return false + } + bt.callbackIndex[id] = -1 + for i := idx; i < bt.callbackCount-1; i++ { + bt.callbackIndex[bt.callbacks[i+1].id]-- + bt.callbacks[i] = bt.callbacks[i+1] + } + bt.callbackCount-- + return true +} diff --git a/les/clientpool.go b/les/clientpool.go new file mode 100644 index 0000000000..4ee2fd5da6 --- /dev/null +++ b/les/clientpool.go @@ -0,0 +1,599 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package les + +import ( + "io" + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + negBalanceExpTC = time.Hour // time constant for exponentially reducing negative balance + fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format + connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon + lazyQueueRefresh = time.Second * 10 // refresh period of the connected queue +) + +var ( + clientPoolDbKey = []byte("clientPool") + clientBalanceDbKey = []byte("clientPool-balance") +) + +// clientPool implements a client database that assigns a priority to each client +// based on a positive and negative balance. Positive balance is externally assigned +// to prioritized clients and is decreased with connection time and processed +// requests (unless the price factors are zero). If the positive balance is zero +// then negative balance is accumulated. Balance tracking and priority calculation +// for connected clients is done by balanceTracker. connectedQueue ensures that +// clients with the lowest positive or highest negative balance get evicted when +// the total capacity allowance is full and new clients with a better balance want +// to connect. Already connected nodes receive a small bias in their favor in order +// to avoid accepting and instantly kicking out clients. +// Balances of disconnected clients are stored in posBalanceQueue and negBalanceQueue +// and are also saved in the database. Negative balance is transformed into a +// logarithmic form with a constantly shifting linear offset in order to implement +// an exponential decrease. negBalanceQueue has a limited size and drops the smallest +// values when necessary. Positive balances are stored in the database as long as +// they exist, posBalanceQueue only acts as a cache for recently accessed entries. +type clientPool struct { + db ethdb.Database + lock sync.Mutex + clock mclock.Clock + stopCh chan chan struct{} + closed bool + removePeer func(enode.ID) + + queueLimit, countLimit int + freeClientCap, capacityLimit, connectedCapacity uint64 + + connectedMap map[enode.ID]*clientInfo + posBalanceMap map[enode.ID]*posBalance + negBalanceMap map[string]*negBalance + connectedQueue *prque.LazyQueue + posBalanceQueue, negBalanceQueue *prque.Prque + posFactors, negFactors priceFactors + posBalanceAccessCounter int64 + startupTime mclock.AbsTime + logOffsetAtStartup int64 +} + +// clientPeer represents a client in the pool. +// Positive balances are assigned to node key while negative balances are assigned +// to freeClientId. Currently network IP address without port is used because +// clients have a limited access to IP addresses while new node keys can be easily +// generated so it would be useless to assign a negative value to them. +type clientPeer interface { + ID() enode.ID + freeClientId() string + updateCapacity(uint64) +} + +// clientInfo represents a connected client +type clientInfo struct { + address string + id enode.ID + capacity uint64 + priority bool + pool *clientPool + peer clientPeer + queueIndex int // position in connectedQueue + balanceTracker balanceTracker +} + +// connSetIndex callback updates clientInfo item index in connectedQueue +func connSetIndex(a interface{}, index int) { + a.(*clientInfo).queueIndex = index +} + +// connPriority callback returns actual priority of clientInfo item in connectedQueue +func connPriority(a interface{}, now mclock.AbsTime) int64 { + c := a.(*clientInfo) + return c.balanceTracker.getPriority(now) +} + +// connMaxPriority callback returns estimated maximum priority of clientInfo item in connectedQueue +func connMaxPriority(a interface{}, until mclock.AbsTime) int64 { + c := a.(*clientInfo) + pri := c.balanceTracker.estimatedPriority(until, true) + c.balanceTracker.addCallback(balanceCallbackQueue, pri+1, func() { + c.pool.lock.Lock() + if c.queueIndex != -1 { + c.pool.connectedQueue.Update(c.queueIndex) + } + c.pool.lock.Unlock() + }) + return pri +} + +// priceFactors determine the pricing policy (may apply either to positive or +// negative balances which may have different factors). +// - timeFactor is cost unit per nanosecond of connection time +// - capacityFactor is cost unit per nanosecond of connection time per 1000000 capacity +// - requestFactor is cost unit per request "realCost" unit +type priceFactors struct { + timeFactor, capacityFactor, requestFactor float64 +} + +// newClientPool creates a new client pool +func newClientPool(db ethdb.Database, freeClientCap uint64, queueLimit int, clock mclock.Clock, removePeer func(enode.ID)) *clientPool { + pool := &clientPool{ + db: db, + clock: clock, + connectedMap: make(map[enode.ID]*clientInfo), + posBalanceMap: make(map[enode.ID]*posBalance), + negBalanceMap: make(map[string]*negBalance), + connectedQueue: prque.NewLazyQueue(connSetIndex, connPriority, connMaxPriority, clock, lazyQueueRefresh), + negBalanceQueue: prque.New(negSetIndex), + posBalanceQueue: prque.New(posSetIndex), + freeClientCap: freeClientCap, + queueLimit: queueLimit, + removePeer: removePeer, + stopCh: make(chan chan struct{}), + } + pool.loadFromDb() + go func() { + for { + select { + case <-clock.After(lazyQueueRefresh): + pool.lock.Lock() + pool.connectedQueue.Refresh() + pool.lock.Unlock() + case stop := <-pool.stopCh: + close(stop) + return + } + } + }() + return pool +} + +// stop shuts the client pool down +func (f *clientPool) stop() { + stop := make(chan struct{}) + f.stopCh <- stop + <-stop + f.lock.Lock() + f.closed = true + f.saveToDb() + f.lock.Unlock() +} + +// registerPeer implements peerSetNotify +func (f *clientPool) registerPeer(p *peer) { + c := f.connect(p, 0) + if c != nil { + p.balanceTracker = &c.balanceTracker + } +} + +// connect should be called after a successful handshake. If the connection was +// rejected, there is no need to call disconnect. +func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo { + f.lock.Lock() + defer f.lock.Unlock() + + if f.closed { + return nil + } + address := peer.freeClientId() + id := peer.ID() + idStr := peerIdToString(id) + if _, ok := f.connectedMap[id]; ok { + clientRejectedMeter.Mark(1) + log.Debug("Client already connected", "address", address, "id", idStr) + return nil + } + now := f.clock.Now() + // create a clientInfo but do not add it yet + e := &clientInfo{pool: f, peer: peer, address: address, queueIndex: -1, id: id} + posBalance := f.getPosBalance(id).value + e.priority = posBalance != 0 + var negBalance uint64 + nb := f.negBalanceMap[address] + if nb != nil { + negBalance = uint64(math.Exp(float64(nb.logValue-f.logOffset(now)) / fixedPointMultiplier)) + } + if !e.priority { + capacity = f.freeClientCap + } + // check whether it fits into connectedQueue + if capacity < f.freeClientCap { + capacity = f.freeClientCap + } + e.capacity = capacity + e.balanceTracker.init(f.clock, capacity) + e.balanceTracker.setBalance(posBalance, negBalance) + f.setClientPriceFactors(e) + newCapacity := f.connectedCapacity + capacity + newCount := f.connectedQueue.Size() + 1 + if newCapacity > f.capacityLimit || newCount > f.countLimit { + var ( + kickList []*clientInfo + kickPriority int64 + ) + f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool { + c := data.(*clientInfo) + kickList = append(kickList, c) + kickPriority = priority + newCapacity -= c.capacity + newCount-- + return newCapacity > f.capacityLimit || newCount > f.countLimit + }) + if newCapacity > f.capacityLimit || newCount > f.countLimit || (e.balanceTracker.estimatedPriority(now+mclock.AbsTime(connectedBias), false)-kickPriority) > 0 { + // reject client + for _, c := range kickList { + f.connectedQueue.Push(c) + } + clientRejectedMeter.Mark(1) + log.Debug("Client rejected", "address", address, "id", idStr) + return nil + } + // accept new client, drop old ones + for _, c := range kickList { + f.dropClient(c, now, true) + } + } + // client accepted, finish setting it up + if nb != nil { + delete(f.negBalanceMap, address) + f.negBalanceQueue.Remove(nb.queueIndex) + } + if e.priority { + e.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) }) + } + f.connectedMap[id] = e + f.connectedQueue.Push(e) + f.connectedCapacity += e.capacity + totalConnectedGauge.Update(int64(f.connectedCapacity)) + if e.capacity != f.freeClientCap { + e.peer.updateCapacity(e.capacity) + } + clientConnectedMeter.Mark(1) + log.Debug("Client accepted", "address", address) + return e +} + +// unregisterPeer implements peerSetNotify +func (f *clientPool) unregisterPeer(p *peer) { + f.disconnect(p) +} + +// disconnect should be called when a connection is terminated. If the disconnection +// was initiated by the pool itself using disconnectFn then calling disconnect is +// not necessary but permitted. +func (f *clientPool) disconnect(p clientPeer) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.closed { + return + } + address := p.freeClientId() + id := p.ID() + // Short circuit if the peer hasn't been registered. + e := f.connectedMap[id] + if e == nil { + log.Debug("Client not connected", "address", address, "id", peerIdToString(id)) + return + } + f.dropClient(e, f.clock.Now(), false) +} + +// dropClient removes a client from the connected queue and finalizes its balance. +// If kick is true then it also initiates the disconnection. +func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) { + if _, ok := f.connectedMap[e.id]; !ok { + return + } + f.finalizeBalance(e, now) + f.connectedQueue.Remove(e.queueIndex) + delete(f.connectedMap, e.id) + f.connectedCapacity -= e.capacity + totalConnectedGauge.Update(int64(f.connectedCapacity)) + if kick { + clientKickedMeter.Mark(1) + log.Debug("Client kicked out", "address", e.address) + f.removePeer(e.id) + } else { + clientDisconnectedMeter.Mark(1) + log.Debug("Client disconnected", "address", e.address) + } +} + +// finalizeBalance stops the balance tracker, retrieves the final balances and +// stores them in posBalanceQueue and negBalanceQueue +func (f *clientPool) finalizeBalance(c *clientInfo, now mclock.AbsTime) { + c.balanceTracker.stop(now) + pos, neg := c.balanceTracker.getBalance(now) + pb := f.getPosBalance(c.id) + pb.value = pos + f.storePosBalance(pb) + if neg < 1 { + neg = 1 + } + nb := &negBalance{address: c.address, queueIndex: -1, logValue: int64(math.Log(float64(neg))*fixedPointMultiplier) + f.logOffset(now)} + f.negBalanceMap[c.address] = nb + f.negBalanceQueue.Push(nb, -nb.logValue) + if f.negBalanceQueue.Size() > f.queueLimit { + nn := f.negBalanceQueue.PopItem().(*negBalance) + delete(f.negBalanceMap, nn.address) + } +} + +// balanceExhausted callback is called by balanceTracker when positive balance is exhausted. +// It revokes priority status and also reduces the client capacity if necessary. +func (f *clientPool) balanceExhausted(id enode.ID) { + f.lock.Lock() + defer f.lock.Unlock() + + c := f.connectedMap[id] + if c == nil || !c.priority { + return + } + c.priority = false + if c.capacity != f.freeClientCap { + f.connectedCapacity += f.freeClientCap - c.capacity + totalConnectedGauge.Update(int64(f.connectedCapacity)) + c.capacity = f.freeClientCap + c.peer.updateCapacity(c.capacity) + } +} + +// setConnLimit sets the maximum number and total capacity of connected clients, +// dropping some of them if necessary. +func (f *clientPool) setLimits(count int, totalCap uint64) { + f.lock.Lock() + defer f.lock.Unlock() + + f.countLimit = count + f.capacityLimit = totalCap + now := mclock.Now() + f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool { + c := data.(*clientInfo) + f.dropClient(c, now, true) + return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit + }) +} + +// logOffset calculates the time-dependent offset for the logarithmic +// representation of negative balance +func (f *clientPool) logOffset(now mclock.AbsTime) int64 { + // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor + // is to avoid int64 overflow. We assume that int64(negBalanceExpTC) >> fixedPointMultiplier. + logDecay := int64((time.Duration(now - f.startupTime)) / (negBalanceExpTC / fixedPointMultiplier)) + return f.logOffsetAtStartup + logDecay +} + +// setPriceFactors changes pricing factors for both positive and negative balances. +// Applies to connected clients and also future connections. +func (f *clientPool) setPriceFactors(posFactors, negFactors priceFactors) { + f.lock.Lock() + defer f.lock.Unlock() + + f.posFactors, f.negFactors = posFactors, negFactors + for _, c := range f.connectedMap { + f.setClientPriceFactors(c) + } +} + +// setClientPriceFactors sets the pricing factors for an individual connected client +func (f *clientPool) setClientPriceFactors(c *clientInfo) { + c.balanceTracker.setFactors(true, f.negFactors.timeFactor+float64(c.capacity)*f.negFactors.capacityFactor/1000000, f.negFactors.requestFactor) + c.balanceTracker.setFactors(false, f.posFactors.timeFactor+float64(c.capacity)*f.posFactors.capacityFactor/1000000, f.posFactors.requestFactor) +} + +// clientPoolStorage is the RLP representation of the pool's database storage +type clientPoolStorage struct { + LogOffset uint64 + List []*negBalance +} + +// loadFromDb restores pool status from the database storage +// (automatically called at initialization) +func (f *clientPool) loadFromDb() { + enc, err := f.db.Get(clientPoolDbKey) + if err != nil { + return + } + var storage clientPoolStorage + err = rlp.DecodeBytes(enc, &storage) + if err != nil { + log.Error("Failed to decode client list", "err", err) + return + } + f.logOffsetAtStartup = int64(storage.LogOffset) + f.startupTime = f.clock.Now() + for _, e := range storage.List { + log.Debug("Loaded free client record", "address", e.address, "logValue", e.logValue) + f.negBalanceMap[e.address] = e + f.negBalanceQueue.Push(e, -e.logValue) + } +} + +// saveToDb saves pool status to the database storage +// (automatically called during shutdown) +func (f *clientPool) saveToDb() { + now := f.clock.Now() + storage := clientPoolStorage{ + LogOffset: uint64(f.logOffset(now)), + } + for _, c := range f.connectedMap { + f.finalizeBalance(c, now) + } + i := 0 + storage.List = make([]*negBalance, len(f.negBalanceMap)) + for _, e := range f.negBalanceMap { + storage.List[i] = e + i++ + } + enc, err := rlp.EncodeToBytes(storage) + if err != nil { + log.Error("Failed to encode negative balance list", "err", err) + } else { + f.db.Put(clientPoolDbKey, enc) + } +} + +// storePosBalance stores a single positive balance entry in the database +func (f *clientPool) storePosBalance(b *posBalance) { + if b.value == b.lastStored { + return + } + enc, err := rlp.EncodeToBytes(b) + if err != nil { + log.Error("Failed to encode client balance", "err", err) + } else { + f.db.Put(append(clientBalanceDbKey, b.id[:]...), enc) + b.lastStored = b.value + } +} + +// getPosBalance retrieves a single positive balance entry from cache or the database +func (f *clientPool) getPosBalance(id enode.ID) *posBalance { + if b, ok := f.posBalanceMap[id]; ok { + f.posBalanceQueue.Remove(b.queueIndex) + f.posBalanceAccessCounter-- + f.posBalanceQueue.Push(b, f.posBalanceAccessCounter) + return b + } + balance := &posBalance{} + if enc, err := f.db.Get(append(clientBalanceDbKey, id[:]...)); err == nil { + if err := rlp.DecodeBytes(enc, balance); err != nil { + log.Error("Failed to decode client balance", "err", err) + balance = &posBalance{} + } + } + balance.id = id + balance.queueIndex = -1 + if f.posBalanceQueue.Size() >= f.queueLimit { + b := f.posBalanceQueue.PopItem().(*posBalance) + f.storePosBalance(b) + delete(f.posBalanceMap, b.id) + } + f.posBalanceAccessCounter-- + f.posBalanceQueue.Push(balance, f.posBalanceAccessCounter) + f.posBalanceMap[id] = balance + return balance +} + +// addBalance updates the positive balance of a client. +// If setTotal is false then the given amount is added to the balance. +// If setTotal is true then amount represents the total amount ever added to the +// given ID and positive balance is increased by (amount-lastTotal) while lastTotal +// is updated to amount. This method also allows removing positive balance. +func (f *clientPool) addBalance(id enode.ID, amount uint64, setTotal bool) { + f.lock.Lock() + defer f.lock.Unlock() + + pb := f.getPosBalance(id) + c := f.connectedMap[id] + var negBalance uint64 + if c != nil { + pb.value, negBalance = c.balanceTracker.getBalance(f.clock.Now()) + } + if setTotal { + if pb.value+amount > pb.lastTotal { + pb.value += amount - pb.lastTotal + } else { + pb.value = 0 + } + pb.lastTotal = amount + } else { + pb.value += amount + pb.lastTotal += amount + } + f.storePosBalance(pb) + if c != nil { + c.balanceTracker.setBalance(pb.value, negBalance) + if !c.priority && pb.value > 0 { + c.priority = true + c.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) }) + } + } +} + +// posBalance represents a recently accessed positive balance entry +type posBalance struct { + id enode.ID + value, lastStored, lastTotal uint64 + queueIndex int // position in posBalanceQueue +} + +// EncodeRLP implements rlp.Encoder +func (e *posBalance) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{e.value, e.lastTotal}) +} + +// DecodeRLP implements rlp.Decoder +func (e *posBalance) DecodeRLP(s *rlp.Stream) error { + var entry struct { + Value, LastTotal uint64 + } + if err := s.Decode(&entry); err != nil { + return err + } + e.value = entry.Value + e.lastStored = entry.Value + e.lastTotal = entry.LastTotal + return nil +} + +// posSetIndex callback updates posBalance item index in posBalanceQueue +func posSetIndex(a interface{}, index int) { + a.(*posBalance).queueIndex = index +} + +// negBalance represents a negative balance entry of a disconnected client +type negBalance struct { + address string + logValue int64 + queueIndex int // position in negBalanceQueue +} + +// EncodeRLP implements rlp.Encoder +func (e *negBalance) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{e.address, uint64(e.logValue)}) +} + +// DecodeRLP implements rlp.Decoder +func (e *negBalance) DecodeRLP(s *rlp.Stream) error { + var entry struct { + Address string + LogValue uint64 + } + if err := s.Decode(&entry); err != nil { + return err + } + e.address = entry.Address + e.logValue = int64(entry.LogValue) + e.queueIndex = -1 + return nil +} + +// negSetIndex callback updates negBalance item index in negBalanceQueue +func negSetIndex(a interface{}, index int) { + a.(*negBalance).queueIndex = index +} diff --git a/les/clientpool_test.go b/les/clientpool_test.go new file mode 100644 index 0000000000..fba328fbf0 --- /dev/null +++ b/les/clientpool_test.go @@ -0,0 +1,180 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package les + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +func TestClientPoolL10C100Free(t *testing.T) { + testClientPool(t, 10, 100, 0, true) +} + +func TestClientPoolL40C200Free(t *testing.T) { + testClientPool(t, 40, 200, 0, true) +} + +func TestClientPoolL100C300Free(t *testing.T) { + testClientPool(t, 100, 300, 0, true) +} + +func TestClientPoolL10C100P4(t *testing.T) { + testClientPool(t, 10, 100, 4, false) +} + +func TestClientPoolL40C200P30(t *testing.T) { + testClientPool(t, 40, 200, 30, false) +} + +func TestClientPoolL100C300P20(t *testing.T) { + testClientPool(t, 100, 300, 20, false) +} + +const testClientPoolTicks = 500000 + +type poolTestPeer int + +func (i poolTestPeer) ID() enode.ID { + return enode.ID{byte(i % 256), byte(i >> 8)} +} + +func (i poolTestPeer) freeClientId() string { + return fmt.Sprintf("addr #%d", i) +} + +func (i poolTestPeer) updateCapacity(uint64) {} + +func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomDisconnect bool) { + rand.Seed(time.Now().UnixNano()) + var ( + clock mclock.Simulated + db = rawdb.NewMemoryDatabase() + connected = make([]bool, clientCount) + connTicks = make([]int, clientCount) + disconnCh = make(chan int, clientCount) + disconnFn = func(id enode.ID) { + disconnCh <- int(id[0]) + int(id[1])<<8 + } + pool = newClientPool(db, 1, 10000, &clock, disconnFn) + ) + pool.setLimits(connLimit, uint64(connLimit)) + pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + + // pool should accept new peers up to its connected limit + for i := 0; i < connLimit; i++ { + if pool.connect(poolTestPeer(i), 0) != nil { + connected[i] = true + } else { + t.Fatalf("Test peer #%d rejected", i) + } + } + // since all accepted peers are new and should not be kicked out, the next one should be rejected + if pool.connect(poolTestPeer(connLimit), 0) != nil { + connected[connLimit] = true + t.Fatalf("Peer accepted over connected limit") + } + + // randomly connect and disconnect peers, expect to have a similar total connection time at the end + for tickCounter := 0; tickCounter < testClientPoolTicks; tickCounter++ { + clock.Run(1 * time.Second) + //time.Sleep(time.Microsecond * 100) + + if tickCounter == testClientPoolTicks/4 { + // give a positive balance to some of the peers + amount := uint64(testClientPoolTicks / 2 * 1000000000) // enough for half of the simulation period + for i := 0; i < paidCount; i++ { + pool.addBalance(poolTestPeer(i).ID(), amount, false) + } + } + + i := rand.Intn(clientCount) + if connected[i] { + if randomDisconnect { + pool.disconnect(poolTestPeer(i)) + connected[i] = false + connTicks[i] += tickCounter + } + } else { + if pool.connect(poolTestPeer(i), 0) != nil { + connected[i] = true + connTicks[i] -= tickCounter + } + } + pollDisconnects: + for { + select { + case i := <-disconnCh: + pool.disconnect(poolTestPeer(i)) + if connected[i] { + connTicks[i] += tickCounter + connected[i] = false + } + default: + break pollDisconnects + } + } + } + + expTicks := testClientPoolTicks/2*connLimit/clientCount + testClientPoolTicks/2*(connLimit-paidCount)/(clientCount-paidCount) + expMin := expTicks - expTicks/10 + expMax := expTicks + expTicks/10 + paidTicks := testClientPoolTicks/2*connLimit/clientCount + testClientPoolTicks/2 + paidMin := paidTicks - paidTicks/10 + paidMax := paidTicks + paidTicks/10 + + // check if the total connected time of peers are all in the expected range + for i, c := range connected { + if c { + connTicks[i] += testClientPoolTicks + } + min, max := expMin, expMax + if i < paidCount { + // expect a higher amount for clients with a positive balance + min, max = paidMin, paidMax + } + if connTicks[i] < min || connTicks[i] > max { + t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], min, max) + } + } + + // a previously unknown peer should be accepted now + if pool.connect(poolTestPeer(54321), 0) == nil { + t.Fatalf("Previously unknown peer rejected") + } + + // close and restart pool + pool.stop() + pool = newClientPool(db, 1, 10000, &clock, func(id enode.ID) {}) + pool.setLimits(connLimit, uint64(connLimit)) + + // try connecting all known peers (connLimit should be filled up) + for i := 0; i < clientCount; i++ { + pool.connect(poolTestPeer(i), 0) + } + // expect pool to remember known nodes and kick out one of them to accept a new one + if pool.connect(poolTestPeer(54322), 0) == nil { + t.Errorf("Previously unknown peer rejected after restarting pool") + } + pool.stop() +} diff --git a/les/commons.go b/les/commons.go index 7eaf39c843..ef3c470e58 100644 --- a/les/commons.go +++ b/les/commons.go @@ -17,7 +17,6 @@ package les import ( - "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -64,7 +63,7 @@ func (c *lesCommons) makeProtocols(versions []uint) []p2p.Protocol { return c.protocolManager.runPeer(version, p, rw) }, PeerInfo: func(id enode.ID) interface{} { - if p := c.protocolManager.peers.Peer(fmt.Sprintf("%x", id.Bytes())); p != nil { + if p := c.protocolManager.peers.Peer(peerIdToString(id)); p != nil { return p.Info() } return nil diff --git a/les/freeclient.go b/les/freeclient.go deleted file mode 100644 index 08613e6ce0..0000000000 --- a/les/freeclient.go +++ /dev/null @@ -1,351 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package les - -import ( - "io" - "math" - "net" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/common/prque" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" -) - -// freeClientPool implements a client database that limits the connection time -// of each client and manages accepting/rejecting incoming connections and even -// kicking out some connected clients. The pool calculates recent usage time -// for each known client (a value that increases linearly when the client is -// connected and decreases exponentially when not connected). Clients with lower -// recent usage are preferred, unknown nodes have the highest priority. Already -// connected nodes receive a small bias in their favor in order to avoid accepting -// and instantly kicking out clients. -// -// Note: the pool can use any string for client identification. Using signature -// keys for that purpose would not make sense when being known has a negative -// value for the client. Currently the LES protocol manager uses IP addresses -// (without port address) to identify clients. -type freeClientPool struct { - db ethdb.Database - lock sync.Mutex - clock mclock.Clock - closed bool - removePeer func(string) - - connectedLimit, totalLimit int - freeClientCap uint64 - connectedCap uint64 - - addressMap map[string]*freeClientPoolEntry - connPool, disconnPool *prque.Prque - startupTime mclock.AbsTime - logOffsetAtStartup int64 -} - -const ( - recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage - fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format - connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon -) - -// newFreeClientPool creates a new free client pool -func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool { - pool := &freeClientPool{ - db: db, - clock: clock, - addressMap: make(map[string]*freeClientPoolEntry), - connPool: prque.New(poolSetIndex), - disconnPool: prque.New(poolSetIndex), - freeClientCap: freeClientCap, - totalLimit: totalLimit, - removePeer: removePeer, - } - pool.loadFromDb() - return pool -} - -func (f *freeClientPool) stop() { - f.lock.Lock() - f.closed = true - f.saveToDb() - f.lock.Unlock() -} - -// freeClientId returns a string identifier for the peer. Multiple peers with the -// same identifier can not be in the free client pool simultaneously. -func freeClientId(p *peer) string { - if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok { - if addr.IP.IsLoopback() { - // using peer id instead of loopback ip address allows multiple free - // connections from local machine to own server - return p.id - } else { - return addr.IP.String() - } - } - return "" -} - -// registerPeer implements clientPool -func (f *freeClientPool) registerPeer(p *peer) { - if freeId := freeClientId(p); freeId != "" { - if !f.connect(freeId, p.id) { - f.removePeer(p.id) - } - } -} - -// connect should be called after a successful handshake. If the connection was -// rejected, there is no need to call disconnect. -func (f *freeClientPool) connect(address, id string) bool { - f.lock.Lock() - defer f.lock.Unlock() - - if f.closed { - return false - } - if f.connectedLimit == 0 { - log.Debug("Client rejected", "address", address) - return false - } - e := f.addressMap[address] - now := f.clock.Now() - var recentUsage int64 - if e == nil { - e = &freeClientPoolEntry{address: address, index: -1, id: id} - f.addressMap[address] = e - } else { - if e.connected { - log.Debug("Client already connected", "address", address) - return false - } - recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier)) - } - e.linUsage = recentUsage - int64(now) - // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool - if f.connPool.Size() == f.connectedLimit { - i := f.connPool.PopItem().(*freeClientPoolEntry) - if e.linUsage+int64(connectedBias)-i.linUsage < 0 { - // kick it out and accept the new client - f.dropClient(i, now) - clientKickedMeter.Mark(1) - f.connectedCap -= f.freeClientCap - } else { - // keep the old client and reject the new one - f.connPool.Push(i, i.linUsage) - log.Debug("Client rejected", "address", address) - clientRejectedMeter.Mark(1) - return false - } - } - f.disconnPool.Remove(e.index) - e.connected = true - e.id = id - f.connPool.Push(e, e.linUsage) - if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit { - f.disconnPool.Pop() - } - f.connectedCap += f.freeClientCap - totalConnectedGauge.Update(int64(f.connectedCap)) - clientConnectedMeter.Mark(1) - log.Debug("Client accepted", "address", address) - return true -} - -// unregisterPeer implements clientPool -func (f *freeClientPool) unregisterPeer(p *peer) { - if freeId := freeClientId(p); freeId != "" { - f.disconnect(freeId) - } -} - -// disconnect should be called when a connection is terminated. If the disconnection -// was initiated by the pool itself using disconnectFn then calling disconnect is -// not necessary but permitted. -func (f *freeClientPool) disconnect(address string) { - f.lock.Lock() - defer f.lock.Unlock() - - if f.closed { - return - } - // Short circuit if the peer hasn't been registered. - e := f.addressMap[address] - if e == nil { - return - } - now := f.clock.Now() - if !e.connected { - log.Debug("Client already disconnected", "address", address) - return - } - f.connPool.Remove(e.index) - f.calcLogUsage(e, now) - e.connected = false - f.disconnPool.Push(e, -e.logUsage) - f.connectedCap -= f.freeClientCap - totalConnectedGauge.Update(int64(f.connectedCap)) - log.Debug("Client disconnected", "address", address) -} - -// setConnLimit sets the maximum number of free client slots and also drops -// some peers if necessary -func (f *freeClientPool) setLimits(count int, totalCap uint64) { - f.lock.Lock() - defer f.lock.Unlock() - - f.connectedLimit = int(totalCap / f.freeClientCap) - if count < f.connectedLimit { - f.connectedLimit = count - } - now := mclock.Now() - for f.connPool.Size() > f.connectedLimit { - i := f.connPool.PopItem().(*freeClientPoolEntry) - f.dropClient(i, now) - f.connectedCap -= f.freeClientCap - } - totalConnectedGauge.Update(int64(f.connectedCap)) -} - -// dropClient disconnects a client and also moves it from the connected to the -// disconnected pool -func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) { - f.connPool.Remove(i.index) - f.calcLogUsage(i, now) - i.connected = false - f.disconnPool.Push(i, -i.logUsage) - log.Debug("Client kicked out", "address", i.address) - f.removePeer(i.id) -} - -// logOffset calculates the time-dependent offset for the logarithmic -// representation of recent usage -func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 { - // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor - // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier. - logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier)) - return f.logOffsetAtStartup + logDecay -} - -// calcLogUsage converts recent usage from linear to logarithmic representation -// when disconnecting a peer or closing the client pool -func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) { - dt := e.linUsage + int64(now) - if dt < 1 { - dt = 1 - } - e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now) -} - -// freeClientPoolStorage is the RLP representation of the pool's database storage -type freeClientPoolStorage struct { - LogOffset uint64 - List []*freeClientPoolEntry -} - -// loadFromDb restores pool status from the database storage -// (automatically called at initialization) -func (f *freeClientPool) loadFromDb() { - enc, err := f.db.Get([]byte("freeClientPool")) - if err != nil { - return - } - var storage freeClientPoolStorage - err = rlp.DecodeBytes(enc, &storage) - if err != nil { - log.Error("Failed to decode client list", "err", err) - return - } - f.logOffsetAtStartup = int64(storage.LogOffset) - f.startupTime = f.clock.Now() - for _, e := range storage.List { - log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage) - f.addressMap[e.address] = e - f.disconnPool.Push(e, -e.logUsage) - } -} - -// saveToDb saves pool status to the database storage -// (automatically called during shutdown) -func (f *freeClientPool) saveToDb() { - now := f.clock.Now() - storage := freeClientPoolStorage{ - LogOffset: uint64(f.logOffset(now)), - List: make([]*freeClientPoolEntry, len(f.addressMap)), - } - i := 0 - for _, e := range f.addressMap { - if e.connected { - f.calcLogUsage(e, now) - } - storage.List[i] = e - i++ - } - enc, err := rlp.EncodeToBytes(storage) - if err != nil { - log.Error("Failed to encode client list", "err", err) - } else { - f.db.Put([]byte("freeClientPool"), enc) - } -} - -// freeClientPoolEntry represents a client address known by the pool. -// When connected, recent usage is calculated as linUsage + int64(clock.Now()) -// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset -// also grows linearly with time while the server is running. -// Conversion between linear and logarithmic representation happens when connecting -// or disconnecting the node. -// -// Note: linUsage and logUsage are values used with constantly growing offsets so -// even though they are close to each other at any time they may wrap around int64 -// limits over time. Comparison should be performed accordingly. -type freeClientPoolEntry struct { - address, id string - connected bool - disconnectFn func() - linUsage, logUsage int64 - index int -} - -func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)}) -} - -func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error { - var entry struct { - Address string - LogUsage uint64 - } - if err := s.Decode(&entry); err != nil { - return err - } - e.address = entry.Address - e.logUsage = int64(entry.LogUsage) - e.connected = false - e.index = -1 - return nil -} - -// poolSetIndex callback is used by both priority queues to set/update the index of -// the element in the queue. Index is needed to remove elements other than the top one. -func poolSetIndex(a interface{}, i int) { - a.(*freeClientPoolEntry).index = i -} diff --git a/les/freeclient_test.go b/les/freeclient_test.go deleted file mode 100644 index 5c4f585604..0000000000 --- a/les/freeclient_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package les - -import ( - "fmt" - "math/rand" - "strconv" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/core/rawdb" -) - -func TestFreeClientPoolL10C100(t *testing.T) { - testFreeClientPool(t, 10, 100) -} - -func TestFreeClientPoolL40C200(t *testing.T) { - testFreeClientPool(t, 40, 200) -} - -func TestFreeClientPoolL100C300(t *testing.T) { - testFreeClientPool(t, 100, 300) -} - -const testFreeClientPoolTicks = 500000 - -func testFreeClientPool(t *testing.T, connLimit, clientCount int) { - var ( - clock mclock.Simulated - db = rawdb.NewMemoryDatabase() - connected = make([]bool, clientCount) - connTicks = make([]int, clientCount) - disconnCh = make(chan int, clientCount) - peerAddress = func(i int) string { - return fmt.Sprintf("addr #%d", i) - } - peerId = func(i int) string { - return fmt.Sprintf("id #%d", i) - } - disconnFn = func(id string) { - i, err := strconv.Atoi(id[4:]) - if err != nil { - panic(err) - } - disconnCh <- i - } - pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn) - ) - pool.setLimits(connLimit, uint64(connLimit)) - - // pool should accept new peers up to its connected limit - for i := 0; i < connLimit; i++ { - if pool.connect(peerAddress(i), peerId(i)) { - connected[i] = true - } else { - t.Fatalf("Test peer #%d rejected", i) - } - } - // since all accepted peers are new and should not be kicked out, the next one should be rejected - if pool.connect(peerAddress(connLimit), peerId(connLimit)) { - connected[connLimit] = true - t.Fatalf("Peer accepted over connected limit") - } - - // randomly connect and disconnect peers, expect to have a similar total connection time at the end - for tickCounter := 0; tickCounter < testFreeClientPoolTicks; tickCounter++ { - clock.Run(1 * time.Second) - - i := rand.Intn(clientCount) - if connected[i] { - pool.disconnect(peerAddress(i)) - connected[i] = false - connTicks[i] += tickCounter - } else { - if pool.connect(peerAddress(i), peerId(i)) { - connected[i] = true - connTicks[i] -= tickCounter - } - } - pollDisconnects: - for { - select { - case i := <-disconnCh: - pool.disconnect(peerAddress(i)) - if connected[i] { - connTicks[i] += tickCounter - connected[i] = false - } - default: - break pollDisconnects - } - } - } - - expTicks := testFreeClientPoolTicks * connLimit / clientCount - expMin := expTicks - expTicks/10 - expMax := expTicks + expTicks/10 - - // check if the total connected time of peers are all in the expected range - for i, c := range connected { - if c { - connTicks[i] += testFreeClientPoolTicks - } - if connTicks[i] < expMin || connTicks[i] > expMax { - t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], expMin, expMax) - } - } - - // a previously unknown peer should be accepted now - if !pool.connect("newAddr", "newId") { - t.Fatalf("Previously unknown peer rejected") - } - - // close and restart pool - pool.stop() - pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn) - pool.setLimits(connLimit, uint64(connLimit)) - - // try connecting all known peers (connLimit should be filled up) - for i := 0; i < clientCount; i++ { - pool.connect(peerAddress(i), peerId(i)) - } - // expect pool to remember known nodes and kick out one of them to accept a new one - if !pool.connect("newAddr2", "newId2") { - t.Errorf("Previously unknown peer rejected after restarting pool") - } - pool.stop() -} diff --git a/les/handler.go b/les/handler.go index 53e2911e4f..ac1eb173cb 100644 --- a/les/handler.go +++ b/les/handler.go @@ -27,6 +27,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" @@ -304,8 +305,14 @@ func (pm *ProtocolManager) handle(p *peer) error { p.Log().Error("Light Ethereum peer registration failed", "err", err) return err } + if !pm.client && p.balanceTracker == nil { + // add dummy balance tracker for tests + p.balanceTracker = &balanceTracker{} + p.balanceTracker.init(&mclock.System{}, 1) + } connectedAt := time.Now() defer func() { + p.balanceTracker = nil pm.removePeer(p.id) connectionTimer.UpdateSince(connectedAt) }() @@ -400,6 +407,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { defer msg.Discard() var deliverMsg *Msg + balanceTracker := p.balanceTracker sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) { p.responseLock.Lock() @@ -418,6 +426,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { realCost = pm.server.costTracker.realCost(servingTime, msg.Size, replySize) if amount != 0 { pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost) + balanceTracker.requestCost(realCost) } } else { realCost = maxCost diff --git a/les/metrics.go b/les/metrics.go index 4c6737a4e8..4fe7031163 100644 --- a/les/metrics.go +++ b/les/metrics.go @@ -29,24 +29,24 @@ var ( connectionTimer = metrics.NewRegisteredTimer("les/connectionTime", nil) - totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil) - totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil) - totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil) - blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil) - requestServedTimer = metrics.NewRegisteredTimer("les/server/requestServed", nil) - requestServedMeter = metrics.NewRegisteredMeter("les/server/totalRequestServed", nil) - requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/totalRequestEstimated", nil) - relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/relativeCost", nil, metrics.NewExpDecaySample(1028, 0.015)) - recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil) - recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil) - sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil) - sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil) - clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil) - clientRejectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil) - clientKickedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil) - // clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil) - clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil) - clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil) + totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil) + totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil) + totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil) + blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil) + requestServedTimer = metrics.NewRegisteredTimer("les/server/requestServed", nil) + requestServedMeter = metrics.NewRegisteredMeter("les/server/totalRequestServed", nil) + requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/totalRequestEstimated", nil) + relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/relativeCost", nil, metrics.NewExpDecaySample(1028, 0.015)) + recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil) + recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil) + sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil) + sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil) + clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil) + clientRejectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil) + clientKickedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil) + clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil) + clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil) + clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil) ) // meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of diff --git a/les/peer.go b/les/peer.go index 1aa1613b0e..bcd91cd835 100644 --- a/les/peer.go +++ b/les/peer.go @@ -21,6 +21,7 @@ import ( "fmt" "math/big" "math/rand" + "net" "sync" "sync/atomic" "time" @@ -33,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -105,10 +107,11 @@ type peer struct { updateTime mclock.AbsTime frozen uint32 // 1 if client is in frozen state - fcClient *flowcontrol.ClientNode // nil if the peer is server only - fcServer *flowcontrol.ServerNode // nil if the peer is client only - fcParams flowcontrol.ServerParams - fcCosts requestCostTable + fcClient *flowcontrol.ClientNode // nil if the peer is server only + fcServer *flowcontrol.ServerNode // nil if the peer is client only + fcParams flowcontrol.ServerParams + fcCosts requestCostTable + balanceTracker *balanceTracker // set by clientPool.connect, used and removed by ProtocolManager.handle trusted bool onlyAnnounce bool @@ -122,12 +125,32 @@ func newPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2p.MsgR rw: rw, version: version, network: network, - id: fmt.Sprintf("%x", p.ID().Bytes()), + id: peerIdToString(p.ID()), trusted: trusted, errCh: make(chan error, 1), } } +// peerIdToString converts enode.ID to a string form +func peerIdToString(id enode.ID) string { + return fmt.Sprintf("%x", id.Bytes()) +} + +// freeClientId returns a string identifier for the peer. Multiple peers with the +// same identifier can not be connected in free mode simultaneously. +func (p *peer) freeClientId() string { + if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok { + if addr.IP.IsLoopback() { + // using peer id instead of loopback ip address allows multiple free + // connections from local machine to own server + return p.id + } else { + return addr.IP.String() + } + } + return p.id +} + // rejectUpdate returns true if a parameter update has to be rejected because // the size and/or rate of updates exceed the capacity limitation func (p *peer) rejectUpdate(size uint64) bool { diff --git a/les/server.go b/les/server.go index 0795baf9fd..97e82a42b2 100644 --- a/les/server.go +++ b/les/server.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -55,9 +56,9 @@ type LesServer struct { thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode - maxPeers int - minCapacity, freeClientCap uint64 - freeClientPool *freeClientPool + maxPeers int + minCapacity, maxCapacity, freeClientCap uint64 + clientPool *clientPool } func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { @@ -158,7 +159,7 @@ func (s *LesServer) startEventLoop() { } updateRecharge() totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh) - s.freeClientPool.setLimits(s.maxPeers, totalCapacity) + s.clientPool.setLimits(s.maxPeers, totalCapacity) var maxFreePeers uint64 go func() { @@ -175,7 +176,7 @@ func (s *LesServer) startEventLoop() { log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers) } maxFreePeers = newFreePeers - s.freeClientPool.setLimits(s.maxPeers, totalCapacity) + s.clientPool.setLimits(s.maxPeers, totalCapacity) case <-s.protocolManager.quitSync: s.protocolManager.wg.Done() return @@ -205,14 +206,14 @@ func (s *LesServer) Start(srvr *p2p.Server) { } } - maxCapacity := s.freeClientCap * uint64(s.maxPeers) - if totalRecharge > maxCapacity { - maxCapacity = totalRecharge + s.maxCapacity = s.freeClientCap * uint64(s.maxPeers) + if totalRecharge > s.maxCapacity { + s.maxCapacity = totalRecharge } - s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2) - s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }) - s.protocolManager.peers.notify(s.freeClientPool) - + s.fcManager.SetCapacityLimits(s.freeClientCap, s.maxCapacity, s.freeClientCap*2) + s.clientPool = newClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id enode.ID) { go s.protocolManager.removePeer(peerIdToString(id)) }) + s.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1}) + s.protocolManager.peers.notify(s.clientPool) s.startEventLoop() s.protocolManager.Start(s.config.LightPeers) if srvr.DiscV5 != nil { @@ -250,7 +251,7 @@ func (s *LesServer) Stop() { go func() { <-s.protocolManager.noMorePeers }() - s.freeClientPool.stop() + s.clientPool.stop() s.costTracker.stop() s.protocolManager.Stop() }