Official Go implementation of the Ethereum protocol
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-ethereum/les/clientpool.go

454 lines
16 KiB

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
import (
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
"github.com/ethereum/go-ethereum/rlp"
)
const (
defaultNegExpTC = 3600 // default time constant (in seconds) for exponentially reducing negative balance
// defaultConnectedBias is applied to already connected clients So that
// already connected client won't be kicked out very soon and we
// can ensure all connected clients can have enough time to request
// or sync some data.
//
// todo(rjl493456442) make it configurable. It can be the option of
// free trial time!
defaultConnectedBias = time.Minute * 3
inactiveTimeout = time.Second * 10
)
// clientPool implements a client database that assigns a priority to each client
// based on a positive and negative balance. Positive balance is externally assigned
// to prioritized clients and is decreased with connection time and processed
// requests (unless the price factors are zero). If the positive balance is zero
// then negative balance is accumulated.
//
// Balance tracking and priority calculation for connected clients is done by
// balanceTracker. activeQueue ensures that clients with the lowest positive or
// highest negative balance get evicted when the total capacity allowance is full
// and new clients with a better balance want to connect.
//
// Already connected nodes receive a small bias in their favor in order to avoid
// accepting and instantly kicking out clients. In theory, we try to ensure that
// each client can have several minutes of connection time.
//
// Balances of disconnected clients are stored in nodeDB including positive balance
// and negative banalce. Boeth positive balance and negative balance will decrease
// exponentially. If the balance is low enough, then the record will be dropped.
type clientPool struct {
vfs.BalanceTrackerSetup
vfs.PriorityPoolSetup
lock sync.Mutex
clock mclock.Clock
closed bool
removePeer func(enode.ID)
synced func() bool
ns *nodestate.NodeStateMachine
pp *vfs.PriorityPool
bt *vfs.BalanceTracker
defaultPosFactors, defaultNegFactors vfs.PriceFactors
posExpTC, negExpTC uint64
minCap uint64 // The minimal capacity value allowed for any client
connectedBias time.Duration
capLimit uint64
}
// clientPoolPeer represents a client peer in the pool.
// Positive balances are assigned to node key while negative balances are assigned
// to freeClientId. Currently network IP address without port is used because
// clients have a limited access to IP addresses while new node keys can be easily
// generated so it would be useless to assign a negative value to them.
type clientPoolPeer interface {
Node() *enode.Node
freeClientId() string
updateCapacity(uint64)
freeze()
allowInactive() bool
}
// clientInfo defines all information required by clientpool.
type clientInfo struct {
node *enode.Node
address string
peer clientPoolPeer
connected, priority bool
connectedAt mclock.AbsTime
balance *vfs.NodeBalance
}
// newClientPool creates a new client pool
func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID), synced func() bool) *clientPool {
pool := &clientPool{
ns: ns,
BalanceTrackerSetup: balanceTrackerSetup,
PriorityPoolSetup: priorityPoolSetup,
clock: clock,
minCap: minCap,
connectedBias: connectedBias,
removePeer: removePeer,
synced: synced,
}
pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{})
pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
// set default expiration constants used by tests
// Note: server overwrites this if token sale is active
pool.bt.SetExpirationTCs(0, defaultNegExpTC)
ns.SubscribeState(pool.InactiveFlag.Or(pool.PriorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
if newState.Equals(pool.InactiveFlag) {
ns.AddTimeout(node, pool.InactiveFlag, inactiveTimeout)
}
if oldState.Equals(pool.InactiveFlag) && newState.Equals(pool.InactiveFlag.Or(pool.PriorityFlag)) {
ns.SetStateSub(node, pool.InactiveFlag, nodestate.Flags{}, 0) // remove timeout
}
})
ns.SubscribeState(pool.ActiveFlag.Or(pool.PriorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
return
}
c.priority = newState.HasAll(pool.PriorityFlag)
if newState.Equals(pool.ActiveFlag) {
cap, _ := ns.GetField(node, pool.CapacityField).(uint64)
if cap > minCap {
pool.pp.RequestCapacity(node, minCap, 0, true)
}
}
})
ns.SubscribeState(pool.InactiveFlag.Or(pool.ActiveFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
if oldState.IsEmpty() {
clientConnectedMeter.Mark(1)
log.Debug("Client connected", "id", node.ID())
}
if oldState.Equals(pool.InactiveFlag) && newState.Equals(pool.ActiveFlag) {
clientActivatedMeter.Mark(1)
log.Debug("Client activated", "id", node.ID())
}
if oldState.Equals(pool.ActiveFlag) && newState.Equals(pool.InactiveFlag) {
clientDeactivatedMeter.Mark(1)
log.Debug("Client deactivated", "id", node.ID())
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil || !c.peer.allowInactive() {
pool.removePeer(node.ID())
}
}
if newState.IsEmpty() {
clientDisconnectedMeter.Mark(1)
log.Debug("Client disconnected", "id", node.ID())
pool.removePeer(node.ID())
}
})
var totalConnected uint64
ns.SubscribeField(pool.CapacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
oldCap, _ := oldValue.(uint64)
newCap, _ := newValue.(uint64)
totalConnected += newCap - oldCap
totalConnectedGauge.Update(int64(totalConnected))
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
if c != nil {
c.peer.updateCapacity(newCap)
}
})
return pool
}
// stop shuts the client pool down
func (f *clientPool) stop() {
f.lock.Lock()
f.closed = true
f.lock.Unlock()
f.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
// enforces saving all balances in BalanceTracker
f.disconnectNode(node)
})
f.bt.Stop()
}
// connect should be called after a successful handshake. If the connection was
// rejected, there is no need to call disconnect.
func (f *clientPool) connect(peer clientPoolPeer) (uint64, error) {
f.lock.Lock()
defer f.lock.Unlock()
// Short circuit if clientPool is already closed.
if f.closed {
return 0, fmt.Errorf("Client pool is already closed")
}
// Dedup connected peers.
node, freeID := peer.Node(), peer.freeClientId()
if f.ns.GetField(node, clientInfoField) != nil {
log.Debug("Client already connected", "address", freeID, "id", node.ID().String())
return 0, fmt.Errorf("Client already connected address=%s id=%s", freeID, node.ID().String())
}
now := f.clock.Now()
c := &clientInfo{
node: node,
address: freeID,
peer: peer,
connected: true,
connectedAt: now,
}
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil {
f.disconnect(peer)
return 0, nil
}
c.balance.SetPriceFactors(f.defaultPosFactors, f.defaultNegFactors)
f.ns.SetState(node, f.InactiveFlag, nodestate.Flags{}, 0)
var allowed bool
f.ns.Operation(func() {
_, allowed = f.pp.RequestCapacity(node, f.minCap, f.connectedBias, true)
})
if allowed {
return f.minCap, nil
}
if !peer.allowInactive() {
f.disconnect(peer)
}
return 0, nil
}
// setConnectedBias sets the connection bias, which is applied to already connected clients
// So that already connected client won't be kicked out very soon and we can ensure all
// connected clients can have enough time to request or sync some data.
func (f *clientPool) setConnectedBias(bias time.Duration) {
f.lock.Lock()
defer f.lock.Unlock()
f.connectedBias = bias
f.pp.SetActiveBias(bias)
}
// disconnect should be called when a connection is terminated. If the disconnection
// was initiated by the pool itself using disconnectFn then calling disconnect is
// not necessary but permitted.
func (f *clientPool) disconnect(p clientPoolPeer) {
f.disconnectNode(p.Node())
}
// disconnectNode removes node fields and flags related to connected status
func (f *clientPool) disconnectNode(node *enode.Node) {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientInfoField, nil)
}
// setDefaultFactors sets the default price factors applied to subsequently connected clients
func (f *clientPool) setDefaultFactors(posFactors, negFactors vfs.PriceFactors) {
f.lock.Lock()
defer f.lock.Unlock()
f.defaultPosFactors = posFactors
f.defaultNegFactors = negFactors
}
// capacityInfo returns the total capacity allowance, the total capacity of connected
// clients and the total capacity of connected and prioritized clients
func (f *clientPool) capacityInfo() (uint64, uint64, uint64) {
f.lock.Lock()
defer f.lock.Unlock()
// total priority active cap will be supported when the token issuer module is added
_, activeCap := f.pp.Active()
return f.capLimit, activeCap, 0
}
// setLimits sets the maximum number and total capacity of connected clients,
// dropping some of them if necessary.
func (f *clientPool) setLimits(totalConn int, totalCap uint64) {
f.lock.Lock()
defer f.lock.Unlock()
f.capLimit = totalCap
f.pp.SetLimits(uint64(totalConn), totalCap)
}
// setCapacity sets the assigned capacity of a connected client
func (f *clientPool) setCapacity(node *enode.Node, freeID string, capacity uint64, bias time.Duration, setCap bool) (uint64, error) {
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
if setCap {
return 0, fmt.Errorf("client %064x is not connected", node.ID())
}
c = &clientInfo{node: node}
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil {
log.Error("BalanceField is missing", "node", node.ID())
return 0, fmt.Errorf("BalanceField of %064x is missing", node.ID())
}
defer func() {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientInfoField, nil)
}()
}
var (
minPriority int64
allowed bool
)
f.ns.Operation(func() {
if !setCap || c.priority {
// check clientInfo.priority inside Operation to ensure thread safety
minPriority, allowed = f.pp.RequestCapacity(node, capacity, bias, setCap)
}
})
if allowed {
return 0, nil
}
missing := c.balance.PosBalanceMissing(minPriority, capacity, bias)
if missing < 1 {
// ensure that we never return 0 missing and insufficient priority error
missing = 1
}
return missing, errNoPriority
}
// setCapacityLocked is the equivalent of setCapacity used when f.lock is already locked
func (f *clientPool) setCapacityLocked(node *enode.Node, freeID string, capacity uint64, minConnTime time.Duration, setCap bool) (uint64, error) {
f.lock.Lock()
defer f.lock.Unlock()
return f.setCapacity(node, freeID, capacity, minConnTime, setCap)
}
// forClients calls the supplied callback for either the listed node IDs or all connected
// nodes. It passes a valid clientInfo to the callback and ensures that the necessary
// fields and flags are set in order for BalanceTracker and PriorityPool to work even if
// the node is not connected.
func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
f.lock.Lock()
defer f.lock.Unlock()
if len(ids) == 0 {
f.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c != nil {
cb(c)
}
})
} else {
for _, id := range ids {
node := f.ns.GetNode(id)
if node == nil {
node = enode.SignNull(&enr.Record{}, id)
}
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c != nil {
cb(c)
} else {
c = &clientInfo{node: node}
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, "")
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance != nil {
cb(c)
} else {
log.Error("BalanceField is missing")
}
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientInfoField, nil)
}
}
}
}
// serveCapQuery serves a vflux capacity query. It receives multiple token amount values
// and a bias time value. For each given token amount it calculates the maximum achievable
// capacity in case the amount is added to the balance.
func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte {
var req vflux.CapacityQueryReq
if rlp.DecodeBytes(data, &req) != nil {
return nil
}
if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
return nil
}
result := make(vflux.CapacityQueryReply, len(req.AddTokens))
if !f.synced() {
capacityQueryZeroMeter.Mark(1)
reply, _ := rlp.EncodeToBytes(&result)
return reply
}
node := f.ns.GetNode(id)
if node == nil {
node = enode.SignNull(&enr.Record{}, id)
}
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
c = &clientInfo{node: node}
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
defer func() {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientInfoField, nil)
}()
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil {
log.Error("BalanceField is missing", "node", node.ID())
return nil
}
}
// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
curve := f.pp.GetCapacityCurve().Exclude(id)
bias := time.Second * time.Duration(req.Bias)
if f.connectedBias > bias {
bias = f.connectedBias
}
pb, _ := c.balance.GetBalance()
for i, addTokens := range req.AddTokens {
add := addTokens.Int64()
result[i] = curve.MaxCapacity(func(capacity uint64) int64 {
return c.balance.EstimatePriority(capacity, add, 0, bias, false) / int64(capacity)
})
if add <= 0 && uint64(-add) >= pb && result[i] > f.minCap {
result[i] = f.minCap
}
if result[i] < f.minCap {
result[i] = 0
}
}
// add first result to metrics (don't care about priority client multi-queries yet)
if result[0] == 0 {
capacityQueryZeroMeter.Mark(1)
} else {
capacityQueryNonZeroMeter.Mark(1)
}
reply, _ := rlp.EncodeToBytes(&result)
return reply
}