From 4fabd9cbd2b8fc6bdc643fefb5ea583c5019171c Mon Sep 17 00:00:00 2001 From: gary rong Date: Wed, 26 Feb 2020 17:41:24 +0800 Subject: [PATCH] les: separate peer into clientPeer and serverPeer (#19991) * les: separate peer into clientPeer and serverPeer * les: address comments --- les/benchmark.go | 53 +- les/client.go | 9 +- les/client_handler.go | 61 +- les/clientpool.go | 12 +- les/commons.go | 1 - les/distributor.go | 10 +- les/distributor_test.go | 3 +- les/fetcher.go | 52 +- les/handler_test.go | 35 +- les/odr.go | 6 +- les/odr_requests.go | 76 +-- les/odr_test.go | 8 +- les/peer.go | 1396 +++++++++++++++++++++++---------------- les/peer_test.go | 377 ++++------- les/request_test.go | 3 +- les/retrieve.go | 8 +- les/server.go | 9 +- les/server_handler.go | 47 +- les/serverpool.go | 6 +- les/servingqueue.go | 10 +- les/sync.go | 6 +- les/sync_test.go | 4 +- les/test_helper.go | 44 +- les/txrelay.go | 44 +- les/ulc_test.go | 18 +- 25 files changed, 1210 insertions(+), 1088 deletions(-) diff --git a/les/benchmark.go b/les/benchmark.go index 42eeef10f3..dbb10a5c2a 100644 --- a/les/benchmark.go +++ b/les/benchmark.go @@ -42,7 +42,7 @@ type requestBenchmark interface { // init initializes the generator for generating the given number of randomized requests init(h *serverHandler, count int) error // request initiates sending a single request to the given peer - request(peer *peer, index int) error + request(peer *serverPeer, index int) error } // benchmarkBlockHeaders implements requestBenchmark @@ -72,11 +72,11 @@ func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error { return nil } -func (b *benchmarkBlockHeaders) request(peer *peer, index int) error { +func (b *benchmarkBlockHeaders) request(peer *serverPeer, index int) error { if b.byHash { - return peer.RequestHeadersByHash(0, 0, b.hashes[index], b.amount, b.skip, b.reverse) + return peer.requestHeadersByHash(0, b.hashes[index], b.amount, b.skip, b.reverse) } else { - return peer.RequestHeadersByNumber(0, 0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse) + return peer.requestHeadersByNumber(0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse) } } @@ -95,11 +95,11 @@ func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error { return nil } -func (b *benchmarkBodiesOrReceipts) request(peer *peer, index int) error { +func (b *benchmarkBodiesOrReceipts) request(peer *serverPeer, index int) error { if b.receipts { - return peer.RequestReceipts(0, 0, []common.Hash{b.hashes[index]}) + return peer.requestReceipts(0, []common.Hash{b.hashes[index]}) } else { - return peer.RequestBodies(0, 0, []common.Hash{b.hashes[index]}) + return peer.requestBodies(0, []common.Hash{b.hashes[index]}) } } @@ -114,13 +114,13 @@ func (b *benchmarkProofsOrCode) init(h *serverHandler, count int) error { return nil } -func (b *benchmarkProofsOrCode) request(peer *peer, index int) error { +func (b *benchmarkProofsOrCode) request(peer *serverPeer, index int) error { key := make([]byte, 32) rand.Read(key) if b.code { - return peer.RequestCode(0, 0, []CodeReq{{BHash: b.headHash, AccKey: key}}) + return peer.requestCode(0, []CodeReq{{BHash: b.headHash, AccKey: key}}) } else { - return peer.RequestProofs(0, 0, []ProofReq{{BHash: b.headHash, Key: key}}) + return peer.requestProofs(0, []ProofReq{{BHash: b.headHash, Key: key}}) } } @@ -144,7 +144,7 @@ func (b *benchmarkHelperTrie) init(h *serverHandler, count int) error { return nil } -func (b *benchmarkHelperTrie) request(peer *peer, index int) error 
{ +func (b *benchmarkHelperTrie) request(peer *serverPeer, index int) error { reqs := make([]HelperTrieReq, b.reqCount) if b.bloom { @@ -163,7 +163,7 @@ func (b *benchmarkHelperTrie) request(peer *peer, index int) error { } } - return peer.RequestHelperTrieProofs(0, 0, reqs) + return peer.requestHelperTrieProofs(0, reqs) } // benchmarkTxSend implements requestBenchmark @@ -189,9 +189,9 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error { return nil } -func (b *benchmarkTxSend) request(peer *peer, index int) error { +func (b *benchmarkTxSend) request(peer *serverPeer, index int) error { enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]}) - return peer.SendTxs(0, 0, enc) + return peer.sendTxs(0, enc) } // benchmarkTxStatus implements requestBenchmark @@ -201,10 +201,10 @@ func (b *benchmarkTxStatus) init(h *serverHandler, count int) error { return nil } -func (b *benchmarkTxStatus) request(peer *peer, index int) error { +func (b *benchmarkTxStatus) request(peer *serverPeer, index int) error { var hash common.Hash rand.Read(hash[:]) - return peer.RequestTxStatus(0, 0, []common.Hash{hash}) + return peer.requestTxStatus(0, []common.Hash{hash}) } // benchmarkSetup stores measurement data for a single benchmark type @@ -283,18 +283,17 @@ func (h *serverHandler) measure(setup *benchmarkSetup, count int) error { var id enode.ID rand.Read(id[:]) - clientPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe) - serverPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "server", nil), serverMeteredPipe) - serverPeer.sendQueue = newExecQueue(count) - serverPeer.announceType = announceTypeNone - serverPeer.fcCosts = make(requestCostTable) + peer1 := newServerPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe) + peer2 := newClientPeer(lpv2, NetworkId, p2p.NewPeer(id, "server", nil), serverMeteredPipe) + peer2.announceType = announceTypeNone + peer2.fcCosts = make(requestCostTable) c := &requestCosts{} for code := range requests { - serverPeer.fcCosts[code] = c + peer2.fcCosts[code] = c } - serverPeer.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1} - serverPeer.fcClient = flowcontrol.NewClientNode(h.server.fcManager, serverPeer.fcParams) - defer serverPeer.fcClient.Disconnect() + peer2.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1} + peer2.fcClient = flowcontrol.NewClientNode(h.server.fcManager, peer2.fcParams) + defer peer2.fcClient.Disconnect() if err := setup.req.init(h, count); err != nil { return err @@ -305,7 +304,7 @@ func (h *serverHandler) measure(setup *benchmarkSetup, count int) error { go func() { for i := 0; i < count; i++ { - if err := setup.req.request(clientPeer, i); err != nil { + if err := setup.req.request(peer1, i); err != nil { errCh <- err return } @@ -313,7 +312,7 @@ func (h *serverHandler) measure(setup *benchmarkSetup, count int) error { }() go func() { for i := 0; i < count; i++ { - if err := h.handleMsg(serverPeer, &sync.WaitGroup{}); err != nil { + if err := h.handleMsg(peer2, &sync.WaitGroup{}); err != nil { errCh <- err return } diff --git a/les/client.go b/les/client.go index 36d5a8d2fe..dfd0909778 100644 --- a/les/client.go +++ b/les/client.go @@ -49,6 +49,7 @@ import ( type LightEthereum struct { lesCommons + peers *serverPeerSet reqDist *requestDistributor retriever *retrieveManager odr *LesOdr @@ -80,7 +81,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { } log.Info("Initialised chain configuration", 
"config", chainConfig) - peers := newPeerSet() + peers := newServerPeerSet() leth := &LightEthereum{ lesCommons: lesCommons{ genesis: genesisHash, @@ -88,9 +89,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { chainConfig: chainConfig, iConfig: light.DefaultClientIndexerConfig, chainDb: chainDb, - peers: peers, closeCh: make(chan struct{}), }, + peers: peers, eventMux: ctx.EventMux, reqDist: newRequestDistributor(peers, &mclock.System{}), accountManager: ctx.AccountManager, @@ -225,7 +226,7 @@ func (s *LightEthereum) EventMux() *event.TypeMux { return s.eventMux // network protocols to start. func (s *LightEthereum) Protocols() []p2p.Protocol { return s.makeProtocols(ClientProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} { - if p := s.peers.Peer(peerIdToString(id)); p != nil { + if p := s.peers.peer(peerIdToString(id)); p != nil { return p.Info() } return nil @@ -253,7 +254,7 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error { // Ethereum protocol. func (s *LightEthereum) Stop() error { close(s.closeCh) - s.peers.Close() + s.peers.close() s.reqDist.close() s.odr.Stop() s.relay.Stop() diff --git a/les/client_handler.go b/les/client_handler.go index 7fdb165719..d04574c8c7 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -65,7 +65,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T } handler.fetcher = newLightFetcher(handler) handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer) - handler.backend.peers.notify((*downloaderPeerNotify)(handler)) + handler.backend.peers.subscribe((*downloaderPeerNotify)(handler)) return handler } @@ -82,7 +82,8 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) if h.ulc != nil { trusted = h.ulc.trusted(p.ID()) } - peer := newPeer(int(version), h.backend.config.NetworkId, trusted, p, newMeteredMsgWriter(rw, int(version))) + peer := newServerPeer(int(version), h.backend.config.NetworkId, trusted, p, newMeteredMsgWriter(rw, int(version))) + defer peer.close() peer.poolEntry = h.backend.serverPool.connect(peer, peer.Node()) if peer.poolEntry == nil { return p2p.DiscRequested @@ -94,8 +95,8 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) return err } -func (h *clientHandler) handle(p *peer) error { - if h.backend.peers.Len() >= h.backend.config.LightPeers && !p.Peer.Info().Network.Trusted { +func (h *clientHandler) handle(p *serverPeer) error { + if h.backend.peers.len() >= h.backend.config.LightPeers && !p.Peer.Info().Network.Trusted { return p2p.DiscTooManyPeers } p.Log().Debug("Light Ethereum peer connected", "name", p.Name()) @@ -112,20 +113,20 @@ func (h *clientHandler) handle(p *peer) error { return err } // Register the peer locally - if err := h.backend.peers.Register(p); err != nil { + if err := h.backend.peers.register(p); err != nil { p.Log().Error("Light Ethereum peer registration failed", "err", err) return err } - serverConnectionGauge.Update(int64(h.backend.peers.Len())) + serverConnectionGauge.Update(int64(h.backend.peers.len())) connectedAt := mclock.Now() defer func() { - h.backend.peers.Unregister(p.id) + h.backend.peers.unregister(p.id) connectionTimer.Update(time.Duration(mclock.Now() - connectedAt)) - serverConnectionGauge.Update(int64(h.backend.peers.Len())) + serverConnectionGauge.Update(int64(h.backend.peers.len())) }() - h.fetcher.announce(p, p.headInfo) + h.fetcher.announce(p, 
&announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}) // pool entry can be nil during the unit test. if p.poolEntry != nil { @@ -143,7 +144,7 @@ func (h *clientHandler) handle(p *peer) error { // handleMsg is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. -func (h *clientHandler) handleMsg(p *peer) error { +func (h *clientHandler) handleMsg(p *serverPeer) error { // Read the next message from the remote peer, and ensure it's fully consumed msg, err := p.rw.ReadMsg() if err != nil { @@ -297,7 +298,7 @@ func (h *clientHandler) handleMsg(p *peer) error { Obj: resp.Status, } case StopMsg: - p.freezeServer(true) + p.freeze() h.backend.retriever.frozen(p) p.Log().Debug("Service stopped") case ResumeMsg: @@ -306,7 +307,7 @@ func (h *clientHandler) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ResumeFreeze(bv) - p.freezeServer(false) + p.unfreeze() p.Log().Debug("Service resumed") default: p.Log().Trace("Received invalid message", "code", msg.Code) @@ -315,8 +316,8 @@ func (h *clientHandler) handleMsg(p *peer) error { // Deliver the received response to retriever. if deliverMsg != nil { if err := h.backend.retriever.deliver(p, deliverMsg); err != nil { - p.responseErrors++ - if p.responseErrors > maxResponseErrors { + p.errCount++ + if p.errCount > maxResponseErrors { return err } } @@ -325,12 +326,12 @@ func (h *clientHandler) handleMsg(p *peer) error { } func (h *clientHandler) removePeer(id string) { - h.backend.peers.Unregister(id) + h.backend.peers.unregister(id) } type peerConnection struct { handler *clientHandler - peer *peer + peer *serverPeer } func (pc *peerConnection) Head() (common.Hash, *big.Int) { @@ -340,18 +341,18 @@ func (pc *peerConnection) Head() (common.Hash, *big.Int) { func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { rq := &distReq{ getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer := dp.(*serverPeer) + return peer.getRequestCost(GetBlockHeadersMsg, amount) }, canSend: func(dp distPeer) bool { - return dp.(*peer) == pc.peer + return dp.(*serverPeer) == pc.peer }, request: func(dp distPeer) func() { reqID := genReqID() - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer := dp.(*serverPeer) + cost := peer.getRequestCost(GetBlockHeadersMsg, amount) peer.fcServer.QueuedRequest(reqID, cost) - return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + return func() { peer.requestHeadersByHash(reqID, origin, amount, skip, reverse) } }, } _, ok := <-pc.handler.backend.reqDist.queue(rq) @@ -364,18 +365,18 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { rq := &distReq{ getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer := dp.(*serverPeer) + return peer.getRequestCost(GetBlockHeadersMsg, amount) }, canSend: func(dp distPeer) bool { - return dp.(*peer) == pc.peer + return dp.(*serverPeer) == pc.peer }, request: func(dp distPeer) func() { reqID := genReqID() - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer := dp.(*serverPeer) + cost := peer.getRequestCost(GetBlockHeadersMsg, amount) 
peer.fcServer.QueuedRequest(reqID, cost) - return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + return func() { peer.requestHeadersByNumber(reqID, origin, amount, skip, reverse) } }, } _, ok := <-pc.handler.backend.reqDist.queue(rq) @@ -388,7 +389,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip // downloaderPeerNotify implements peerSetNotify type downloaderPeerNotify clientHandler -func (d *downloaderPeerNotify) registerPeer(p *peer) { +func (d *downloaderPeerNotify) registerPeer(p *serverPeer) { h := (*clientHandler)(d) pc := &peerConnection{ handler: h, @@ -397,7 +398,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) { h.downloader.RegisterLightPeer(p.id, ethVersion, pc) } -func (d *downloaderPeerNotify) unregisterPeer(p *peer) { +func (d *downloaderPeerNotify) unregisterPeer(p *serverPeer) { h := (*clientHandler)(d) h.downloader.UnregisterPeer(p.id) } diff --git a/les/clientpool.go b/les/clientpool.go index da76f08b91..b01c825a7a 100644 --- a/les/clientpool.go +++ b/les/clientpool.go @@ -97,12 +97,12 @@ type clientPool struct { disableBias bool // Disable connection bias(used in testing) } -// clientPeer represents a client in the pool. +// clientPoolPeer represents a client peer in the pool. // Positive balances are assigned to node key while negative balances are assigned // to freeClientId. Currently network IP address without port is used because // clients have a limited access to IP addresses while new node keys can be easily // generated so it would be useless to assign a negative value to them. -type clientPeer interface { +type clientPoolPeer interface { ID() enode.ID freeClientId() string updateCapacity(uint64) @@ -117,7 +117,7 @@ type clientInfo struct { capacity uint64 priority bool pool *clientPool - peer clientPeer + peer clientPoolPeer queueIndex int // position in connectedQueue balanceTracker balanceTracker posFactors, negFactors priceFactors @@ -207,7 +207,7 @@ func (f *clientPool) stop() { // connect should be called after a successful handshake. If the connection was // rejected, there is no need to call disconnect. -func (f *clientPool) connect(peer clientPeer, capacity uint64) bool { +func (f *clientPool) connect(peer clientPoolPeer, capacity uint64) bool { f.lock.Lock() defer f.lock.Unlock() @@ -322,7 +322,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool { // disconnect should be called when a connection is terminated. If the disconnection // was initiated by the pool itself using disconnectFn then calling disconnect is // not necessary but permitted. -func (f *clientPool) disconnect(p clientPeer) { +func (f *clientPool) disconnect(p clientPoolPeer) { f.lock.Lock() defer f.lock.Unlock() @@ -516,7 +516,7 @@ func (f *clientPool) setCapacity(c *clientInfo, capacity uint64) error { } // requestCost feeds request cost after serving a request from the given peer. 
-func (f *clientPool) requestCost(p *peer, cost uint64) { +func (f *clientPool) requestCost(p *clientPeer, cost uint64) { f.lock.Lock() defer f.lock.Unlock() diff --git a/les/commons.go b/les/commons.go index b402c51769..29b5b7660e 100644 --- a/les/commons.go +++ b/les/commons.go @@ -61,7 +61,6 @@ type lesCommons struct { chainConfig *params.ChainConfig iConfig *light.IndexerConfig chainDb ethdb.Database - peers *peerSet chainReader chainReader chtIndexer, bloomTrieIndexer *core.ChainIndexer oracle *checkpointoracle.CheckpointOracle diff --git a/les/distributor.go b/les/distributor.go index 6d81149720..4d2be1b8f9 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -49,7 +49,7 @@ type requestDistributor struct { type distPeer interface { waitBefore(uint64) (time.Duration, float64) canQueue() bool - queueSend(f func()) + queueSend(f func()) bool } // distReq is the request abstraction used by the distributor. It is based on @@ -73,7 +73,7 @@ type distReq struct { } // newRequestDistributor creates a new request distributor -func newRequestDistributor(peers *peerSet, clock mclock.Clock) *requestDistributor { +func newRequestDistributor(peers *serverPeerSet, clock mclock.Clock) *requestDistributor { d := &requestDistributor{ clock: clock, reqQueue: list.New(), @@ -82,7 +82,7 @@ func newRequestDistributor(peers *peerSet, clock mclock.Clock) *requestDistribut peers: make(map[distPeer]struct{}), } if peers != nil { - peers.notify(d) + peers.subscribe(d) } d.wg.Add(1) go d.loop() @@ -90,14 +90,14 @@ func newRequestDistributor(peers *peerSet, clock mclock.Clock) *requestDistribut } // registerPeer implements peerSetNotify -func (d *requestDistributor) registerPeer(p *peer) { +func (d *requestDistributor) registerPeer(p *serverPeer) { d.peerLock.Lock() d.peers[p] = struct{}{} d.peerLock.Unlock() } // unregisterPeer implements peerSetNotify -func (d *requestDistributor) unregisterPeer(p *peer) { +func (d *requestDistributor) unregisterPeer(p *serverPeer) { d.peerLock.Lock() delete(d.peers, p) d.peerLock.Unlock() diff --git a/les/distributor_test.go b/les/distributor_test.go index 539c1aa7d7..9a93dba145 100644 --- a/les/distributor_test.go +++ b/les/distributor_test.go @@ -105,8 +105,9 @@ func (p *testDistPeer) canQueue() bool { return true } -func (p *testDistPeer) queueSend(f func()) { +func (p *testDistPeer) queueSend(f func()) bool { f() + return true } func TestRequestDistributor(t *testing.T) { diff --git a/les/fetcher.go b/les/fetcher.go index df76c56d70..7fa81cbcbb 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -45,10 +45,10 @@ type lightFetcher struct { lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests maxConfirmedTd *big.Int - peers map[*peer]*fetcherPeerInfo + peers map[*serverPeer]*fetcherPeerInfo lastUpdateStats *updateStatsEntry syncing bool - syncDone chan *peer + syncDone chan *serverPeer reqMu sync.RWMutex // reqMu protects access to sent header fetch requests requested map[uint64]fetchRequest @@ -96,7 +96,7 @@ type fetcherTreeNode struct { type fetchRequest struct { hash common.Hash amount uint64 - peer *peer + peer *serverPeer sent mclock.AbsTime timeout bool } @@ -105,7 +105,7 @@ type fetchRequest struct { type fetchResponse struct { reqID uint64 headers []*types.Header - peer *peer + peer *serverPeer } // newLightFetcher creates a new light fetcher @@ -113,16 +113,16 @@ func newLightFetcher(h *clientHandler) *lightFetcher { f := &lightFetcher{ handler: h, chain: h.backend.blockchain, - peers: 
make(map[*peer]*fetcherPeerInfo), + peers: make(map[*serverPeer]*fetcherPeerInfo), deliverChn: make(chan fetchResponse, 100), requested: make(map[uint64]fetchRequest), timeoutChn: make(chan uint64), requestTrigger: make(chan struct{}, 1), - syncDone: make(chan *peer), + syncDone: make(chan *serverPeer), closeCh: make(chan struct{}), maxConfirmedTd: big.NewInt(0), } - h.backend.peers.notify(f) + h.backend.peers.subscribe(f) f.wg.Add(1) go f.syncLoop() @@ -222,7 +222,7 @@ func (f *lightFetcher) syncLoop() { } // registerPeer adds a new peer to the fetcher's peer set -func (f *lightFetcher) registerPeer(p *peer) { +func (f *lightFetcher) registerPeer(p *serverPeer) { p.lock.Lock() p.hasBlock = func(hash common.Hash, number uint64, hasState bool) bool { return f.peerHasBlock(p, hash, number, hasState) @@ -235,7 +235,7 @@ func (f *lightFetcher) registerPeer(p *peer) { } // unregisterPeer removes a new peer from the fetcher's peer set -func (f *lightFetcher) unregisterPeer(p *peer) { +func (f *lightFetcher) unregisterPeer(p *serverPeer) { p.lock.Lock() p.hasBlock = nil p.lock.Unlock() @@ -250,7 +250,7 @@ func (f *lightFetcher) unregisterPeer(p *peer) { // announce processes a new announcement message received from a peer, adding new // nodes to the peer's block tree and removing old nodes if necessary -func (f *lightFetcher) announce(p *peer, head *announceData) { +func (f *lightFetcher) announce(p *serverPeer, head *announceData) { f.lock.Lock() defer f.lock.Unlock() p.Log().Debug("Received new announcement", "number", head.Number, "hash", head.Hash, "reorg", head.ReorgDepth) @@ -346,7 +346,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { f.checkKnownNode(p, n) p.lock.Lock() - p.headInfo = head + p.headInfo = blockInfo{Number: head.Number, Hash: head.Hash, Td: head.Td} fp.lastAnnounced = n p.lock.Unlock() f.checkUpdateStats(p, nil) @@ -358,7 +358,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { // peerHasBlock returns true if we can assume the peer knows the given block // based on its announcements -func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64, hasState bool) bool { +func (f *lightFetcher) peerHasBlock(p *serverPeer, hash common.Hash, number uint64, hasState bool) bool { f.lock.Lock() defer f.lock.Unlock() @@ -395,7 +395,7 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64, ha // requestAmount calculates the amount of headers to be downloaded starting // from a certain head backwards -func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 { +func (f *lightFetcher) requestAmount(p *serverPeer, n *fetcherTreeNode) uint64 { amount := uint64(0) nn := n for nn != nil && !f.checkKnownNode(p, nn) { @@ -488,7 +488,7 @@ func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq { return 0 }, canSend: func(dp distPeer) bool { - p := dp.(*peer) + p := dp.(*serverPeer) f.lock.Lock() defer f.lock.Unlock() @@ -504,7 +504,7 @@ func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq { f.setLastTrustedHeader(f.chain.CurrentHeader()) } go func() { - p := dp.(*peer) + p := dp.(*serverPeer) p.Log().Debug("Synchronisation started") f.handler.synchronise(p) f.syncDone <- p @@ -518,11 +518,11 @@ func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq { func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bestAmount uint64) *distReq { return &distReq{ getCost: func(dp distPeer) uint64 { - p := dp.(*peer) - return 
p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + p := dp.(*serverPeer) + return p.getRequestCost(GetBlockHeadersMsg, int(bestAmount)) }, canSend: func(dp distPeer) bool { - p := dp.(*peer) + p := dp.(*serverPeer) f.lock.Lock() defer f.lock.Unlock() @@ -537,7 +537,7 @@ func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bes return n != nil && !n.requested }, request: func(dp distPeer) func() { - p := dp.(*peer) + p := dp.(*serverPeer) f.lock.Lock() fp := f.peers[p] if fp != nil { @@ -548,7 +548,7 @@ func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bes } f.lock.Unlock() - cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + cost := p.getRequestCost(GetBlockHeadersMsg, int(bestAmount)) p.fcServer.QueuedRequest(reqID, cost) f.reqMu.Lock() f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} @@ -557,13 +557,13 @@ func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bes time.Sleep(hardRequestTimeout) f.timeoutChn <- reqID }() - return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } + return func() { p.requestHeadersByHash(reqID, bestHash, int(bestAmount), 0, true) } }, } } // deliverHeaders delivers header download request responses for processing -func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) { +func (f *lightFetcher) deliverHeaders(peer *serverPeer, reqID uint64, headers []*types.Header) { f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} } @@ -694,7 +694,7 @@ func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*typ // checkSyncedHeaders updates peer's block tree after synchronisation by marking // downloaded headers as known. If none of the announced headers are found after // syncing, the peer is dropped. -func (f *lightFetcher) checkSyncedHeaders(p *peer) { +func (f *lightFetcher) checkSyncedHeaders(p *serverPeer) { fp := f.peers[p] if fp == nil { p.Log().Debug("Unknown peer to check sync headers") @@ -728,7 +728,7 @@ func (f *lightFetcher) checkSyncedHeaders(p *peer) { } // lastTrustedTreeNode return last approved treeNode and a list of unapproved hashes -func (f *lightFetcher) lastTrustedTreeNode(p *peer) (*types.Header, []common.Hash) { +func (f *lightFetcher) lastTrustedTreeNode(p *serverPeer) (*types.Header, []common.Hash) { unapprovedHashes := make([]common.Hash, 0) current := f.chain.CurrentHeader() @@ -764,7 +764,7 @@ func (f *lightFetcher) setLastTrustedHeader(h *types.Header) { // checkKnownNode checks if a block tree node is known (downloaded and validated) // If it was not known previously but found in the database, sets its known flag -func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool { +func (f *lightFetcher) checkKnownNode(p *serverPeer, n *fetcherTreeNode) bool { if n.known { return true } @@ -867,7 +867,7 @@ func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) { // If a new entry has been added to the global tail, it is passed as a parameter here even though this function // assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil), // it can set the new head to newEntry. 
-func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) { +func (f *lightFetcher) checkUpdateStats(p *serverPeer, newEntry *updateStatsEntry) { now := mclock.Now() fp := f.peers[p] if fp == nil { diff --git a/les/handler_test.go b/les/handler_test.go index aad8d18e45..1612caf427 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -168,8 +168,7 @@ func testGetBlockHeaders(t *testing.T, protocol int) { // Send the hash request and verify the response reqID++ - cost := server.peer.peer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount)) - sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, cost, tt.query) + sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, tt.query) if err := expectResponse(server.peer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil { t.Errorf("test %d: headers mismatch: %v", i, err) } @@ -246,8 +245,7 @@ func testGetBlockBodies(t *testing.T, protocol int) { reqID++ // Send the hash request and verify the response - cost := server.peer.peer.GetRequestCost(GetBlockBodiesMsg, len(hashes)) - sendRequest(server.peer.app, GetBlockBodiesMsg, reqID, cost, hashes) + sendRequest(server.peer.app, GetBlockBodiesMsg, reqID, hashes) if err := expectResponse(server.peer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil { t.Errorf("test %d: bodies mismatch: %v", i, err) } @@ -278,8 +276,7 @@ func testGetCode(t *testing.T, protocol int) { } } - cost := server.peer.peer.GetRequestCost(GetCodeMsg, len(codereqs)) - sendRequest(server.peer.app, GetCodeMsg, 42, cost, codereqs) + sendRequest(server.peer.app, GetCodeMsg, 42, codereqs) if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, codes); err != nil { t.Errorf("codes mismatch: %v", err) } @@ -299,8 +296,7 @@ func testGetStaleCode(t *testing.T, protocol int) { BHash: bc.GetHeaderByNumber(number).Hash(), AccKey: crypto.Keccak256(testContractAddr[:]), } - cost := server.peer.peer.GetRequestCost(GetCodeMsg, 1) - sendRequest(server.peer.app, GetCodeMsg, 42, cost, []*CodeReq{req}) + sendRequest(server.peer.app, GetCodeMsg, 42, []*CodeReq{req}) if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, expected); err != nil { t.Errorf("codes mismatch: %v", err) } @@ -331,8 +327,7 @@ func testGetReceipt(t *testing.T, protocol int) { receipts = append(receipts, rawdb.ReadRawReceipts(server.db, block.Hash(), block.NumberU64())) } // Send the hash request and verify the response - cost := server.peer.peer.GetRequestCost(GetReceiptsMsg, len(hashes)) - sendRequest(server.peer.app, GetReceiptsMsg, 42, cost, hashes) + sendRequest(server.peer.app, GetReceiptsMsg, 42, hashes) if err := expectResponse(server.peer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil { t.Errorf("receipts mismatch: %v", err) } @@ -367,8 +362,7 @@ func testGetProofs(t *testing.T, protocol int) { } } // Send the proof request and verify the response - cost := server.peer.peer.GetRequestCost(GetProofsV2Msg, len(proofreqs)) - sendRequest(server.peer.app, GetProofsV2Msg, 42, cost, proofreqs) + sendRequest(server.peer.app, GetProofsV2Msg, 42, proofreqs) if err := expectResponse(server.peer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil { t.Errorf("proofs mismatch: %v", err) } @@ -392,8 +386,7 @@ func testGetStaleProof(t *testing.T, protocol int) { BHash: header.Hash(), Key: account, } - cost := server.peer.peer.GetRequestCost(GetProofsV2Msg, 1) - sendRequest(server.peer.app, GetProofsV2Msg, 42, cost, []*ProofReq{req}) + sendRequest(server.peer.app, 
GetProofsV2Msg, 42, []*ProofReq{req}) var expected []rlp.RawValue if wantOK { @@ -453,8 +446,7 @@ func testGetCHTProofs(t *testing.T, protocol int) { AuxReq: auxHeader, }} // Send the proof request and verify the response - cost := server.peer.peer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2)) - sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2) + sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requestsV2) if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil { t.Errorf("proofs mismatch: %v", err) } @@ -502,8 +494,7 @@ func testGetBloombitsProofs(t *testing.T, protocol int) { trie.Prove(key, 0, &proofs.Proofs) // Send the proof request and verify the response - cost := server.peer.peer.GetRequestCost(GetHelperTrieProofsMsg, len(requests)) - sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, cost, requests) + sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requests) if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil { t.Errorf("bit %d: proofs mismatch: %v", bit, err) } @@ -525,11 +516,9 @@ func testTransactionStatus(t *testing.T, protocol int) { test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) { reqID++ if send { - cost := server.peer.peer.GetRequestCost(SendTxV2Msg, 1) - sendRequest(server.peer.app, SendTxV2Msg, reqID, cost, types.Transactions{tx}) + sendRequest(server.peer.app, SendTxV2Msg, reqID, types.Transactions{tx}) } else { - cost := server.peer.peer.GetRequestCost(GetTxStatusMsg, 1) - sendRequest(server.peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()}) + sendRequest(server.peer.app, GetTxStatusMsg, reqID, []common.Hash{tx.Hash()}) } if err := expectResponse(server.peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil { t.Errorf("transaction status mismatch") @@ -620,7 +609,7 @@ func TestStopResumeLes3(t *testing.T) { header := server.handler.blockchain.CurrentHeader() req := func() { reqID++ - sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, testCost, &getBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1}) + sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1}) } for i := 1; i <= 5; i++ { // send requests while we still have enough buffer and expect a response diff --git a/les/odr.go b/les/odr.go index 136ecf4df4..f8469cc103 100644 --- a/les/odr.go +++ b/les/odr.go @@ -106,17 +106,17 @@ func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err erro reqID := genReqID() rq := &distReq{ getCost: func(dp distPeer) uint64 { - return lreq.GetCost(dp.(*peer)) + return lreq.GetCost(dp.(*serverPeer)) }, canSend: func(dp distPeer) bool { - p := dp.(*peer) + p := dp.(*serverPeer) if !p.onlyAnnounce { return lreq.CanSend(p) } return false }, request: func(dp distPeer) func() { - p := dp.(*peer) + p := dp.(*serverPeer) cost := lreq.GetCost(p) p.fcServer.QueuedRequest(reqID, cost) return func() { lreq.Request(reqID, p) } diff --git a/les/odr_requests.go b/les/odr_requests.go index 3c4dd7090f..146da2213c 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -46,9 +46,9 @@ var ( ) type LesOdrRequest interface { - GetCost(*peer) uint64 - CanSend(*peer) bool - Request(uint64, *peer) error + GetCost(*serverPeer) uint64 + CanSend(*serverPeer) bool + Request(uint64, *serverPeer) error Validate(ethdb.Database, *Msg) error } @@ -78,19 +78,19 @@ type 
BlockRequest light.BlockRequest // GetCost returns the cost of the given ODR request according to the serving // peer's cost table (implementation of LesOdrRequest) -func (r *BlockRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetBlockBodiesMsg, 1) +func (r *BlockRequest) GetCost(peer *serverPeer) uint64 { + return peer.getRequestCost(GetBlockBodiesMsg, 1) } // CanSend tells if a certain peer is suitable for serving the given request -func (r *BlockRequest) CanSend(peer *peer) bool { +func (r *BlockRequest) CanSend(peer *serverPeer) bool { return peer.HasBlock(r.Hash, r.Number, false) } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *BlockRequest) Request(reqID uint64, peer *peer) error { +func (r *BlockRequest) Request(reqID uint64, peer *serverPeer) error { peer.Log().Debug("Requesting block body", "hash", r.Hash) - return peer.RequestBodies(reqID, r.GetCost(peer), []common.Hash{r.Hash}) + return peer.requestBodies(reqID, []common.Hash{r.Hash}) } // Valid processes an ODR request reply message from the LES network @@ -134,19 +134,19 @@ type ReceiptsRequest light.ReceiptsRequest // GetCost returns the cost of the given ODR request according to the serving // peer's cost table (implementation of LesOdrRequest) -func (r *ReceiptsRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetReceiptsMsg, 1) +func (r *ReceiptsRequest) GetCost(peer *serverPeer) uint64 { + return peer.getRequestCost(GetReceiptsMsg, 1) } // CanSend tells if a certain peer is suitable for serving the given request -func (r *ReceiptsRequest) CanSend(peer *peer) bool { +func (r *ReceiptsRequest) CanSend(peer *serverPeer) bool { return peer.HasBlock(r.Hash, r.Number, false) } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *ReceiptsRequest) Request(reqID uint64, peer *peer) error { +func (r *ReceiptsRequest) Request(reqID uint64, peer *serverPeer) error { peer.Log().Debug("Requesting block receipts", "hash", r.Hash) - return peer.RequestReceipts(reqID, r.GetCost(peer), []common.Hash{r.Hash}) + return peer.requestReceipts(reqID, []common.Hash{r.Hash}) } // Valid processes an ODR request reply message from the LES network @@ -191,24 +191,24 @@ type TrieRequest light.TrieRequest // GetCost returns the cost of the given ODR request according to the serving // peer's cost table (implementation of LesOdrRequest) -func (r *TrieRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetProofsV2Msg, 1) +func (r *TrieRequest) GetCost(peer *serverPeer) uint64 { + return peer.getRequestCost(GetProofsV2Msg, 1) } // CanSend tells if a certain peer is suitable for serving the given request -func (r *TrieRequest) CanSend(peer *peer) bool { +func (r *TrieRequest) CanSend(peer *serverPeer) bool { return peer.HasBlock(r.Id.BlockHash, r.Id.BlockNumber, true) } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *TrieRequest) Request(reqID uint64, peer *peer) error { +func (r *TrieRequest) Request(reqID uint64, peer *serverPeer) error { peer.Log().Debug("Requesting trie proof", "root", r.Id.Root, "key", r.Key) req := ProofReq{ BHash: r.Id.BlockHash, AccKey: r.Id.AccKey, Key: r.Key, } - return peer.RequestProofs(reqID, r.GetCost(peer), []ProofReq{req}) + return peer.requestProofs(reqID, []ProofReq{req}) } // Valid processes an ODR request reply message from the LES network @@ -245,23 +245,23 @@ type CodeRequest light.CodeRequest // GetCost returns the cost of the 
given ODR request according to the serving // peer's cost table (implementation of LesOdrRequest) -func (r *CodeRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetCodeMsg, 1) +func (r *CodeRequest) GetCost(peer *serverPeer) uint64 { + return peer.getRequestCost(GetCodeMsg, 1) } // CanSend tells if a certain peer is suitable for serving the given request -func (r *CodeRequest) CanSend(peer *peer) bool { +func (r *CodeRequest) CanSend(peer *serverPeer) bool { return peer.HasBlock(r.Id.BlockHash, r.Id.BlockNumber, true) } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *CodeRequest) Request(reqID uint64, peer *peer) error { +func (r *CodeRequest) Request(reqID uint64, peer *serverPeer) error { peer.Log().Debug("Requesting code data", "hash", r.Hash) req := CodeReq{ BHash: r.Id.BlockHash, AccKey: r.Id.AccKey, } - return peer.RequestCode(reqID, r.GetCost(peer), []CodeReq{req}) + return peer.requestCode(reqID, []CodeReq{req}) } // Valid processes an ODR request reply message from the LES network @@ -316,12 +316,12 @@ type ChtRequest light.ChtRequest // GetCost returns the cost of the given ODR request according to the serving // peer's cost table (implementation of LesOdrRequest) -func (r *ChtRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetHelperTrieProofsMsg, 1) +func (r *ChtRequest) GetCost(peer *serverPeer) uint64 { + return peer.getRequestCost(GetHelperTrieProofsMsg, 1) } // CanSend tells if a certain peer is suitable for serving the given request -func (r *ChtRequest) CanSend(peer *peer) bool { +func (r *ChtRequest) CanSend(peer *serverPeer) bool { peer.lock.RLock() defer peer.lock.RUnlock() @@ -333,7 +333,7 @@ func (r *ChtRequest) CanSend(peer *peer) bool { } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *ChtRequest) Request(reqID uint64, peer *peer) error { +func (r *ChtRequest) Request(reqID uint64, peer *serverPeer) error { peer.Log().Debug("Requesting CHT", "cht", r.ChtNum, "block", r.BlockNum) var encNum [8]byte binary.BigEndian.PutUint64(encNum[:], r.BlockNum) @@ -343,7 +343,7 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error { Key: encNum[:], AuxReq: auxHeader, } - return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req}) + return peer.requestHelperTrieProofs(reqID, []HelperTrieReq{req}) } // Valid processes an ODR request reply message from the LES network @@ -413,12 +413,12 @@ type BloomRequest light.BloomRequest // GetCost returns the cost of the given ODR request according to the serving // peer's cost table (implementation of LesOdrRequest) -func (r *BloomRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetHelperTrieProofsMsg, len(r.SectionIndexList)) +func (r *BloomRequest) GetCost(peer *serverPeer) uint64 { + return peer.getRequestCost(GetHelperTrieProofsMsg, len(r.SectionIndexList)) } // CanSend tells if a certain peer is suitable for serving the given request -func (r *BloomRequest) CanSend(peer *peer) bool { +func (r *BloomRequest) CanSend(peer *serverPeer) bool { peer.lock.RLock() defer peer.lock.RUnlock() @@ -429,7 +429,7 @@ func (r *BloomRequest) CanSend(peer *peer) bool { } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *BloomRequest) Request(reqID uint64, peer *peer) error { +func (r *BloomRequest) Request(reqID uint64, peer *serverPeer) error { peer.Log().Debug("Requesting BloomBits", "bloomTrie", r.BloomTrieNum, "bitIdx", r.BitIdx, 
"sections", r.SectionIndexList) reqs := make([]HelperTrieReq, len(r.SectionIndexList)) @@ -444,7 +444,7 @@ func (r *BloomRequest) Request(reqID uint64, peer *peer) error { Key: common.CopyBytes(encNumber[:]), } } - return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), reqs) + return peer.requestHelperTrieProofs(reqID, reqs) } // Valid processes an ODR request reply message from the LES network @@ -489,19 +489,19 @@ type TxStatusRequest light.TxStatusRequest // GetCost returns the cost of the given ODR request according to the serving // peer's cost table (implementation of LesOdrRequest) -func (r *TxStatusRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetTxStatusMsg, len(r.Hashes)) +func (r *TxStatusRequest) GetCost(peer *serverPeer) uint64 { + return peer.getRequestCost(GetTxStatusMsg, len(r.Hashes)) } // CanSend tells if a certain peer is suitable for serving the given request -func (r *TxStatusRequest) CanSend(peer *peer) bool { +func (r *TxStatusRequest) CanSend(peer *serverPeer) bool { return peer.version >= lpv2 } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *TxStatusRequest) Request(reqID uint64, peer *peer) error { +func (r *TxStatusRequest) Request(reqID uint64, peer *serverPeer) error { peer.Log().Debug("Requesting transaction status", "count", len(r.Hashes)) - return peer.RequestTxStatus(reqID, r.GetCost(peer), r.Hashes) + return peer.requestTxStatus(reqID, r.Hashes) } // Valid processes an ODR request reply message from the LES network diff --git a/les/odr_test.go b/les/odr_test.go index 7d10878226..bbe439dfec 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -186,7 +186,7 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) defer tearDown() - client.handler.synchronise(client.peer.peer) + client.handler.synchronise(client.peer.speer) // Ensure the client has synced all necessary data. 
clientHead := client.handler.backend.blockchain.CurrentHeader()
@@ -224,19 +224,19 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od
 
 	// expect retrievals to fail (except genesis block) without a les peer
 	client.handler.backend.peers.lock.Lock()
-	client.peer.peer.hasBlock = func(common.Hash, uint64, bool) bool { return false }
+	client.peer.speer.hasBlock = func(common.Hash, uint64, bool) bool { return false }
 	client.handler.backend.peers.lock.Unlock()
 	test(expFail)
 
 	// expect all retrievals to pass
 	client.handler.backend.peers.lock.Lock()
-	client.peer.peer.hasBlock = func(common.Hash, uint64, bool) bool { return true }
+	client.peer.speer.hasBlock = func(common.Hash, uint64, bool) bool { return true }
 	client.handler.backend.peers.lock.Unlock()
 	test(5)
 
 	// still expect all retrievals to pass, now data should be cached locally
 	if checkCached {
-		client.handler.backend.peers.Unregister(client.peer.peer.id)
+		client.handler.backend.peers.unregister(client.peer.speer.id)
 		time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
 		test(5)
 	}
diff --git a/les/peer.go b/les/peer.go
index feb3910beb..28ec201bc9 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -48,24 +48,25 @@ var (
 const (
 	maxRequestErrors  = 20 // number of invalid requests tolerated (makes the protocol less brittle but still avoids spam)
 	maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
-)
 
-// capacity limitation for parameter updates
-const (
 	allowedUpdateBytes = 100000                // initial/maximum allowed update size
 	allowedUpdateRate  = time.Millisecond * 10 // time constant for recharging one byte of allowance
-)
 
-const (
 	freezeTimeBase    = time.Millisecond * 700 // fixed component of client freeze time
 	freezeTimeRandom  = time.Millisecond * 600 // random component of client freeze time
 	freezeCheckPeriod = time.Millisecond * 100 // buffer value recheck period after initial freeze time has elapsed
-)
 
-// if the total encoded size of a sent transaction batch is over txSizeCostLimit
-// per transaction then the request cost is calculated as proportional to the
-// encoded size instead of the transaction count
-const txSizeCostLimit = 0x4000
+	// If the total encoded size of a sent transaction batch is over txSizeCostLimit
+	// per transaction then the request cost is calculated as proportional to the
+	// encoded size instead of the transaction count
+	txSizeCostLimit = 0x4000
+
+	// handshakeTimeout is the timeout after which the LES handshake is treated as failed.
+	handshakeTimeout = 5 * time.Second
+
+	// retrySendCachePeriod is the interval at which a failed attempt to queue a send task is retried.
+ retrySendCachePeriod = time.Millisecond * 100 +) const ( announceTypeNone = iota @@ -73,62 +74,46 @@ const ( announceTypeSigned ) -type peer struct { - *p2p.Peer - rw p2p.MsgReadWriter - - version int // Protocol version negotiated - network uint64 // Network ID being on - - announceType uint64 - - // Checkpoint relative fields - checkpoint params.TrustedCheckpoint - checkpointNumber uint64 - - id string +type keyValueEntry struct { + Key string + Value rlp.RawValue +} - headInfo *announceData - lock sync.RWMutex +type keyValueList []keyValueEntry +type keyValueMap map[string]rlp.RawValue - sendQueue *execQueue +func (l keyValueList) add(key string, val interface{}) keyValueList { + var entry keyValueEntry + entry.Key = key + if val == nil { + val = uint64(0) + } + enc, err := rlp.EncodeToBytes(val) + if err == nil { + entry.Value = enc + } + return append(l, entry) +} - errCh chan error +func (l keyValueList) decode() (keyValueMap, uint64) { + m := make(keyValueMap) + var size uint64 + for _, entry := range l { + m[entry.Key] = entry.Value + size += uint64(len(entry.Key)) + uint64(len(entry.Value)) + 8 + } + return m, size +} - // responseLock ensures that responses are queued in the same order as - // RequestProcessed is called - responseLock sync.Mutex - responseCount uint64 - invalidCount uint32 - - poolEntry *poolEntry - hasBlock func(common.Hash, uint64, bool) bool - responseErrors int - updateCounter uint64 - updateTime mclock.AbsTime - frozen uint32 // 1 if client is in frozen state - - fcClient *flowcontrol.ClientNode // nil if the peer is server only - fcServer *flowcontrol.ServerNode // nil if the peer is client only - fcParams flowcontrol.ServerParams - fcCosts requestCostTable - - trusted, server bool - onlyAnnounce bool - chainSince, chainRecent uint64 - stateSince, stateRecent uint64 -} - -func newPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - return &peer{ - Peer: p, - rw: rw, - version: version, - network: network, - id: peerIdToString(p.ID()), - trusted: trusted, - errCh: make(chan error, 1), +func (m keyValueMap) get(key string, val interface{}) error { + enc, ok := m[key] + if !ok { + return errResp(ErrMissingKey, "%s", key) + } + if val == nil { + return nil } + return rlp.DecodeBytes(enc, val) } // peerIdToString converts enode.ID to a string form @@ -136,103 +121,71 @@ func peerIdToString(id enode.ID) string { return fmt.Sprintf("%x", id.Bytes()) } -// freeClientId returns a string identifier for the peer. Multiple peers with the -// same identifier can not be connected in free mode simultaneously. -func (p *peer) freeClientId() string { - if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok { - if addr.IP.IsLoopback() { - // using peer id instead of loopback ip address allows multiple free - // connections from local machine to own server - return p.id - } else { - return addr.IP.String() - } - } - return p.id -} +// peerCommons contains fields needed by both server peer and client peer. 
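+//
+// Both concrete peer types embed it, roughly as below (a sketch only; see the
+// actual definitions that follow in this file):
+//
+//	type serverPeer struct {
+//		peerCommons
+//		// server-side specific fields
+//	}
+//
+// so that shared logic such as the handshake, head info and the send queue
+// lives in one place.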
+type peerCommons struct {
+	*p2p.Peer
+	rw p2p.MsgReadWriter
 
-// rejectUpdate returns true if a parameter update has to be rejected because
-// the size and/or rate of updates exceed the capacity limitation
-func (p *peer) rejectUpdate(size uint64) bool {
-	now := mclock.Now()
-	if p.updateCounter == 0 {
-		p.updateTime = now
-	} else {
-		dt := now - p.updateTime
-		r := uint64(dt / mclock.AbsTime(allowedUpdateRate))
-		if p.updateCounter > r {
-			p.updateCounter -= r
-			p.updateTime += mclock.AbsTime(allowedUpdateRate * time.Duration(r))
-		} else {
-			p.updateCounter = 0
-			p.updateTime = now
-		}
-	}
-	p.updateCounter += size
-	return p.updateCounter > allowedUpdateBytes
-}
+	id      string // Peer identity.
+	version int    // Protocol version negotiated.
+	network uint64 // Network ID being on.
+	frozen  uint32 // Flag whether the peer is frozen.
 
-// freezeClient temporarily puts the client in a frozen state which means all
-// unprocessed and subsequent requests are dropped. Unfreezing happens automatically
-// after a short time if the client's buffer value is at least in the slightly positive
-// region. The client is also notified about being frozen/unfrozen with a Stop/Resume
-// message.
-func (p *peer) freezeClient() {
-	if p.version < lpv3 {
-		// if Stop/Resume is not supported then just drop the peer after setting
-		// its frozen status permanently
-		atomic.StoreUint32(&p.frozen, 1)
-		p.Peer.Disconnect(p2p.DiscUselessPeer)
-		return
-	}
-	if atomic.SwapUint32(&p.frozen, 1) == 0 {
-		go func() {
-			p.SendStop()
-			time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom))))
-			for {
-				bufValue, bufLimit := p.fcClient.BufferStatus()
-				if bufLimit == 0 {
-					return
-				}
-				if bufValue <= bufLimit/8 {
-					time.Sleep(freezeCheckPeriod)
-				} else {
-					atomic.StoreUint32(&p.frozen, 0)
-					p.SendResume(bufValue)
-					break
-				}
-			}
-		}()
-	}
-}
+	announceType uint64    // New block announcement type.
+	headInfo     blockInfo // Latest block information.
 
-// freezeServer processes Stop/Resume messages from the given server
-func (p *peer) freezeServer(frozen bool) {
-	var f uint32
-	if frozen {
-		f = 1
-	}
-	if atomic.SwapUint32(&p.frozen, f) != f && frozen {
-		p.sendQueue.clear()
-	}
+	// Background task queue for caching peer tasks and executing in order.
+	sendQueue *execQueue
+
+	// Flow control agreement.
+	fcParams flowcontrol.ServerParams // The config for token bucket.
+	fcCosts  requestCostTable         // The maximum request cost table.
+
+	closeCh chan struct{}
+	lock    sync.RWMutex // Lock used to protect all thread-sensitive fields.
 }
 
 // isFrozen returns true if the client is frozen or the server has put our
 // client in frozen state
-func (p *peer) isFrozen() bool {
+func (p *peerCommons) isFrozen() bool {
 	return atomic.LoadUint32(&p.frozen) != 0
 }
 
-func (p *peer) canQueue() bool {
+// canQueue returns an indicator whether the peer can queue an operation.
+func (p *peerCommons) canQueue() bool {
 	return p.sendQueue.canQueue() && !p.isFrozen()
 }
 
-func (p *peer) queueSend(f func()) {
-	p.sendQueue.queue(f)
+// queueSend caches a peer operation in the background task queue.
+// Make sure to check `canQueue` before calling this function.
+func (p *peerCommons) queueSend(f func()) bool {
+	return p.sendQueue.queue(f)
+}
+
+// mustQueueSend keeps retrying to cache the operation until it succeeds.
+// It returns early once closeCh is closed.
+func (p *peerCommons) mustQueueSend(f func()) {
+	for {
+		// Check whether the closeCh is closed.
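+		// A select with a default case never blocks: if closeCh has already
+		// been closed, the receive below fires and we bail out immediately;
+		// otherwise we fall through and retry queueing the task.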
+		select {
+		case <-p.closeCh:
+			return
+		default:
+		}
+		// If the function is successfully cached, return.
+		if p.canQueue() && p.queueSend(f) {
+			return
+		}
+		time.Sleep(retrySendCachePeriod)
+	}
+}
+
+// String implements fmt.Stringer.
+func (p *peerCommons) String() string {
+	return fmt.Sprintf("Peer %s [%s]", p.id, fmt.Sprintf("les/%d", p.version))
+}
 
 // Info gathers and returns a collection of metadata known about a peer.
-func (p *peer) Info() *eth.PeerInfo {
+func (p *peerCommons) Info() *eth.PeerInfo {
 	return &eth.PeerInfo{
 		Version:    p.version,
 		Difficulty: p.Td(),
@@ -241,62 +194,231 @@
 }
 
 // Head retrieves a copy of the current head (most recent) hash of the peer.
-func (p *peer) Head() (hash common.Hash) {
+func (p *peerCommons) Head() (hash common.Hash) {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 
-	copy(hash[:], p.headInfo.Hash[:])
-	return hash
+	return p.headInfo.Hash
 }
 
-func (p *peer) HeadAndTd() (hash common.Hash, td *big.Int) {
+// Td retrieves the current total difficulty of a peer.
+func (p *peerCommons) Td() *big.Int {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 
-	copy(hash[:], p.headInfo.Hash[:])
-	return hash, p.headInfo.Td
+	return new(big.Int).Set(p.headInfo.Td)
 }
 
-func (p *peer) headBlockInfo() blockInfo {
+// HeadAndTd retrieves the current head hash and total difficulty of a peer.
+func (p *peerCommons) HeadAndTd() (hash common.Hash, td *big.Int) {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 
-	return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
+	return p.headInfo.Hash, new(big.Int).Set(p.headInfo.Td)
 }
 
-// Td retrieves the current total difficulty of a peer.
-func (p *peer) Td() *big.Int {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+// sendReceiveHandshake exchanges the handshake packet with the remote peer and
+// returns an error if sending or receiving the packet fails.
+func (p *peerCommons) sendReceiveHandshake(sendList keyValueList) (keyValueList, error) {
+	var (
+		errc     = make(chan error, 2)
+		recvList keyValueList
+	)
+	// Send out our own handshake in a new thread
+	go func() {
+		errc <- p2p.Send(p.rw, StatusMsg, sendList)
+	}()
+	go func() {
+		// In the meantime retrieve the remote status message
+		msg, err := p.rw.ReadMsg()
+		if err != nil {
+			errc <- err
+			return
+		}
+		if msg.Code != StatusMsg {
+			errc <- errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
+			return
+		}
+		if msg.Size > ProtocolMaxMsgSize {
+			errc <- errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+			return
+		}
+		// Decode the handshake
+		if err := msg.Decode(&recvList); err != nil {
+			errc <- errResp(ErrDecode, "msg %v: %v", msg, err)
+			return
+		}
+		errc <- nil
+	}()
+	timeout := time.NewTimer(handshakeTimeout)
+	defer timeout.Stop()
+	for i := 0; i < 2; i++ {
+		select {
+		case err := <-errc:
+			if err != nil {
+				return nil, err
+			}
+		case <-timeout.C:
+			return nil, p2p.DiscReadTimeout
+		}
+	}
+	return recvList, nil
+}
 
-	return new(big.Int).Set(p.headInfo.Td)
 
+// handshake executes the les protocol handshake, negotiating version number,
+// network IDs, difficulties, head and genesis blocks. Besides the basic handshake
+// fields, server and client can exchange and resolve some specified fields through
+// two callback functions.
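+//
+// As a rough illustration (a hypothetical caller, not part of this change),
+// one side could advertise an extra key/value field and the other could read
+// it back through the two callbacks:
+//
+//	err := p.handshake(td, head, headNum, genesis,
+//		func(send *keyValueList) {
+//			*send = (*send).add("flowControl/BL", limit)
+//		},
+//		func(recv keyValueMap) error {
+//			return recv.get("flowControl/BL", &limit)
+//		})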
+func (p *peerCommons) handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, sendCallback func(*keyValueList), recvCallback func(keyValueMap) error) error {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	var send keyValueList
+
+	// Add some basic handshake fields
+	send = send.add("protocolVersion", uint64(p.version))
+	send = send.add("networkId", p.network)
+	send = send.add("headTd", td)
+	send = send.add("headHash", head)
+	send = send.add("headNum", headNum)
+	send = send.add("genesisHash", genesis)
+
+	// Add client-specified or server-specified fields
+	if sendCallback != nil {
+		sendCallback(&send)
+	}
+	// Exchange the handshake packet and resolve the received one.
+	recvList, err := p.sendReceiveHandshake(send)
+	if err != nil {
+		return err
+	}
+	recv, size := recvList.decode()
+	if size > allowedUpdateBytes {
+		return errResp(ErrRequestRejected, "")
+	}
+	var rGenesis, rHash common.Hash
+	var rVersion, rNetwork, rNum uint64
+	var rTd *big.Int
+	if err := recv.get("protocolVersion", &rVersion); err != nil {
+		return err
+	}
+	if err := recv.get("networkId", &rNetwork); err != nil {
+		return err
+	}
+	if err := recv.get("headTd", &rTd); err != nil {
+		return err
+	}
+	if err := recv.get("headHash", &rHash); err != nil {
+		return err
+	}
+	if err := recv.get("headNum", &rNum); err != nil {
+		return err
+	}
+	if err := recv.get("genesisHash", &rGenesis); err != nil {
+		return err
+	}
+	if rGenesis != genesis {
+		return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", rGenesis[:8], genesis[:8])
+	}
+	if rNetwork != p.network {
+		return errResp(ErrNetworkIdMismatch, "%d (!= %d)", rNetwork, p.network)
+	}
+	if int(rVersion) != p.version {
+		return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version)
+	}
+	p.headInfo = blockInfo{Hash: rHash, Number: rNum, Td: rTd}
+	if recvCallback != nil {
+		return recvCallback(recv)
+	}
+	return nil
+}
 
-// waitBefore implements distPeer interface
-func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
-	return p.fcServer.CanSend(maxCost)
+// close closes the channel and notifies all background routines to exit.
+func (p *peerCommons) close() {
+	close(p.closeCh)
+	p.sendQueue.quit()
 }
 
-// updateCapacity updates the request serving capacity assigned to a given client
-// and also sends an announcement about the updated flow control parameters
-func (p *peer) updateCapacity(cap uint64) {
-	p.responseLock.Lock()
-	defer p.responseLock.Unlock()
+// serverPeer represents each node to which the client is connected.
+// The node here refers to the les server.
+type serverPeer struct {
+	peerCommons
 
-	p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio}
-	p.fcClient.UpdateParams(p.fcParams)
-	var kvList keyValueList
-	kvList = kvList.add("flowControl/MRR", cap)
-	kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
-	p.queueSend(func() { p.SendAnnounce(announceData{Update: kvList}) })
+	// Status fields
+	trusted                 bool   // Whether the server is selected as a trusted server.
+	onlyAnnounce            bool   // Whether the server sends announcements only.
+	chainSince, chainRecent uint64 // The range of the chain the server peer can serve.
+	stateSince, stateRecent uint64 // The range of state the server peer can serve.
+
+	// Advertised checkpoint fields
+	checkpointNumber uint64                   // The block height at which the checkpoint is registered.
+	checkpoint       params.TrustedCheckpoint // The advertised checkpoint sent by the server.
+
+	poolEntry *poolEntry // Statistics entry for this server peer in the server pool.
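+
+	// fcServer mirrors the server's flow control token bucket on the client
+	// side, letting the client estimate locally whether a request still fits
+	// into its allowance before sending it (see waitBefore and getRequestCost
+	// below).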
+	fcServer *flowcontrol.ServerNode // Client side mirror token bucket.
+
+	// Statistics
+	errCount int // Counts the invalid responses the server has replied with
+	updateCount uint64
+	updateTime mclock.AbsTime
+
+	// Callbacks
+	hasBlock func(common.Hash, uint64, bool) bool // Used to determine whether the server has the specified block.
 }
 
-func (p *peer) responseID() uint64 {
-	p.responseCount += 1
-	return p.responseCount
+func newServerPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *serverPeer {
+	return &serverPeer{
+		peerCommons: peerCommons{
+			Peer: p,
+			rw: rw,
+			id: peerIdToString(p.ID()),
+			version: version,
+			network: network,
+			sendQueue: newExecQueue(100),
+			closeCh: make(chan struct{}),
+		},
+		trusted: trusted,
+	}
 }
 
-func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
+// rejectUpdate returns true if a parameter update has to be rejected because
+// the size and/or rate of updates exceed the capacity limitation
+func (p *serverPeer) rejectUpdate(size uint64) bool {
+	now := mclock.Now()
+	if p.updateCount == 0 {
+		p.updateTime = now
+	} else {
+		dt := now - p.updateTime
+		p.updateTime = now
+
+		r := uint64(dt / mclock.AbsTime(allowedUpdateRate))
+		if p.updateCount > r {
+			p.updateCount -= r
+		} else {
+			p.updateCount = 0
+		}
+	}
+	p.updateCount += size
+	return p.updateCount > allowedUpdateBytes
+}
+
+// freeze processes Stop messages from the given server and sets the status as
+// frozen.
+func (p *serverPeer) freeze() {
+	if atomic.CompareAndSwapUint32(&p.frozen, 0, 1) {
+		p.sendQueue.clear()
+	}
+}
+
+// unfreeze processes Resume messages from the given server and sets the status
+// as unfrozen.
+func (p *serverPeer) unfreeze() {
+	atomic.StoreUint32(&p.frozen, 0)
+}
+
+// sendRequest sends a request to the server based on the given message type
+// and content.
+func sendRequest(w p2p.MsgWriter, msgcode, reqID uint64, data interface{}) error {
 	type req struct {
 		ReqID uint64
 		Data interface{}
@@ -304,30 +426,72 @@ func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{})
 	return p2p.Send(w, msgcode, req{reqID, data})
 }
 
-// reply struct represents a reply with the actual data already RLP encoded and
-// only the bv (buffer value) missing. This allows the serving mechanism to
-// calculate the bv value which depends on the data size before sending the reply.
-type reply struct {
-	w p2p.MsgWriter
-	msgcode, reqID uint64
-	data rlp.RawValue
+// requestHeadersByHash fetches a batch of blocks' headers corresponding to the
+// specified header query, based on the hash of an origin block.
+func (p *serverPeer) requestHeadersByHash(reqID uint64, origin common.Hash, amount int, skip int, reverse bool) error {
+	p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
+	return sendRequest(p.rw, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
 }
 
-// send sends the reply with the calculated buffer value
-func (r *reply) send(bv uint64) error {
-	type resp struct {
-		ReqID, BV uint64
-		Data rlp.RawValue
-	}
-	return p2p.Send(r.w, r.msgcode, resp{r.reqID, bv, r.data})
+// requestHeadersByNumber fetches a batch of blocks' headers corresponding to the
+// specified header query, based on the number of an origin block.
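+//
+// An illustrative call, fetching 192 consecutive headers forward from block
+// 4096 (reqID would normally be issued by the request distributor):
+//
+//	err := p.requestHeadersByNumber(reqID, 4096, 192, 0, false)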
+func (p *serverPeer) requestHeadersByNumber(reqID, origin uint64, amount int, skip int, reverse bool) error {
+	p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
+	return sendRequest(p.rw, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
 }
 
-// size returns the RLP encoded size of the message data
-func (r *reply) size() uint32 {
-	return uint32(len(r.data))
+// requestBodies fetches a batch of blocks' bodies corresponding to the hashes
+// specified.
+func (p *serverPeer) requestBodies(reqID uint64, hashes []common.Hash) error {
+	p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
+	return sendRequest(p.rw, GetBlockBodiesMsg, reqID, hashes)
+}
+
+// requestCode fetches a batch of arbitrary data from a node's known state
+// data, corresponding to the specified hashes.
+func (p *serverPeer) requestCode(reqID uint64, reqs []CodeReq) error {
+	p.Log().Debug("Fetching batch of codes", "count", len(reqs))
+	return sendRequest(p.rw, GetCodeMsg, reqID, reqs)
+}
+
+// requestReceipts fetches a batch of transaction receipts from a remote node.
+func (p *serverPeer) requestReceipts(reqID uint64, hashes []common.Hash) error {
+	p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
+	return sendRequest(p.rw, GetReceiptsMsg, reqID, hashes)
+}
+
+// requestProofs fetches a batch of merkle proofs from a remote node.
+func (p *serverPeer) requestProofs(reqID uint64, reqs []ProofReq) error {
+	p.Log().Debug("Fetching batch of proofs", "count", len(reqs))
+	return sendRequest(p.rw, GetProofsV2Msg, reqID, reqs)
+}
+
+// requestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
+func (p *serverPeer) requestHelperTrieProofs(reqID uint64, reqs []HelperTrieReq) error {
+	p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
+	return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, reqs)
+}
+
+// requestTxStatus fetches a batch of transaction status records from a remote node.
+func (p *serverPeer) requestTxStatus(reqID uint64, txHashes []common.Hash) error {
+	p.Log().Debug("Requesting transaction status", "count", len(txHashes))
+	return sendRequest(p.rw, GetTxStatusMsg, reqID, txHashes)
+}
+
+// sendTxs sends a batch of transactions to be added to the remote transaction pool.
+func (p *serverPeer) sendTxs(reqID uint64, txs rlp.RawValue) error {
+	p.Log().Debug("Sending batch of transactions", "size", len(txs))
+	return sendRequest(p.rw, SendTxV2Msg, reqID, txs)
+}
+
+// waitBefore implements distPeer interface
+func (p *serverPeer) waitBefore(maxCost uint64) (time.Duration, float64) {
+	return p.fcServer.CanSend(maxCost)
 }
 
-func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
+// getRequestCost returns an estimated request cost according to the flow control
+// rules negotiated between the server and the client.
+func (p *serverPeer) getRequestCost(msgcode uint64, amount int) uint64 {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 
@@ -342,7 +506,9 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
 	return cost
 }
 
-func (p *peer) GetTxRelayCost(amount, size int) uint64 {
+// getTxRelayCost returns an estimated relay cost according to the flow control
+// rules negotiated between the server and the client.
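+//
+// A sketch of the cost model (names illustrative, not the actual fields): the
+// per-transaction and per-byte estimates are computed separately, the larger
+// one wins, and the result is capped by the negotiated buffer limit:
+//
+//	cost := baseCost + uint64(amount)*perTxCost
+//	if sizeCost := baseCost + uint64(size)*perByteCost; sizeCost > cost {
+//		cost = sizeCost
+//	}
+//	if cost > p.fcParams.BufLimit {
+//		cost = p.fcParams.BufLimit
+//	}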
+func (p *serverPeer) getTxRelayCost(amount, size int) uint64 {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 
@@ -355,7 +521,6 @@ func (p *peer) GetTxRelayCost(amount, size int) uint64 {
 	if sizeCost > cost {
 		cost = sizeCost
 	}
-
 	if cost > p.fcParams.BufLimit {
 		cost = p.fcParams.BufLimit
 	}
@@ -363,12 +528,12 @@ func (p *peer) GetTxRelayCost(amount, size int) uint64 {
 }
 
 // HasBlock checks if the peer has a given block
-func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool {
-	var head, since, recent uint64
+func (p *serverPeer) HasBlock(hash common.Hash, number uint64, hasState bool) bool {
 	p.lock.RLock()
-	if p.headInfo != nil {
-		head = p.headInfo.Number
-	}
+	defer p.lock.RUnlock()
+
+	head := p.headInfo.Number
+	var since, recent uint64
 	if hasState {
 		since = p.stateSince
 		recent = p.stateRecent
@@ -377,220 +542,313 @@ func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool {
 		recent = p.chainRecent
 	}
 	hasBlock := p.hasBlock
-	p.lock.RUnlock()
 
 	return head >= number && number >= since && (recent == 0 || number+recent+4 > head) && hasBlock != nil && hasBlock(hash, number, hasState)
 }
 
-// SendAnnounce announces the availability of a number of blocks through
-// a hash notification.
-func (p *peer) SendAnnounce(request announceData) error {
-	return p2p.Send(p.rw, AnnounceMsg, request)
+// updateFlowControl updates the flow control parameters belonging to the server
+// node if the announced key/value set contains relevant fields
+func (p *serverPeer) updateFlowControl(update keyValueMap) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	// If any of the flow control params is missing, refuse to update.
+	var params flowcontrol.ServerParams
+	if update.get("flowControl/BL", &params.BufLimit) == nil && update.get("flowControl/MRR", &params.MinRecharge) == nil {
+		// TODO: can the light client set minimal acceptable flow control params?
+		p.fcParams = params
+		p.fcServer.UpdateParams(params)
+	}
+	var MRC RequestCostList
+	if update.get("flowControl/MRC", &MRC) == nil {
+		costUpdate := MRC.decode(ProtocolLengths[uint(p.version)])
+		for code, cost := range costUpdate {
+			p.fcCosts[code] = cost
+		}
+	}
+}
+
+// Handshake executes the les protocol handshake, negotiating version number,
+// network IDs, difficulties, head and genesis blocks.
+func (p *serverPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error {
+	return p.handshake(td, head, headNum, genesis, func(lists *keyValueList) {
+		// Add some client-specific handshake fields
+		//
+		// Ask for signed announcements only when the server is trusted.
+		p.announceType = announceTypeSimple
+		if p.trusted {
+			p.announceType = announceTypeSigned
+		}
+		*lists = (*lists).add("announceType", p.announceType)
+	}, func(recv keyValueMap) error {
+		if recv.get("serveChainSince", &p.chainSince) != nil {
+			p.onlyAnnounce = true
+		}
+		if recv.get("serveRecentChain", &p.chainRecent) != nil {
+			p.chainRecent = 0
+		}
+		if recv.get("serveStateSince", &p.stateSince) != nil {
+			p.onlyAnnounce = true
+		}
+		if recv.get("serveRecentState", &p.stateRecent) != nil {
+			p.stateRecent = 0
+		}
+		if recv.get("txRelay", nil) != nil {
+			p.onlyAnnounce = true
+		}
+		if p.onlyAnnounce && !p.trusted {
+			return errResp(ErrUselessPeer, "peer cannot serve requests")
+		}
+		// Parse flow control handshake packet.
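+		// The server advertises its token bucket parameters here: BufLimit is
+		// the maximum buffer value, MinRecharge the guaranteed recharge rate,
+		// and MRC the per-message-code cost table later consumed by getRequestCost.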
+		var sParams flowcontrol.ServerParams
+		if err := recv.get("flowControl/BL", &sParams.BufLimit); err != nil {
+			return err
+		}
+		if err := recv.get("flowControl/MRR", &sParams.MinRecharge); err != nil {
+			return err
+		}
+		var MRC RequestCostList
+		if err := recv.get("flowControl/MRC", &MRC); err != nil {
+			return err
+		}
+		p.fcParams = sParams
+		p.fcServer = flowcontrol.NewServerNode(sParams, &mclock.System{})
+		p.fcCosts = MRC.decode(ProtocolLengths[uint(p.version)])
+
+		recv.get("checkpoint/value", &p.checkpoint)
+		recv.get("checkpoint/registerHeight", &p.checkpointNumber)
+
+		if !p.onlyAnnounce {
+			for msgCode := range reqAvgTimeCost {
+				if p.fcCosts[msgCode] == nil {
+					return errResp(ErrUselessPeer, "peer does not support message %d", msgCode)
+				}
+			}
+		}
+		return nil
+	})
+}
+
+// clientPeer represents each node to which the les server is connected.
+// The node here refers to the light client.
+type clientPeer struct {
+	peerCommons
+
+	// responseLock ensures that responses are queued in the same order as
+	// RequestProcessed is called
+	responseLock sync.Mutex
+	server bool
+	invalidCount uint32 // Counts the invalid requests the client peer has made.
+	responseCount uint64 // Counter used to generate a unique id for request processing.
+	errCh chan error
+	fcClient *flowcontrol.ClientNode // Server side mirror token bucket.
+	balanceTracker *balanceTracker // set by clientPool.connect, used and removed by serverHandler
+}
+
+func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer {
+	return &clientPeer{
+		peerCommons: peerCommons{
+			Peer: p,
+			rw: rw,
+			id: peerIdToString(p.ID()),
+			version: version,
+			network: network,
+			sendQueue: newExecQueue(100),
+			closeCh: make(chan struct{}),
+		},
+		errCh: make(chan error, 1),
+	}
+}
+
+// freeClientId returns a string identifier for the peer. Multiple peers with
+// the same identifier cannot be connected in free mode simultaneously.
+func (p *clientPeer) freeClientId() string {
+	if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
+		if addr.IP.IsLoopback() {
+			// using peer id instead of loopback ip address allows multiple free
+			// connections from local machine to own server
+			return p.id
+		} else {
+			return addr.IP.String()
+		}
+	}
+	return p.id
 }
 
-// SendStop notifies the client about being in frozen state
-func (p *peer) SendStop() error {
+// sendStop notifies the client about being in frozen state
+func (p *clientPeer) sendStop() error {
 	return p2p.Send(p.rw, StopMsg, struct{}{})
 }
 
-// SendResume notifies the client about getting out of frozen state
-func (p *peer) SendResume(bv uint64) error {
+// sendResume notifies the client about getting out of frozen state
+func (p *clientPeer) sendResume(bv uint64) error {
 	return p2p.Send(p.rw, ResumeMsg, bv)
 }
 
-// ReplyBlockHeaders creates a reply with a batch of block headers
-func (p *peer) ReplyBlockHeaders(reqID uint64, headers []*types.Header) *reply {
+// freeze temporarily puts the client in a frozen state which means all unprocessed
+// and subsequent requests are dropped. Unfreezing happens automatically after a short
+// time if the client's buffer value is at least in the slightly positive region.
+// The client is also notified about being frozen/unfrozen with a Stop/Resume message.
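+//
+// The exchange, seen from the server side (illustrative):
+//
+//	p.sendStop()              // further requests are dropped
+//	...                       // the client's buffer value recharges over time
+//	p.sendResume(bufValue)    // serving resumes with buffer value bufValue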
+func (p *clientPeer) freeze() { + if p.version < lpv3 { + // if Stop/Resume is not supported then just drop the peer after setting + // its frozen status permanently + atomic.StoreUint32(&p.frozen, 1) + p.Peer.Disconnect(p2p.DiscUselessPeer) + return + } + if atomic.SwapUint32(&p.frozen, 1) == 0 { + go func() { + p.sendStop() + time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom)))) + for { + bufValue, bufLimit := p.fcClient.BufferStatus() + if bufLimit == 0 { + return + } + if bufValue <= bufLimit/8 { + time.Sleep(freezeCheckPeriod) + continue + } + atomic.StoreUint32(&p.frozen, 0) + p.sendResume(bufValue) + return + } + }() + } +} + +// reply struct represents a reply with the actual data already RLP encoded and +// only the bv (buffer value) missing. This allows the serving mechanism to +// calculate the bv value which depends on the data size before sending the reply. +type reply struct { + w p2p.MsgWriter + msgcode, reqID uint64 + data rlp.RawValue +} + +// send sends the reply with the calculated buffer value +func (r *reply) send(bv uint64) error { + type resp struct { + ReqID, BV uint64 + Data rlp.RawValue + } + return p2p.Send(r.w, r.msgcode, resp{r.reqID, bv, r.data}) +} + +// size returns the RLP encoded size of the message data +func (r *reply) size() uint32 { + return uint32(len(r.data)) +} + +// replyBlockHeaders creates a reply with a batch of block headers +func (p *clientPeer) replyBlockHeaders(reqID uint64, headers []*types.Header) *reply { data, _ := rlp.EncodeToBytes(headers) return &reply{p.rw, BlockHeadersMsg, reqID, data} } -// ReplyBlockBodiesRLP creates a reply with a batch of block contents from +// replyBlockBodiesRLP creates a reply with a batch of block contents from // an already RLP encoded format. -func (p *peer) ReplyBlockBodiesRLP(reqID uint64, bodies []rlp.RawValue) *reply { +func (p *clientPeer) replyBlockBodiesRLP(reqID uint64, bodies []rlp.RawValue) *reply { data, _ := rlp.EncodeToBytes(bodies) return &reply{p.rw, BlockBodiesMsg, reqID, data} } -// ReplyCode creates a reply with a batch of arbitrary internal data, corresponding to the +// replyCode creates a reply with a batch of arbitrary internal data, corresponding to the // hashes requested. -func (p *peer) ReplyCode(reqID uint64, codes [][]byte) *reply { +func (p *clientPeer) replyCode(reqID uint64, codes [][]byte) *reply { data, _ := rlp.EncodeToBytes(codes) return &reply{p.rw, CodeMsg, reqID, data} } -// ReplyReceiptsRLP creates a reply with a batch of transaction receipts, corresponding to the +// replyReceiptsRLP creates a reply with a batch of transaction receipts, corresponding to the // ones requested from an already RLP encoded format. -func (p *peer) ReplyReceiptsRLP(reqID uint64, receipts []rlp.RawValue) *reply { +func (p *clientPeer) replyReceiptsRLP(reqID uint64, receipts []rlp.RawValue) *reply { data, _ := rlp.EncodeToBytes(receipts) return &reply{p.rw, ReceiptsMsg, reqID, data} } -// ReplyProofsV2 creates a reply with a batch of merkle proofs, corresponding to the ones requested. -func (p *peer) ReplyProofsV2(reqID uint64, proofs light.NodeList) *reply { +// replyProofsV2 creates a reply with a batch of merkle proofs, corresponding to the ones requested. +func (p *clientPeer) replyProofsV2(reqID uint64, proofs light.NodeList) *reply { data, _ := rlp.EncodeToBytes(proofs) return &reply{p.rw, ProofsV2Msg, reqID, data} } -// ReplyHelperTrieProofs creates a reply with a batch of HelperTrie proofs, corresponding to the ones requested. 
-func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply { +// replyHelperTrieProofs creates a reply with a batch of HelperTrie proofs, corresponding to the ones requested. +func (p *clientPeer) replyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply { data, _ := rlp.EncodeToBytes(resp) return &reply{p.rw, HelperTrieProofsMsg, reqID, data} } -// ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested. -func (p *peer) ReplyTxStatus(reqID uint64, stats []light.TxStatus) *reply { +// replyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested. +func (p *clientPeer) replyTxStatus(reqID uint64, stats []light.TxStatus) *reply { data, _ := rlp.EncodeToBytes(stats) return &reply{p.rw, TxStatusMsg, reqID, data} } -// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the -// specified header query, based on the hash of an origin block. -func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) -} - -// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the -// specified header query, based on the number of an origin block. -func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) -} - -// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes -// specified. -func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error { - p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) - return sendRequest(p.rw, GetBlockBodiesMsg, reqID, cost, hashes) -} - -// RequestCode fetches a batch of arbitrary data from a node's known state -// data, corresponding to the specified hashes. -func (p *peer) RequestCode(reqID, cost uint64, reqs []CodeReq) error { - p.Log().Debug("Fetching batch of codes", "count", len(reqs)) - return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs) -} - -// RequestReceipts fetches a batch of transaction receipts from a remote node. -func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error { - p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) - return sendRequest(p.rw, GetReceiptsMsg, reqID, cost, hashes) -} - -// RequestProofs fetches a batch of merkle proofs from a remote node. -func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error { - p.Log().Debug("Fetching batch of proofs", "count", len(reqs)) - return sendRequest(p.rw, GetProofsV2Msg, reqID, cost, reqs) -} - -// RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node. 
-func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error { - p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs)) - return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs) -} - -// RequestTxStatus fetches a batch of transaction status records from a remote node. -func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error { - p.Log().Debug("Requesting transaction status", "count", len(txHashes)) - return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes) -} - -// SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool. -func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error { - p.Log().Debug("Sending batch of transactions", "size", len(txs)) - return sendRequest(p.rw, SendTxV2Msg, reqID, cost, txs) -} - -type keyValueEntry struct { - Key string - Value rlp.RawValue -} -type keyValueList []keyValueEntry -type keyValueMap map[string]rlp.RawValue - -func (l keyValueList) add(key string, val interface{}) keyValueList { - var entry keyValueEntry - entry.Key = key - if val == nil { - val = uint64(0) - } - enc, err := rlp.EncodeToBytes(val) - if err == nil { - entry.Value = enc - } - return append(l, entry) -} - -func (l keyValueList) decode() (keyValueMap, uint64) { - m := make(keyValueMap) - var size uint64 - for _, entry := range l { - m[entry.Key] = entry.Value - size += uint64(len(entry.Key)) + uint64(len(entry.Value)) + 8 - } - return m, size -} - -func (m keyValueMap) get(key string, val interface{}) error { - enc, ok := m[key] - if !ok { - return errResp(ErrMissingKey, "%s", key) - } - if val == nil { - return nil - } - return rlp.DecodeBytes(enc, val) +// sendAnnounce announces the availability of a number of blocks through +// a hash notification. +func (p *clientPeer) sendAnnounce(request announceData) error { + return p2p.Send(p.rw, AnnounceMsg, request) } -func (p *peer) sendReceiveHandshake(sendList keyValueList) (keyValueList, error) { - // Send out own handshake in a new thread - errc := make(chan error, 1) - go func() { - errc <- p2p.Send(p.rw, StatusMsg, sendList) - }() - // In the mean time retrieve the remote status message - msg, err := p.rw.ReadMsg() - if err != nil { - return nil, err - } - if msg.Code != StatusMsg { - return nil, errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg) - } - if msg.Size > ProtocolMaxMsgSize { - return nil, errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) - } - // Decode the handshake - var recvList keyValueList - if err := msg.Decode(&recvList); err != nil { - return nil, errResp(ErrDecode, "msg %v: %v", msg, err) +// updateCapacity updates the request serving capacity assigned to a given client +// and also sends an announcement about the updated flow control parameters +func (p *clientPeer) updateCapacity(cap uint64) { + p.lock.Lock() + defer p.lock.Unlock() + + p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio} + p.fcClient.UpdateParams(p.fcParams) + var kvList keyValueList + kvList = kvList.add("flowControl/MRR", cap) + kvList = kvList.add("flowControl/BL", cap*bufLimitRatio) + p.mustQueueSend(func() { p.sendAnnounce(announceData{Update: kvList}) }) +} + +// freezeClient temporarily puts the client in a frozen state which means all +// unprocessed and subsequent requests are dropped. Unfreezing happens automatically +// after a short time if the client's buffer value is at least in the slightly positive +// region. 
The client is also notified about being frozen/unfrozen with a Stop/Resume +// message. +func (p *clientPeer) freezeClient() { + if p.version < lpv3 { + // if Stop/Resume is not supported then just drop the peer after setting + // its frozen status permanently + atomic.StoreUint32(&p.frozen, 1) + p.Peer.Disconnect(p2p.DiscUselessPeer) + return } - if err := <-errc; err != nil { - return nil, err + if atomic.SwapUint32(&p.frozen, 1) == 0 { + go func() { + p.sendStop() + time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom)))) + for { + bufValue, bufLimit := p.fcClient.BufferStatus() + if bufLimit == 0 { + return + } + if bufValue <= bufLimit/8 { + time.Sleep(freezeCheckPeriod) + } else { + atomic.StoreUint32(&p.frozen, 0) + p.sendResume(bufValue) + break + } + } + }() } - return recvList, nil } // Handshake executes the les protocol handshake, negotiating version number, // network IDs, difficulties, head and genesis blocks. -func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error { - p.lock.Lock() - defer p.lock.Unlock() - - var send keyValueList - - // Add some basic handshake fields - send = send.add("protocolVersion", uint64(p.version)) - send = send.add("networkId", p.network) - send = send.add("headTd", td) - send = send.add("headHash", head) - send = send.add("headNum", headNum) - send = send.add("genesisHash", genesis) - if server != nil { +func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error { + return p.handshake(td, head, headNum, genesis, func(lists *keyValueList) { // Add some information which services server can offer. if !server.config.UltraLightOnlyAnnounce { - send = send.add("serveHeaders", nil) - send = send.add("serveChainSince", uint64(0)) - send = send.add("serveStateSince", uint64(0)) + *lists = (*lists).add("serveHeaders", nil) + *lists = (*lists).add("serveChainSince", uint64(0)) + *lists = (*lists).add("serveStateSince", uint64(0)) // If local ethereum node is running in archive mode, advertise ourselves we have // all version state data. Otherwise only recent state is available. 
@@ -598,11 +856,11 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis if server.archiveMode { stateRecent = 0 } - send = send.add("serveRecentState", stateRecent) - send = send.add("txRelay", nil) + *lists = (*lists).add("serveRecentState", stateRecent) + *lists = (*lists).add("txRelay", nil) } - send = send.add("flowControl/BL", server.defParams.BufLimit) - send = send.add("flowControl/MRR", server.defParams.MinRecharge) + *lists = (*lists).add("flowControl/BL", server.defParams.BufLimit) + *lists = (*lists).add("flowControl/MRR", server.defParams.MinRecharge) var costList RequestCostList if server.costTracker.testCostList != nil { @@ -610,7 +868,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis } else { costList = server.costTracker.makeCostList(server.costTracker.globalFactor()) } - send = send.add("flowControl/MRC", costList) + *lists = (*lists).add("flowControl/MRC", costList) p.fcCosts = costList.decode(ProtocolLengths[uint(p.version)]) p.fcParams = server.defParams @@ -619,62 +877,11 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis if server.oracle != nil && server.oracle.IsRunning() { cp, height := server.oracle.StableCheckpoint() if cp != nil { - send = send.add("checkpoint/value", cp) - send = send.add("checkpoint/registerHeight", height) + *lists = (*lists).add("checkpoint/value", cp) + *lists = (*lists).add("checkpoint/registerHeight", height) } } - } else { - // Add some client-specific handshake fields - p.announceType = announceTypeSimple - if p.trusted { - p.announceType = announceTypeSigned - } - send = send.add("announceType", p.announceType) - } - - recvList, err := p.sendReceiveHandshake(send) - if err != nil { - return err - } - recv, size := recvList.decode() - if p.rejectUpdate(size) { - return errResp(ErrRequestRejected, "") - } - - var rGenesis, rHash common.Hash - var rVersion, rNetwork, rNum uint64 - var rTd *big.Int - - if err := recv.get("protocolVersion", &rVersion); err != nil { - return err - } - if err := recv.get("networkId", &rNetwork); err != nil { - return err - } - if err := recv.get("headTd", &rTd); err != nil { - return err - } - if err := recv.get("headHash", &rHash); err != nil { - return err - } - if err := recv.get("headNum", &rNum); err != nil { - return err - } - if err := recv.get("genesisHash", &rGenesis); err != nil { - return err - } - - if rGenesis != genesis { - return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", rGenesis[:8], genesis[:8]) - } - if rNetwork != p.network { - return errResp(ErrNetworkIdMismatch, "%d (!= %d)", rNetwork, p.network) - } - if int(rVersion) != p.version { - return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version) - } - - if server != nil { + }, func(recv keyValueMap) error { p.server = recv.get("flowControl/MRR", nil) == nil if p.server { p.announceType = announceTypeNone // connected to another server, send no messages @@ -685,237 +892,298 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis } p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams) } - } else { - if recv.get("serveChainSince", &p.chainSince) != nil { - p.onlyAnnounce = true - } - if recv.get("serveRecentChain", &p.chainRecent) != nil { - p.chainRecent = 0 - } - if recv.get("serveStateSince", &p.stateSince) != nil { - p.onlyAnnounce = true - } - if recv.get("serveRecentState", &p.stateRecent) != nil { - p.stateRecent = 0 - } - if recv.get("txRelay", nil) != nil { - 
p.onlyAnnounce = true - } + return nil + }) +} - if p.onlyAnnounce && !p.trusted { - return errResp(ErrUselessPeer, "peer cannot serve requests") - } +// serverPeerSubscriber is an interface to notify services about added or +// removed server peers +type serverPeerSubscriber interface { + registerPeer(*serverPeer) + unregisterPeer(*serverPeer) +} - var sParams flowcontrol.ServerParams - if err := recv.get("flowControl/BL", &sParams.BufLimit); err != nil { - return err - } - if err := recv.get("flowControl/MRR", &sParams.MinRecharge); err != nil { - return err - } - var MRC RequestCostList - if err := recv.get("flowControl/MRC", &MRC); err != nil { - return err - } - p.fcParams = sParams - p.fcServer = flowcontrol.NewServerNode(sParams, &mclock.System{}) - p.fcCosts = MRC.decode(ProtocolLengths[uint(p.version)]) +// clientPeerSubscriber is an interface to notify services about added or +// removed client peers +type clientPeerSubscriber interface { + registerPeer(*clientPeer) + unregisterPeer(*clientPeer) +} - recv.get("checkpoint/value", &p.checkpoint) - recv.get("checkpoint/registerHeight", &p.checkpointNumber) +// clientPeerSet represents the set of active client peers currently +// participating in the Light Ethereum sub-protocol. +type clientPeerSet struct { + peers map[string]*clientPeer + // subscribers is a batch of subscribers and peerset will notify + // these subscribers when the peerset changes(new client peer is + // added or removed) + subscribers []clientPeerSubscriber + closed bool + lock sync.RWMutex +} - if !p.onlyAnnounce { - for msgCode := range reqAvgTimeCost { - if p.fcCosts[msgCode] == nil { - return errResp(ErrUselessPeer, "peer does not support message %d", msgCode) - } - } +// newClientPeerSet creates a new peer set to track the client peers. +func newClientPeerSet() *clientPeerSet { + return &clientPeerSet{peers: make(map[string]*clientPeer)} +} + +// subscribe adds a service to be notified about added or removed +// peers and also register all active peers into the given service. +func (ps *clientPeerSet) subscribe(sub clientPeerSubscriber) { + ps.lock.Lock() + defer ps.lock.Unlock() + + ps.subscribers = append(ps.subscribers, sub) + for _, p := range ps.peers { + sub.registerPeer(p) + } +} + +// unSubscribe removes the specified service from the subscriber pool. +func (ps *clientPeerSet) unSubscribe(sub clientPeerSubscriber) { + ps.lock.Lock() + defer ps.lock.Unlock() + + for i, s := range ps.subscribers { + if s == sub { + ps.subscribers = append(ps.subscribers[:i], ps.subscribers[i+1:]...) + return } - p.server = true } - p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} +} + +// register adds a new peer into the peer set, or returns an error if the +// peer is already known. +func (ps *clientPeerSet) register(peer *clientPeer) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if ps.closed { + return errClosed + } + if _, exist := ps.peers[peer.id]; exist { + return errAlreadyRegistered + } + ps.peers[peer.id] = peer + for _, sub := range ps.subscribers { + sub.registerPeer(peer) + } return nil } -// updateFlowControl updates the flow control parameters belonging to the server -// node if the announced key/value set contains relevant fields -func (p *peer) updateFlowControl(update keyValueMap) { - if p.fcServer == nil { - return +// unregister removes a remote peer from the peer set, disabling any further +// actions to/from that particular entity. It also initiates disconnection +// at the networking layer. 
+func (ps *clientPeerSet) unregister(id string) error {
+	ps.lock.Lock()
+	defer ps.lock.Unlock()
+
+	p, ok := ps.peers[id]
+	if !ok {
+		return errNotRegistered
 	}
-	// If any of the flow control params is nil, refuse to update.
-	var params flowcontrol.ServerParams
-	if update.get("flowControl/BL", &params.BufLimit) == nil && update.get("flowControl/MRR", &params.MinRecharge) == nil {
-		// todo can light client set a minimal acceptable flow control params?
-		p.fcParams = params
-		p.fcServer.UpdateParams(params)
+	delete(ps.peers, id)
+	for _, sub := range ps.subscribers {
+		sub.unregisterPeer(p)
 	}
-	var MRC RequestCostList
-	if update.get("flowControl/MRC", &MRC) == nil {
-		costUpdate := MRC.decode(ProtocolLengths[uint(p.version)])
-		for code, cost := range costUpdate {
-			p.fcCosts[code] = cost
-		}
+	p.Peer.Disconnect(p2p.DiscRequested)
+	return nil
+}
+
+// ids returns a list of all registered peer IDs
+func (ps *clientPeerSet) ids() []string {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	var ids []string
+	for id := range ps.peers {
+		ids = append(ids, id)
 	}
+	return ids
 }
 
-// String implements fmt.Stringer.
-func (p *peer) String() string {
-	return fmt.Sprintf("Peer %s [%s]", p.id,
-		fmt.Sprintf("les/%d", p.version),
-	)
+// peer retrieves the registered peer with the given id.
+func (ps *clientPeerSet) peer(id string) *clientPeer {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	return ps.peers[id]
 }
 
-// peerSetNotify is a callback interface to notify services about added or
-// removed peers
-type peerSetNotify interface {
-	registerPeer(*peer)
-	unregisterPeer(*peer)
+// len returns the current number of peers in the set.
+func (ps *clientPeerSet) len() int {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	return len(ps.peers)
 }
 
-// peerSet represents the collection of active peers currently participating in
-// the Light Ethereum sub-protocol.
-type peerSet struct {
-	peers map[string]*peer
-	lock sync.RWMutex
-	notifyList []peerSetNotify
-	closed bool
+// allPeers returns all client peers in a list.
+func (ps *clientPeerSet) allPeers() []*clientPeer {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	list := make([]*clientPeer, 0, len(ps.peers))
+	for _, p := range ps.peers {
+		list = append(list, p)
+	}
+	return list
 }
 
-// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
-	return &peerSet{
-		peers: make(map[string]*peer),
+// close disconnects all peers. No new peers can be registered
+// after close has returned.
+func (ps *clientPeerSet) close() {
+	ps.lock.Lock()
+	defer ps.lock.Unlock()
+
+	for _, p := range ps.peers {
+		p.Disconnect(p2p.DiscQuitting)
 	}
+	ps.closed = true
+}
+
+// serverPeerSet represents the set of active server peers currently
+// participating in the Light Ethereum sub-protocol.
+type serverPeerSet struct {
+	peers map[string]*serverPeer
+	// subscribers is a batch of subscribers and the peer set will notify
+	// these subscribers when the peer set changes (a new server peer is
+	// added or removed)
+	subscribers []serverPeerSubscriber
+	closed bool
+	lock sync.RWMutex
+}
+
+// newServerPeerSet creates a new peer set to track the active server peers.
+func newServerPeerSet() *serverPeerSet {
+	return &serverPeerSet{peers: make(map[string]*serverPeer)}
 }
 
-// notify adds a service to be notified about added or removed peers
-func (ps *peerSet) notify(n peerSetNotify) {
+// subscribe adds a service to be notified about added or removed
+// peers and also registers all active peers into the given service.
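+//
+// For illustration, any type implementing serverPeerSubscriber can be plugged
+// in; a hypothetical logging subscriber might look like:
+//
+//	type peerLogger struct{}
+//
+//	func (peerLogger) registerPeer(p *serverPeer)   { log.Info("server peer added", "id", p.id) }
+//	func (peerLogger) unregisterPeer(p *serverPeer) { log.Info("server peer removed", "id", p.id) }
+//
+//	peers.subscribe(peerLogger{})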
+func (ps *serverPeerSet) subscribe(sub serverPeerSubscriber) {
 	ps.lock.Lock()
-	ps.notifyList = append(ps.notifyList, n)
-	peers := make([]*peer, 0, len(ps.peers))
+	defer ps.lock.Unlock()
+
+	ps.subscribers = append(ps.subscribers, sub)
 	for _, p := range ps.peers {
-		peers = append(peers, p)
+		sub.registerPeer(p)
 	}
-	ps.lock.Unlock()
+}
 
-	for _, p := range peers {
-		n.registerPeer(p)
+// unSubscribe removes the specified service from the subscriber pool.
+func (ps *serverPeerSet) unSubscribe(sub serverPeerSubscriber) {
+	ps.lock.Lock()
+	defer ps.lock.Unlock()
+
+	for i, s := range ps.subscribers {
+		if s == sub {
+			ps.subscribers = append(ps.subscribers[:i], ps.subscribers[i+1:]...)
+			return
+		}
 	}
 }
 
-// Register injects a new peer into the working set, or returns an error if the
+// register adds a new server peer into the set, or returns an error if the
 // peer is already known.
-func (ps *peerSet) Register(p *peer) error {
+func (ps *serverPeerSet) register(peer *serverPeer) error {
 	ps.lock.Lock()
+	defer ps.lock.Unlock()
+
 	if ps.closed {
-		ps.lock.Unlock()
 		return errClosed
 	}
-	if _, ok := ps.peers[p.id]; ok {
-		ps.lock.Unlock()
+	if _, exist := ps.peers[peer.id]; exist {
 		return errAlreadyRegistered
 	}
-	ps.peers[p.id] = p
-	p.sendQueue = newExecQueue(100)
-	peers := make([]peerSetNotify, len(ps.notifyList))
-	copy(peers, ps.notifyList)
-	ps.lock.Unlock()
-
-	for _, n := range peers {
-		n.registerPeer(p)
+	ps.peers[peer.id] = peer
+	for _, sub := range ps.subscribers {
+		sub.registerPeer(peer)
 	}
 	return nil
 }
 
-// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity. It also initiates disconnection at the networking layer.
-func (ps *peerSet) Unregister(id string) error {
+// unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity. It also initiates disconnection at
+// the networking layer.
+func (ps *serverPeerSet) unregister(id string) error {
 	ps.lock.Lock()
-	if p, ok := ps.peers[id]; !ok {
-		ps.lock.Unlock()
-		return errNotRegistered
-	} else {
-		delete(ps.peers, id)
-		peers := make([]peerSetNotify, len(ps.notifyList))
-		copy(peers, ps.notifyList)
-		ps.lock.Unlock()
-
-		for _, n := range peers {
-			n.unregisterPeer(p)
-		}
-
-		p.sendQueue.quit()
-		p.Peer.Disconnect(p2p.DiscUselessPeer)
+	defer ps.lock.Unlock()
 
-		return nil
+	p, ok := ps.peers[id]
+	if !ok {
+		return errNotRegistered
+	}
+	delete(ps.peers, id)
+	for _, sub := range ps.subscribers {
+		sub.unregisterPeer(p)
 	}
+	p.Peer.Disconnect(p2p.DiscRequested)
+	return nil
 }
 
-// AllPeerIDs returns a list of all registered peer IDs
-func (ps *peerSet) AllPeerIDs() []string {
+// ids returns a list of all registered peer IDs
+func (ps *serverPeerSet) ids() []string {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
-	res := make([]string, len(ps.peers))
-	idx := 0
+	var ids []string
 	for id := range ps.peers {
-		res[idx] = id
-		idx++
+		ids = append(ids, id)
 	}
-	return res
+	return ids
 }
 
-// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+// peer retrieves the registered peer with the given id.
+func (ps *serverPeerSet) peer(id string) *serverPeer {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
 	return ps.peers[id]
 }
 
-// Len returns if the current number of peers in the set.
-func (ps *peerSet) Len() int {
+// len returns the current number of peers in the set.
+func (ps *serverPeerSet) len() int {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
 	return len(ps.peers)
 }
 
-// BestPeer retrieves the known peer with the currently highest total difficulty.
-func (ps *peerSet) BestPeer() *peer {
+// bestPeer retrieves the known peer with the currently highest total difficulty.
+// If the peer set is the client peer set, nothing meaningful is returned, since
+// client peers never send their latest status back to the server.
+func (ps *serverPeerSet) bestPeer() *serverPeer {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
 	var (
-		bestPeer *peer
+		bestPeer *serverPeer
 		bestTd *big.Int
 	)
 	for _, p := range ps.peers {
-		if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 {
+		if td := p.Td(); bestTd == nil || td.Cmp(bestTd) > 0 {
 			bestPeer, bestTd = p, td
 		}
 	}
 	return bestPeer
 }
 
-// AllPeers returns all peers in a list
-func (ps *peerSet) AllPeers() []*peer {
+// allPeers returns all server peers in a list.
+func (ps *serverPeerSet) allPeers() []*serverPeer {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
-	list := make([]*peer, len(ps.peers))
-	i := 0
-	for _, peer := range ps.peers {
-		list[i] = peer
-		i++
+	list := make([]*serverPeer, 0, len(ps.peers))
+	for _, p := range ps.peers {
+		list = append(list, p)
 	}
 	return list
 }
 
-// Close disconnects all peers.
-// No new peers can be registered after Close has returned.
-func (ps *peerSet) Close() {
+// close disconnects all peers. No new peers can be registered
+// after close has returned.
+func (ps *serverPeerSet) close() {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
 
diff --git a/les/peer_test.go b/les/peer_test.go
index db74a052c1..59a2ad7009 100644
--- a/les/peer_test.go
+++ b/les/peer_test.go
@@ -17,286 +17,131 @@ package les
 
 import (
+	"crypto/rand"
 	"math/big"
-	"net"
+	"reflect"
+	"sort"
 	"testing"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/common/mclock"
-	"github.com/ethereum/go-ethereum/core/rawdb"
-	"github.com/ethereum/go-ethereum/crypto"
-	"github.com/ethereum/go-ethereum/eth"
-	"github.com/ethereum/go-ethereum/les/flowcontrol"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
-	"github.com/ethereum/go-ethereum/rlp"
 )
 
-const protocolVersion = lpv2
-
-var (
-	hash = common.HexToHash("deadbeef")
-	genesis = common.HexToHash("cafebabe")
-	headNum = uint64(1234)
-	td = big.NewInt(123)
-)
-
-func newNodeID(t *testing.T) *enode.Node {
-	key, err := crypto.GenerateKey()
-	if err != nil {
-		t.Fatal("generate key err:", err)
-	}
-	return enode.NewV4(&key.PublicKey, net.IP{}, 35000, 35000)
-}
-
-// ulc connects to trusted peer and send announceType=announceTypeSigned
-func TestPeerHandshakeSetAnnounceTypeToAnnounceTypeSignedForTrustedPeer(t *testing.T) {
-	id := newNodeID(t).ID()
-
-	// peer to connect(on ulc side)
-	p := peer{
-		Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}),
-		version: protocolVersion,
-		trusted: true,
-		rw: &rwStub{
-			WriteHook: func(recvList keyValueList) {
-				recv, _ := recvList.decode()
-				var reqType uint64
-				err := recv.get("announceType", &reqType)
-				if err != nil {
-					t.Fatal(err)
-				}
-				if reqType != announceTypeSigned {
-					t.Fatal("Expected announceTypeSigned")
-				}
-			},
-			ReadHook: func(l keyValueList) keyValueList {
-				l = l.add("serveHeaders", nil)
-				l = l.add("serveChainSince", uint64(0))
-				l = l.add("serveStateSince", uint64(0))
-				l = l.add("txRelay", nil)
-				l = l.add("flowControl/BL", uint64(0))
-				l = l.add("flowControl/MRR", uint64(0))
-				l = l.add("flowControl/MRC", testCostList(0))
-				return l
-			},
-		},
-		network: NetworkId,
- } - err := p.Handshake(td, hash, headNum, genesis, nil) - if err != nil { - t.Fatalf("Handshake error: %s", err) - } - if p.announceType != announceTypeSigned { - t.Fatal("Incorrect announceType") - } -} - -func TestPeerHandshakeAnnounceTypeSignedForTrustedPeersPeerNotInTrusted(t *testing.T) { - id := newNodeID(t).ID() - p := peer{ - Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), - version: protocolVersion, - rw: &rwStub{ - WriteHook: func(recvList keyValueList) { - // checking that ulc sends to peer allowedRequests=noRequests and announceType != announceTypeSigned - recv, _ := recvList.decode() - var reqType uint64 - err := recv.get("announceType", &reqType) - if err != nil { - t.Fatal(err) - } - if reqType == announceTypeSigned { - t.Fatal("Expected not announceTypeSigned") - } - }, - ReadHook: func(l keyValueList) keyValueList { - l = l.add("serveHeaders", nil) - l = l.add("serveChainSince", uint64(0)) - l = l.add("serveStateSince", uint64(0)) - l = l.add("txRelay", nil) - l = l.add("flowControl/BL", uint64(0)) - l = l.add("flowControl/MRR", uint64(0)) - l = l.add("flowControl/MRC", testCostList(0)) - return l - }, - }, - network: NetworkId, - } - err := p.Handshake(td, hash, headNum, genesis, nil) - if err != nil { - t.Fatal(err) - } - if p.announceType == announceTypeSigned { - t.Fatal("Incorrect announceType") - } -} - -func TestPeerHandshakeDefaultAllRequests(t *testing.T) { - id := newNodeID(t).ID() - - s := generateLesServer() - - p := peer{ - Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), - version: protocolVersion, - rw: &rwStub{ - ReadHook: func(l keyValueList) keyValueList { - l = l.add("announceType", uint64(announceTypeSigned)) - l = l.add("allowedRequests", uint64(0)) - return l - }, - }, - network: NetworkId, - } - - err := p.Handshake(td, hash, headNum, genesis, s) - if err != nil { - t.Fatal(err) - } - - if p.onlyAnnounce { - t.Fatal("Incorrect announceType") - } -} - -func TestPeerHandshakeServerSendOnlyAnnounceRequestsHeaders(t *testing.T) { - id := newNodeID(t).ID() - - s := generateLesServer() - s.config.UltraLightOnlyAnnounce = true - - p := peer{ - Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), - version: protocolVersion, - rw: &rwStub{ - ReadHook: func(l keyValueList) keyValueList { - l = l.add("announceType", uint64(announceTypeSigned)) - return l - }, - WriteHook: func(l keyValueList) { - for _, v := range l { - if v.Key == "serveHeaders" || - v.Key == "serveChainSince" || - v.Key == "serveStateSince" || - v.Key == "txRelay" { - t.Fatalf("%v exists", v.Key) - } - } - }, - }, - network: NetworkId, - } - - err := p.Handshake(td, hash, headNum, genesis, s) - if err != nil { - t.Fatal(err) - } -} -func TestPeerHandshakeClientReceiveOnlyAnnounceRequestsHeaders(t *testing.T) { - id := newNodeID(t).ID() - - p := peer{ - Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), - version: protocolVersion, - rw: &rwStub{ - ReadHook: func(l keyValueList) keyValueList { - l = l.add("flowControl/BL", uint64(0)) - l = l.add("flowControl/MRR", uint64(0)) - l = l.add("flowControl/MRC", RequestCostList{}) - - l = l.add("announceType", uint64(announceTypeSigned)) - - return l - }, - }, - network: NetworkId, - trusted: true, - } - - err := p.Handshake(td, hash, headNum, genesis, nil) - if err != nil { - t.Fatal(err) - } - - if !p.onlyAnnounce { - t.Fatal("onlyAnnounce must be true") - } -} - -func TestPeerHandshakeClientReturnErrorOnUselessPeer(t *testing.T) { - id := newNodeID(t).ID() - - p := peer{ - Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), - version: protocolVersion, - 
rw: &rwStub{ - ReadHook: func(l keyValueList) keyValueList { - l = l.add("flowControl/BL", uint64(0)) - l = l.add("flowControl/MRR", uint64(0)) - l = l.add("flowControl/MRC", RequestCostList{}) - l = l.add("announceType", uint64(announceTypeSigned)) - return l - }, - }, - network: NetworkId, - } - - err := p.Handshake(td, hash, headNum, genesis, nil) - if err == nil { - t.FailNow() - } +type testServerPeerSub struct { + regCh chan *serverPeer + unregCh chan *serverPeer } -func generateLesServer() *LesServer { - s := &LesServer{ - lesCommons: lesCommons{ - config: ð.Config{UltraLightOnlyAnnounce: true}, - }, - defParams: flowcontrol.ServerParams{ - BufLimit: uint64(300000000), - MinRecharge: uint64(50000), - }, - fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}), +func newTestServerPeerSub() *testServerPeerSub { + return &testServerPeerSub{ + regCh: make(chan *serverPeer, 1), + unregCh: make(chan *serverPeer, 1), } - s.costTracker, _ = newCostTracker(rawdb.NewMemoryDatabase(), s.config) - return s } -type rwStub struct { - ReadHook func(l keyValueList) keyValueList - WriteHook func(l keyValueList) +func (t *testServerPeerSub) registerPeer(p *serverPeer) { t.regCh <- p } +func (t *testServerPeerSub) unregisterPeer(p *serverPeer) { t.unregCh <- p } + +func TestPeerSubscription(t *testing.T) { + peers := newServerPeerSet() + defer peers.close() + + checkIds := func(expect []string) { + given := peers.ids() + if len(given) == 0 && len(expect) == 0 { + return + } + sort.Strings(given) + sort.Strings(expect) + if !reflect.DeepEqual(given, expect) { + t.Fatalf("all peer ids mismatch, want %v, given %v", expect, given) + } + } + checkPeers := func(peerCh chan *serverPeer) { + select { + case <-peerCh: + case <-time.NewTimer(100 * time.Millisecond).C: + t.Fatalf("timeout, no event received") + } + select { + case <-peerCh: + t.Fatalf("unexpected event received") + case <-time.NewTimer(10 * time.Millisecond).C: + } + } + checkIds([]string{}) + + sub := newTestServerPeerSub() + peers.subscribe(sub) + + // Generate a random id and create the peer + var id enode.ID + rand.Read(id[:]) + peer := newServerPeer(2, NetworkId, false, p2p.NewPeer(id, "name", nil), nil) + peers.register(peer) + + checkIds([]string{peer.id}) + checkPeers(sub.regCh) + + peers.unregister(peer.id) + checkIds([]string{}) + checkPeers(sub.unregCh) } -func (s *rwStub) ReadMsg() (p2p.Msg, error) { - payload := keyValueList{} - payload = payload.add("protocolVersion", uint64(protocolVersion)) - payload = payload.add("networkId", uint64(NetworkId)) - payload = payload.add("headTd", td) - payload = payload.add("headHash", hash) - payload = payload.add("headNum", headNum) - payload = payload.add("genesisHash", genesis) - - if s.ReadHook != nil { - payload = s.ReadHook(payload) - } - size, p, err := rlp.EncodeToReader(payload) - if err != nil { - return p2p.Msg{}, err - } - return p2p.Msg{ - Size: uint32(size), - Payload: p, - }, nil -} - -func (s *rwStub) WriteMsg(m p2p.Msg) error { - recvList := keyValueList{} - if err := m.Decode(&recvList); err != nil { - return err - } - if s.WriteHook != nil { - s.WriteHook(recvList) +func TestHandshake(t *testing.T) { + // Create a message pipe to communicate through + app, net := p2p.MsgPipe() + + // Generate a random id and create the peer + var id enode.ID + rand.Read(id[:]) + + peer1 := newClientPeer(2, NetworkId, p2p.NewPeer(id, "name", nil), net) + peer2 := newServerPeer(2, NetworkId, true, p2p.NewPeer(id, "name", nil), app) + + var ( + errCh1 = make(chan error, 1) + errCh2 = 
make(chan error, 1) + + td = big.NewInt(100) + head = common.HexToHash("deadbeef") + headNum = uint64(10) + genesis = common.HexToHash("cafebabe") + ) + go func() { + errCh1 <- peer1.handshake(td, head, headNum, genesis, func(list *keyValueList) { + var announceType uint64 = announceTypeSigned + *list = (*list).add("announceType", announceType) + }, nil) + }() + go func() { + errCh2 <- peer2.handshake(td, head, headNum, genesis, nil, func(recv keyValueMap) error { + var reqType uint64 + err := recv.get("announceType", &reqType) + if err != nil { + t.Fatal(err) + } + if reqType != announceTypeSigned { + t.Fatal("Expected announceTypeSigned") + } + return nil + }) + }() + + for i := 0; i < 2; i++ { + select { + case err := <-errCh1: + if err != nil { + t.Fatalf("handshake failed, %v", err) + } + case err := <-errCh2: + if err != nil { + t.Fatalf("handshake failed, %v", err) + } + case <-time.NewTimer(100 * time.Millisecond).C: + t.Fatalf("timeout") + } } - return nil } diff --git a/les/request_test.go b/les/request_test.go index 8d09703c57..f58ebca9c1 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -81,8 +81,7 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { // Assemble the test environment server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) defer tearDown() - - client.handler.synchronise(client.peer.peer) + client.handler.synchronise(client.peer.speer) // Ensure the client has synced all necessary data. clientHead := client.handler.backend.blockchain.CurrentHeader() diff --git a/les/retrieve.go b/les/retrieve.go index c806117902..5fa68b7456 100644 --- a/les/retrieve.go +++ b/les/retrieve.go @@ -38,7 +38,7 @@ var ( // matching replies by request ID and handles timeouts and resends if necessary. type retrieveManager struct { dist *requestDistributor - peers *peerSet + peers *serverPeerSet serverPool peerSelector lock sync.RWMutex @@ -99,7 +99,7 @@ const ( ) // newRetrieveManager creates the retrieve manager -func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager { +func newRetrieveManager(peers *serverPeerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager { return &retrieveManager{ peers: peers, dist: dist, @@ -337,7 +337,7 @@ func (r *sentReq) tryRequest() { defer func() { // send feedback to server pool and remove peer if hard timeout happened - pp, ok := p.(*peer) + pp, ok := p.(*serverPeer) if ok && r.rm.serverPool != nil { respTime := time.Duration(mclock.Now() - reqSent) r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto) @@ -345,7 +345,7 @@ func (r *sentReq) tryRequest() { if hrto { pp.Log().Debug("Request timed out hard") if r.rm.peers != nil { - r.rm.peers.Unregister(pp.id) + r.rm.peers.unregister(pp.id) } } diff --git a/les/server.go b/les/server.go index 664eba9717..f72f31321a 100644 --- a/les/server.go +++ b/les/server.go @@ -40,6 +40,7 @@ type LesServer struct { lesCommons archiveMode bool // Flag whether the ethereum node runs in archive mode. 
+ peers *clientPeerSet handler *serverHandler lesTopics []discv5.Topic privateKey *ecdsa.PrivateKey @@ -75,13 +76,13 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { chainConfig: e.BlockChain().Config(), iConfig: light.DefaultServerIndexerConfig, chainDb: e.ChainDb(), - peers: newPeerSet(), chainReader: e.BlockChain(), chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations), bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), closeCh: make(chan struct{}), }, archiveMode: e.ArchiveMode(), + peers: newClientPeerSet(), lesTopics: lesTopics, fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}), servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100), @@ -115,7 +116,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { srv.maxCapacity = totalRecharge } srv.fcManager.SetCapacityLimits(srv.freeCapacity, srv.maxCapacity, srv.freeCapacity*2) - srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) }) + srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, mclock.System{}, func(id enode.ID) { go srv.peers.unregister(peerIdToString(id)) }) srv.clientPool.setDefaultFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1}) checkpoint := srv.latestLocalCheckpoint() @@ -152,7 +153,7 @@ func (s *LesServer) APIs() []rpc.API { func (s *LesServer) Protocols() []p2p.Protocol { ps := s.makeProtocols(ServerProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} { - if p := s.peers.Peer(peerIdToString(id)); p != nil { + if p := s.peers.peer(peerIdToString(id)); p != nil { return p.Info() } return nil @@ -194,7 +195,7 @@ func (s *LesServer) Stop() { // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to pm.peers yet // will exit when they try to register. - s.peers.Close() + s.peers.close() s.fcManager.Stop() s.costTracker.stop() diff --git a/les/server_handler.go b/les/server_handler.go index 98e9294b07..186bdcbb03 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -101,13 +101,14 @@ func (h *serverHandler) stop() { // runPeer is the p2p protocol run function for the given version. func (h *serverHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := newPeer(int(version), h.server.config.NetworkId, false, p, newMeteredMsgWriter(rw, int(version))) + peer := newClientPeer(int(version), h.server.config.NetworkId, p, newMeteredMsgWriter(rw, int(version))) + defer peer.close() h.wg.Add(1) defer h.wg.Done() return h.handle(peer) } -func (h *serverHandler) handle(p *peer) error { +func (h *serverHandler) handle(p *clientPeer) error { p.Log().Debug("Light Ethereum peer connected", "name", p.Name()) // Execute the LES handshake @@ -139,21 +140,21 @@ func (h *serverHandler) handle(p *peer) error { return errFullClientPool } // Register the peer locally - if err := h.server.peers.Register(p); err != nil { + if err := h.server.peers.register(p); err != nil { h.server.clientPool.disconnect(p) p.Log().Error("Light Ethereum peer registration failed", "err", err) return err } - clientConnectionGauge.Update(int64(h.server.peers.Len())) + clientConnectionGauge.Update(int64(h.server.peers.len())) var wg sync.WaitGroup // Wait group used to track all in-flight task routines. 
diff --git a/les/server_handler.go b/les/server_handler.go
index 98e9294b07..186bdcbb03 100644
--- a/les/server_handler.go
+++ b/les/server_handler.go
@@ -101,13 +101,14 @@ func (h *serverHandler) stop() {
 
 // runPeer is the p2p protocol run function for the given version.
 func (h *serverHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
-	peer := newPeer(int(version), h.server.config.NetworkId, false, p, newMeteredMsgWriter(rw, int(version)))
+	peer := newClientPeer(int(version), h.server.config.NetworkId, p, newMeteredMsgWriter(rw, int(version)))
+	defer peer.close()
 	h.wg.Add(1)
 	defer h.wg.Done()
 	return h.handle(peer)
 }
 
-func (h *serverHandler) handle(p *peer) error {
+func (h *serverHandler) handle(p *clientPeer) error {
 	p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
 
 	// Execute the LES handshake
@@ -139,21 +140,21 @@ func (h *serverHandler) handle(p *peer) error {
 		return errFullClientPool
 	}
 	// Register the peer locally
-	if err := h.server.peers.Register(p); err != nil {
+	if err := h.server.peers.register(p); err != nil {
 		h.server.clientPool.disconnect(p)
 		p.Log().Error("Light Ethereum peer registration failed", "err", err)
 		return err
 	}
-	clientConnectionGauge.Update(int64(h.server.peers.Len()))
+	clientConnectionGauge.Update(int64(h.server.peers.len()))
 
 	var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
 
 	connectedAt := mclock.Now()
 	defer func() {
 		wg.Wait() // Ensure all background task routines have exited.
-		h.server.peers.Unregister(p.id)
+		h.server.peers.unregister(p.id)
 		h.server.clientPool.disconnect(p)
-		clientConnectionGauge.Update(int64(h.server.peers.Len()))
+		clientConnectionGauge.Update(int64(h.server.peers.len()))
 		connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
 	}()
 
@@ -174,7 +175,7 @@ func (h *serverHandler) handle(p *peer) error {
 
 // handleMsg is invoked whenever an inbound message is received from a remote
 // peer. The remote connection is torn down upon returning any error.
-func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
+func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
 	// Read the next message from the remote peer, and ensure it's fully consumed
 	msg, err := p.rw.ReadMsg()
 	if err != nil {
@@ -208,7 +209,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
 		accepted, bufShort, priority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost)
 		if !accepted {
-			p.freezeClient()
+			p.freeze()
 			p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
 			p.fcClient.OneTimeCost(inSizeCost)
 			return false
@@ -258,7 +259,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 			h.server.clientPool.requestCost(p, realCost)
 		}
 		if reply != nil {
-			p.queueSend(func() {
+			p.mustQueueSend(func() {
 				if err := reply.send(bv); err != nil {
 					select {
 					case p.errCh <- err:
@@ -372,8 +373,8 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 				}
 				first = false
 			}
-			reply := p.ReplyBlockHeaders(req.ReqID, headers)
-			sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done())
+			reply := p.replyBlockHeaders(req.ReqID, headers)
+			sendResponse(req.ReqID, query.Amount, reply, task.done())
 			if metrics.EnabledExpensive {
 				miscOutHeaderPacketsMeter.Mark(1)
 				miscOutHeaderTrafficMeter.Mark(int64(reply.size()))
@@ -421,7 +422,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 				bodies = append(bodies, body)
 				bytes += len(body)
 			}
-			reply := p.ReplyBlockBodiesRLP(req.ReqID, bodies)
+			reply := p.replyBlockBodiesRLP(req.ReqID, bodies)
 			sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
 			if metrics.EnabledExpensive {
 				miscOutBodyPacketsMeter.Mark(1)
@@ -493,7 +494,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 					break
 				}
 			}
-			reply := p.ReplyCode(req.ReqID, data)
+			reply := p.replyCode(req.ReqID, data)
 			sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
 			if metrics.EnabledExpensive {
 				miscOutCodePacketsMeter.Mark(1)
@@ -550,7 +551,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 					bytes += len(encoded)
 				}
 			}
-			reply := p.ReplyReceiptsRLP(req.ReqID, receipts)
+			reply := p.replyReceiptsRLP(req.ReqID, receipts)
 			sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
 			if metrics.EnabledExpensive {
 				miscOutReceiptPacketsMeter.Mark(1)
@@ -653,7 +654,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 					break
 				}
 			}
-			reply := p.ReplyProofsV2(req.ReqID, nodes.NodeList())
+			reply := p.replyProofsV2(req.ReqID, nodes.NodeList())
 			sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
 			if metrics.EnabledExpensive {
 				miscOutTrieProofPacketsMeter.Mark(1)
@@ -728,7 +729,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 					break
 				}
 			}
-			reply := p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
+			reply := p.replyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
 			sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
 			if metrics.EnabledExpensive {
 				miscOutHelperTriePacketsMeter.Mark(1)
@@ -777,7 +778,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 					stats[i] = h.txStatus(hash)
 				}
 			}
-			reply := p.ReplyTxStatus(req.ReqID, stats)
+			reply := p.replyTxStatus(req.ReqID, stats)
 			sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
 			if metrics.EnabledExpensive {
 				miscOutTxsPacketsMeter.Mark(1)
@@ -814,7 +815,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 			}
 			stats[i] = h.txStatus(hash)
 		}
-		reply := p.ReplyTxStatus(req.ReqID, stats)
+		reply := p.replyTxStatus(req.ReqID, stats)
 		sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
 		if metrics.EnabledExpensive {
 			miscOutTxStatusPacketsMeter.Mark(1)
@@ -913,7 +914,7 @@ func (h *serverHandler) broadcastHeaders() {
 	for {
 		select {
 		case ev := <-headCh:
-			peers := h.server.peers.AllPeers()
+			peers := h.server.peers.allPeers()
 			if len(peers) == 0 {
 				continue
 			}
@@ -939,14 +940,18 @@ func (h *serverHandler) broadcastHeaders() {
 				p := p
 				switch p.announceType {
 				case announceTypeSimple:
-					p.queueSend(func() { p.SendAnnounce(announce) })
+					if !p.queueSend(func() { p.sendAnnounce(announce) }) {
+						log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
+					}
 				case announceTypeSigned:
 					if !signed {
 						signedAnnounce = announce
 						signedAnnounce.sign(h.server.privateKey)
 						signed = true
 					}
-					p.queueSend(func() { p.SendAnnounce(signedAnnounce) })
+					if !p.queueSend(func() { p.sendAnnounce(signedAnnounce) }) {
+						log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
+					}
 				}
 			}
 		case <-h.closeCh:
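broadcastHeaders now checks the boolean returned by queueSend instead of assuming the enqueue succeeded, logging and dropping the announcement when a peer's send queue is saturated. A compact sketch of that non-blocking enqueue contract (hypothetical type; the real queue lives with the peer implementation in les/peer.go):

package main

import "fmt"

// sendQueue models a bounded per-peer queue of pending send callbacks.
type sendQueue struct {
	ch chan func()
}

// queueSend tries to enqueue f without blocking and reports whether it
// fit, which is the contract broadcastHeaders now checks.
func (q *sendQueue) queueSend(f func()) bool {
	select {
	case q.ch <- f:
		return true
	default:
		return false
	}
}

func main() {
	q := &sendQueue{ch: make(chan func(), 1)}
	fmt.Println(q.queueSend(func() {})) // true
	fmt.Println(q.queueSend(func() {})) // false: full, caller logs and drops
}

Announcements are safe to drop because a later one supersedes them; replies are not, which is presumably why the reply path above switches to mustQueueSend rather than tolerating a full queue.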
diff --git a/les/serverpool.go b/les/serverpool.go
index f8fd721696..ec99a2d982 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -90,7 +90,7 @@ const (
 
 // connReq represents a request for peer connection.
 type connReq struct {
-	p      *peer
+	p      *serverPeer
 	node   *enode.Node
 	result chan *poolEntry
 }
@@ -220,7 +220,7 @@ func (pool *serverPool) discoverNodes() {
 // Otherwise, the connection should be rejected.
 // Note that whenever a connection has been accepted and a pool entry has been returned,
 // disconnect should also always be called.
-func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry {
+func (pool *serverPool) connect(p *serverPeer, node *enode.Node) *poolEntry {
 	log.Debug("Connect new entry", "enode", p.id)
 	req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)}
 	select {
@@ -679,7 +679,7 @@ const (
 
 // poolEntry represents a server node and stores its current state and statistics.
 type poolEntry struct {
-	peer   *peer
+	peer   *serverPeer
 	pubkey [64]byte // secp256k1 key of the node
 	addr   map[string]*poolEntryAddress
 	node   *enode.Node
diff --git a/les/servingqueue.go b/les/servingqueue.go
index 8842cf9e9d..9db84e6159 100644
--- a/les/servingqueue.go
+++ b/les/servingqueue.go
@@ -55,7 +55,7 @@ type servingQueue struct {
 type servingTask struct {
 	sq                                       *servingQueue
 	servingTime, timeAdded, maxTime, expTime uint64
-	peer                                     *peer
+	peer                                     *clientPeer
 	priority                                 int64
 	biasAdded                                bool
 	token                                    runToken
@@ -142,7 +142,7 @@ func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
 }
 
 // newTask creates a new task with the given priority
-func (sq *servingQueue) newTask(peer *peer, maxTime uint64, priority int64) *servingTask {
+func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64) *servingTask {
 	return &servingTask{
 		sq:       sq,
 		peer:     peer,
@@ -187,7 +187,7 @@ func (sq *servingQueue) threadController() {
 type (
 	// peerTasks lists the tasks received from a given peer when selecting peers to freeze
 	peerTasks struct {
-		peer     *peer
+		peer     *clientPeer
 		list     []*servingTask
 		sumTime  uint64
 		priority float64
@@ -211,7 +211,7 @@ func (l peerList) Swap(i, j int) {
 // freezePeers selects the peers with the worst priority queued tasks and freezes
 // them until burstTime goes under burstDropLimit or all peers are frozen
 func (sq *servingQueue) freezePeers() {
-	peerMap := make(map[*peer]*peerTasks)
+	peerMap := make(map[*clientPeer]*peerTasks)
 	var peerList peerList
 	if sq.best != nil {
 		sq.queue.Push(sq.best, sq.best.priority)
@@ -239,7 +239,7 @@ func (sq *servingQueue) freezePeers() {
 	drop := true
 	for _, tasks := range peerList {
 		if drop {
-			tasks.peer.freezeClient()
+			tasks.peer.freeze()
 			tasks.peer.fcClient.Freeze()
 			sq.queuedTime -= tasks.sumTime
 			sqQueuedGauge.Update(int64(sq.queuedTime))
diff --git a/les/sync.go b/les/sync.go
index 207686403f..d2568d45bc 100644
--- a/les/sync.go
+++ b/les/sync.go
@@ -51,7 +51,7 @@ const (
 // In addition to the checkpoint registered in the registrar contract, there are
 // several legacy hardcoded checkpoints in our codebase. These checkpoints are
 // also considered as valid.
-func (h *clientHandler) validateCheckpoint(peer *peer) error {
+func (h *clientHandler) validateCheckpoint(peer *serverPeer) error {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
@@ -87,7 +87,7 @@ func (h *clientHandler) validateCheckpoint(peer *peer) error {
 }
 
 // synchronise tries to sync up our local chain with a remote peer.
-func (h *clientHandler) synchronise(peer *peer) {
+func (h *clientHandler) synchronise(peer *serverPeer) {
 	// Short circuit if the peer is nil.
 	if peer == nil {
 		return
@@ -95,7 +95,7 @@ func (h *clientHandler) synchronise(peer *peer) {
 	// Make sure the peer's TD is higher than our own.
 	latest := h.backend.blockchain.CurrentHeader()
 	currentTd := rawdb.ReadTd(h.backend.chainDb, latest.Hash(), latest.Number.Uint64())
-	if currentTd != nil && peer.headBlockInfo().Td.Cmp(currentTd) < 0 {
+	if currentTd != nil && peer.Td().Cmp(currentTd) < 0 {
 		return
 	}
 	// Recap the checkpoint.
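The synchronise hunk swaps peer.headBlockInfo().Td for the new Td() accessor, but the guard itself is unchanged: skip syncing unless the remote peer advertises at least our own total difficulty. Reduced to its essentials (a stand-in helper for illustration, not patch code):

package main

import (
	"fmt"
	"math/big"
)

// shouldSync reproduces the guard at the top of (*clientHandler).synchronise:
// sync only when the remote chain is at least as heavy as ours.
func shouldSync(localTd, remoteTd *big.Int) bool {
	return localTd == nil || remoteTd.Cmp(localTd) >= 0
}

func main() {
	fmt.Println(shouldSync(big.NewInt(100), big.NewInt(90)))  // false
	fmt.Println(shouldSync(big.NewInt(100), big.NewInt(150))) // true
}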
diff --git a/les/sync_test.go b/les/sync_test.go
index 1c157b4fbf..77b82deb7a 100644
--- a/les/sync_test.go
+++ b/les/sync_test.go
@@ -109,7 +109,9 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
 	}
 
 	// Create connected peer pair.
-	_, err1, _, err2 := newTestPeerPair("peer", protocol, server.handler, client.handler)
+	peer1, err1, peer2, err2 := newTestPeerPair("peer", protocol, server.handler, client.handler)
+	defer peer1.close()
+	defer peer2.close()
 	select {
 	case <-time.After(time.Millisecond * 100):
 	case err := <-err1:
diff --git a/les/test_helper.go b/les/test_helper.go
index fb1965eebe..d9ffe32db2 100644
--- a/les/test_helper.go
+++ b/les/test_helper.go
@@ -166,7 +166,7 @@ func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.Indexer
 	return indexers[:]
 }
 
-func newTestClientHandler(backend *backends.SimulatedBackend, odr *LesOdr, indexers []*core.ChainIndexer, db ethdb.Database, peers *peerSet, ulcServers []string, ulcFraction int) *clientHandler {
+func newTestClientHandler(backend *backends.SimulatedBackend, odr *LesOdr, indexers []*core.ChainIndexer, db ethdb.Database, peers *serverPeerSet, ulcServers []string, ulcFraction int) *clientHandler {
 	var (
 		evmux  = new(event.TypeMux)
 		engine = ethash.NewFaker()
@@ -206,9 +206,9 @@ func newTestClientHandler(backend *backends.SimulatedBackend, odr *LesOdr, index
 			chainDb:     db,
 			oracle:      oracle,
 			chainReader: chain,
-			peers:       peers,
 			closeCh:     make(chan struct{}),
 		},
+		peers:     peers,
 		reqDist:   odr.retriever.dist,
 		retriever: odr.retriever,
 		odr:       odr,
@@ -224,7 +224,7 @@ func newTestClientHandler(backend *backends.SimulatedBackend, odr *LesOdr, index
 	return client.handler
 }
 
-func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Database, peers *peerSet, clock mclock.Clock) (*serverHandler, *backends.SimulatedBackend) {
+func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Database, peers *clientPeerSet, clock mclock.Clock) (*serverHandler, *backends.SimulatedBackend) {
 	var (
 		gspec = core.Genesis{
 			Config: params.AllEthashProtocolChanges,
@@ -269,9 +269,9 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 			chainDb:     db,
 			chainReader: simulation.Blockchain(),
 			oracle:      oracle,
-			peers:       peers,
 			closeCh:     make(chan struct{}),
 		},
+		peers:        peers,
 		servingQueue: newServingQueue(int64(time.Millisecond*10), 1),
 		defParams: flowcontrol.ServerParams{
 			BufLimit:    testBufLimit,
@@ -294,7 +294,8 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 
 // testPeer is a simulated peer to allow testing direct network calls.
 type testPeer struct {
-	peer *peer
+	cpeer *clientPeer
+	speer *serverPeer
 
 	net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging
 	app *p2p.MsgPipeRW    // Application layer reader/writer to simulate the local side
@@ -308,7 +309,7 @@ func newTestPeer(t *testing.T, name string, version int, handler *serverHandler,
 	// Generate a random id and create the peer
 	var id enode.ID
 	rand.Read(id[:])
-	peer := newPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), net)
+	peer := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
 
 	// Start the peer on a new thread
 	errCh := make(chan error, 1)
@@ -320,9 +321,9 @@ func newTestPeer(t *testing.T, name string, version int, handler *serverHandler,
 		}
 	}()
 	tp := &testPeer{
-		app:  app,
-		net:  net,
-		peer: peer,
+		app:   app,
+		net:   net,
+		cpeer: peer,
 	}
 	// Execute any implicitly requested handshakes and return
 	if shake {
@@ -354,8 +355,8 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
 	var id enode.ID
 	rand.Read(id[:])
 
-	peer1 := newPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), net)
-	peer2 := newPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), app)
+	peer1 := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
+	peer2 := newServerPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), app)
 
 	// Start the peer on a new thread
 	errc1 := make(chan error, 1)
@@ -374,14 +375,14 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
 		case errc1 <- client.handle(peer2):
 		}
 	}()
-	return &testPeer{peer: peer1, net: net, app: app}, errc1, &testPeer{peer: peer2, net: app, app: net}, errc2
+	return &testPeer{cpeer: peer1, net: net, app: app}, errc1, &testPeer{speer: peer2, net: app, app: net}, errc2
 }
 
 // handshake simulates a trivial handshake that expects the same state from the
 // remote side as we are simulating locally.
 func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, costList RequestCostList) {
 	var expList keyValueList
-	expList = expList.add("protocolVersion", uint64(p.peer.version))
+	expList = expList.add("protocolVersion", uint64(p.cpeer.version))
 	expList = expList.add("networkId", uint64(NetworkId))
 	expList = expList.add("headTd", td)
 	expList = expList.add("headHash", head)
@@ -404,7 +405,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
 	if err := p2p.Send(p.app, StatusMsg, sendList); err != nil {
 		t.Fatalf("status send: %v", err)
 	}
-	p.peer.fcParams = flowcontrol.ServerParams{
+	p.cpeer.fcParams = flowcontrol.ServerParams{
 		BufLimit:    testBufLimit,
 		MinRecharge: testBufRecharge,
 	}
@@ -445,7 +446,7 @@ func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallba
 	if simClock {
 		clock = &mclock.Simulated{}
 	}
-	handler, b := newTestServerHandler(blocks, indexers, db, newPeerSet(), clock)
+	handler, b := newTestServerHandler(blocks, indexers, db, newClientPeerSet(), clock)
 
 	var peer *testPeer
 	if newPeer {
@@ -473,6 +474,7 @@ func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallba
 	teardown := func() {
 		if newPeer {
 			peer.close()
+			peer.cpeer.close()
 			b.Close()
 		}
 		cIndexer.Close()
@@ -483,14 +485,14 @@ func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallba
 
 func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, ulcServers []string, ulcFraction int, simClock bool, connect bool) (*testServer, *testClient, func()) {
 	sdb, cdb := rawdb.NewMemoryDatabase(), rawdb.NewMemoryDatabase()
-	speers, cPeers := newPeerSet(), newPeerSet()
+	speers, cpeers := newServerPeerSet(), newClientPeerSet()
 
 	var clock mclock.Clock = &mclock.System{}
 	if simClock {
 		clock = &mclock.Simulated{}
 	}
-	dist := newRequestDistributor(cPeers, clock)
-	rm := newRetrieveManager(cPeers, dist, nil)
+	dist := newRequestDistributor(speers, clock)
+	rm := newRetrieveManager(speers, dist, nil)
 	odr := NewLesOdr(cdb, light.TestClientIndexerConfig, rm)
 
 	sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig)
@@ -500,8 +502,8 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
 	ccIndexer, cbIndexer, cbtIndexer := cIndexers[0], cIndexers[1], cIndexers[2]
 	odr.SetIndexers(ccIndexer, cbIndexer, cbtIndexer)
 
-	server, b := newTestServerHandler(blocks, sindexers, sdb, speers, clock)
-	client := newTestClientHandler(b, odr, cIndexers, cdb, cPeers, ulcServers, ulcFraction)
+	server, b := newTestServerHandler(blocks, sindexers, sdb, cpeers, clock)
+	client := newTestClientHandler(b, odr, cIndexers, cdb, speers, ulcServers, ulcFraction)
 
 	scIndexer.Start(server.blockchain)
 	sbIndexer.Start(server.blockchain)
@@ -548,6 +550,8 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
 		if connect {
 			speer.close()
 			cpeer.close()
+			cpeer.cpeer.close()
+			speer.speer.close()
 		}
 		ccIndexer.Close()
 		cbIndexer.Close()
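All of these helpers wire the two peer objects over an in-memory pipe: p2p.MsgPipe connects two MsgReadWriters back to back, which is what lets newTestPeerPair run the server's clientPeer against the client's serverPeer without any networking. A small standalone usage example (assumes the go-ethereum p2p and rlp packages):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/rlp"
)

func main() {
	// Writes on one end of the pipe surface as reads on the other.
	app, net := p2p.MsgPipe()
	defer app.Close()
	defer net.Close()

	go func() {
		if err := p2p.Send(app, 0x00, "hello"); err != nil {
			panic(err)
		}
	}()

	msg, err := net.ReadMsg()
	if err != nil {
		panic(err)
	}
	var payload string
	if err := rlp.Decode(msg.Payload, &payload); err != nil {
		panic(err)
	}
	fmt.Println(msg.Code, payload) // 0 hello
}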
diff --git a/les/txrelay.go b/les/txrelay.go
index d37a18faff..595c4d5808 100644
--- a/les/txrelay.go
+++ b/les/txrelay.go
@@ -27,14 +27,13 @@ import (
 
 type ltrInfo struct {
 	tx     *types.Transaction
-	sentTo map[*peer]struct{}
+	sentTo map[*serverPeer]struct{}
 }
 
 type lesTxRelay struct {
 	txSent       map[common.Hash]*ltrInfo
 	txPending    map[common.Hash]struct{}
-	ps           *peerSet
-	peerList     []*peer
+	peerList     []*serverPeer
 	peerStartPos int
 	lock         sync.RWMutex
 	stop         chan struct{}
 
 	retriever *retrieveManager
 }
 
-func newLesTxRelay(ps *peerSet, retriever *retrieveManager) *lesTxRelay {
+func newLesTxRelay(ps *serverPeerSet, retriever *retrieveManager) *lesTxRelay {
 	r := &lesTxRelay{
 		txSent:    make(map[common.Hash]*ltrInfo),
 		txPending: make(map[common.Hash]struct{}),
-		ps:        ps,
 		retriever: retriever,
 		stop:      make(chan struct{}),
 	}
-	ps.notify(r)
+	ps.subscribe(r)
 	return r
 }
 
@@ -58,24 +56,34 @@ func (ltrx *lesTxRelay) Stop() {
 	close(ltrx.stop)
 }
 
-func (ltrx *lesTxRelay) registerPeer(p *peer) {
+func (ltrx *lesTxRelay) registerPeer(p *serverPeer) {
 	ltrx.lock.Lock()
 	defer ltrx.lock.Unlock()
 
-	ltrx.peerList = ltrx.ps.AllPeers()
+	// Short circuit if the peer is announce only.
+	if p.onlyAnnounce {
+		return
+	}
+	ltrx.peerList = append(ltrx.peerList, p)
 }
 
-func (ltrx *lesTxRelay) unregisterPeer(p *peer) {
+func (ltrx *lesTxRelay) unregisterPeer(p *serverPeer) {
 	ltrx.lock.Lock()
 	defer ltrx.lock.Unlock()
 
-	ltrx.peerList = ltrx.ps.AllPeers()
+	for i, peer := range ltrx.peerList {
+		if peer == p {
+			// Remove from the peer list
+			ltrx.peerList = append(ltrx.peerList[:i], ltrx.peerList[i+1:]...)
+			return
+		}
+	}
 }
 
 // send sends a list of transactions to at most a given number of peers at
 // once, never resending any particular transaction to the same peer twice
 func (ltrx *lesTxRelay) send(txs types.Transactions, count int) {
-	sendTo := make(map[*peer]types.Transactions)
+	sendTo := make(map[*serverPeer]types.Transactions)
 
 	ltrx.peerStartPos++ // rotate the starting position of the peer list
 	if ltrx.peerStartPos >= len(ltrx.peerList) {
@@ -88,7 +96,7 @@ func (ltrx *lesTxRelay) send(txs types.Transactions, count int) {
 		if !ok {
 			ltr = &ltrInfo{
 				tx:     tx,
-				sentTo: make(map[*peer]struct{}),
+				sentTo: make(map[*serverPeer]struct{}),
 			}
 			ltrx.txSent[hash] = ltr
 			ltrx.txPending[hash] = struct{}{}
@@ -126,17 +134,17 @@ func (ltrx *lesTxRelay) send(txs types.Transactions, count int) {
 			reqID := genReqID()
 			rq := &distReq{
 				getCost: func(dp distPeer) uint64 {
-					peer := dp.(*peer)
-					return peer.GetTxRelayCost(len(ll), len(enc))
+					peer := dp.(*serverPeer)
+					return peer.getTxRelayCost(len(ll), len(enc))
 				},
 				canSend: func(dp distPeer) bool {
-					return !dp.(*peer).onlyAnnounce && dp.(*peer) == pp
+					return !dp.(*serverPeer).onlyAnnounce && dp.(*serverPeer) == pp
 				},
 				request: func(dp distPeer) func() {
-					peer := dp.(*peer)
-					cost := peer.GetTxRelayCost(len(ll), len(enc))
+					peer := dp.(*serverPeer)
+					cost := peer.getTxRelayCost(len(ll), len(enc))
 					peer.fcServer.QueuedRequest(reqID, cost)
-					return func() { peer.SendTxs(reqID, cost, enc) }
+					return func() { peer.sendTxs(reqID, enc) }
 				},
 			}
 			go ltrx.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, ltrx.stop)
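With the ps field gone, the tx relay keeps its peer list up to date incrementally: registerPeer filters out announce-only servers up front and unregisterPeer splices the departing entry out of the slice, instead of re-reading the whole peer set on every event. The same bookkeeping in isolation (stand-in type for illustration):

package main

import "fmt"

// relayList mirrors the slice bookkeeping lesTxRelay now does itself.
type relayList struct {
	peers []string
}

func (l *relayList) register(id string, onlyAnnounce bool) {
	// Announce-only servers never accept transactions, so skip them,
	// exactly as registerPeer does.
	if onlyAnnounce {
		return
	}
	l.peers = append(l.peers, id)
}

func (l *relayList) unregister(id string) {
	for i, p := range l.peers {
		if p == id {
			l.peers = append(l.peers[:i], l.peers[i+1:]...)
			return
		}
	}
}

func main() {
	var l relayList
	l.register("a", false)
	l.register("b", true) // dropped: announce-only
	l.register("c", false)
	l.unregister("a")
	fmt.Println(l.peers) // [c]
}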
diff --git a/les/ulc_test.go b/les/ulc_test.go
index 9112bf928c..273c63e4bd 100644
--- a/les/ulc_test.go
+++ b/les/ulc_test.go
@@ -54,14 +54,14 @@ func testULCAnnounceThreshold(t *testing.T, protocol int) {
 			ids       []string
 		)
 		for i := 0; i < len(testcase.height); i++ {
-			s, n, teardown := newServerPeer(t, 0, protocol)
+			s, n, teardown := newTestServerPeer(t, 0, protocol)
 			servers = append(servers, s)
 			nodes = append(nodes, n)
 			teardowns = append(teardowns, teardown)
 			ids = append(ids, n.String())
 		}
 
-		c, teardown := newLightPeer(t, protocol, ids, testcase.threshold)
+		c, teardown := newTestLightPeer(t, protocol, ids, testcase.threshold)
 
 		// Connect all servers.
 		for i := 0; i < len(servers); i++ {
@@ -86,15 +86,15 @@ func testULCAnnounceThreshold(t *testing.T, protocol int) {
 	}
 }
 
-func connect(server *serverHandler, serverId enode.ID, client *clientHandler, protocol int) (*peer, *peer, error) {
+func connect(server *serverHandler, serverId enode.ID, client *clientHandler, protocol int) (*serverPeer, *clientPeer, error) {
 	// Create a message pipe to communicate through
 	app, net := p2p.MsgPipe()
 
 	var id enode.ID
 	rand.Read(id[:])
 
-	peer1 := newPeer(protocol, NetworkId, true, p2p.NewPeer(serverId, "", nil), net) // Mark server as trusted
-	peer2 := newPeer(protocol, NetworkId, false, p2p.NewPeer(id, "", nil), app)
+	peer1 := newServerPeer(protocol, NetworkId, true, p2p.NewPeer(serverId, "", nil), net) // Mark server as trusted
+	peer2 := newClientPeer(protocol, NetworkId, p2p.NewPeer(id, "", nil), app)
 
 	// Start the peerLight on a new thread
 	errc1 := make(chan error, 1)
@@ -124,8 +124,8 @@ func connect(server *serverHandler, serverId enode.ID, client *clientHandler, pr
 	return peer1, peer2, nil
 }
 
-// newServerPeer creates server peer.
-func newServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *enode.Node, func()) {
+// newTestServerPeer creates a server peer.
+func newTestServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *enode.Node, func()) {
 	s, teardown := newServerEnv(t, blocks, protocol, nil, false, false, 0)
 	key, err := crypto.GenerateKey()
 	if err != nil {
@@ -136,8 +136,8 @@ func newServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *enode.
 	return s, n, teardown
 }
 
-// newLightPeer creates node with light sync mode
-func newLightPeer(t *testing.T, protocol int, ulcServers []string, ulcFraction int) (*testClient, func()) {
+// newTestLightPeer creates a node with light sync mode.
+func newTestLightPeer(t *testing.T, protocol int, ulcServers []string, ulcFraction int) (*testClient, func()) {
 	_, c, teardown := newClientServerEnv(t, 0, protocol, nil, ulcServers, ulcFraction, false, false)
 	return c, teardown
 }