p2p/discv5: fix reg lookup, polish code, use logger (#15737)

pull/15767/merge
Péter Szilágyi 7 years ago committed by Felix Lange
parent 5369a5c54d
commit c15d76a40f
  1. 56
      p2p/discv5/net.go
  2. 5
      p2p/discv5/node.go
  3. 217
      p2p/discv5/ticket.go
  4. 3
      p2p/discv5/topic.go

@ -51,16 +51,9 @@ const (
const testTopic = "foo" const testTopic = "foo"
const ( const (
printDebugLogs = false
printTestImgLogs = false printTestImgLogs = false
) )
func debugLog(s string) {
if printDebugLogs {
fmt.Println(s)
}
}
// Network manages the table and all protocol interaction. // Network manages the table and all protocol interaction.
type Network struct { type Network struct {
db *nodeDB // database of known nodes db *nodeDB // database of known nodes
@ -388,14 +381,14 @@ func (net *Network) loop() {
} }
}() }()
resetNextTicket := func() { resetNextTicket := func() {
t, timeout := net.ticketStore.nextFilteredTicket() ticket, timeout := net.ticketStore.nextFilteredTicket()
if t != nextTicket { if nextTicket != ticket {
nextTicket = t nextTicket = ticket
if nextRegisterTimer != nil { if nextRegisterTimer != nil {
nextRegisterTimer.Stop() nextRegisterTimer.Stop()
nextRegisterTime = nil nextRegisterTime = nil
} }
if t != nil { if ticket != nil {
nextRegisterTimer = time.NewTimer(timeout) nextRegisterTimer = time.NewTimer(timeout)
nextRegisterTime = nextRegisterTimer.C nextRegisterTime = nextRegisterTimer.C
} }
@ -423,13 +416,13 @@ loop:
select { select {
case <-net.closeReq: case <-net.closeReq:
debugLog("<-net.closeReq") log.Trace("<-net.closeReq")
break loop break loop
// Ingress packet handling. // Ingress packet handling.
case pkt := <-net.read: case pkt := <-net.read:
//fmt.Println("read", pkt.ev) //fmt.Println("read", pkt.ev)
debugLog("<-net.read") log.Trace("<-net.read")
n := net.internNode(&pkt) n := net.internNode(&pkt)
prestate := n.state prestate := n.state
status := "ok" status := "ok"
@ -444,7 +437,7 @@ loop:
// State transition timeouts. // State transition timeouts.
case timeout := <-net.timeout: case timeout := <-net.timeout:
debugLog("<-net.timeout") log.Trace("<-net.timeout")
if net.timeoutTimers[timeout] == nil { if net.timeoutTimers[timeout] == nil {
// Stale timer (was aborted). // Stale timer (was aborted).
continue continue
@ -462,20 +455,20 @@ loop:
// Querying. // Querying.
case q := <-net.queryReq: case q := <-net.queryReq:
debugLog("<-net.queryReq") log.Trace("<-net.queryReq")
if !q.start(net) { if !q.start(net) {
q.remote.deferQuery(q) q.remote.deferQuery(q)
} }
// Interacting with the table. // Interacting with the table.
case f := <-net.tableOpReq: case f := <-net.tableOpReq:
debugLog("<-net.tableOpReq") log.Trace("<-net.tableOpReq")
f() f()
net.tableOpResp <- struct{}{} net.tableOpResp <- struct{}{}
// Topic registration stuff. // Topic registration stuff.
case req := <-net.topicRegisterReq: case req := <-net.topicRegisterReq:
debugLog("<-net.topicRegisterReq") log.Trace("<-net.topicRegisterReq")
if !req.add { if !req.add {
net.ticketStore.removeRegisterTopic(req.topic) net.ticketStore.removeRegisterTopic(req.topic)
continue continue
@ -486,7 +479,7 @@ loop:
// determination for new topics. // determination for new topics.
// if topicRegisterLookupDone == nil { // if topicRegisterLookupDone == nil {
if topicRegisterLookupTarget.target == (common.Hash{}) { if topicRegisterLookupTarget.target == (common.Hash{}) {
debugLog("topicRegisterLookupTarget == null") log.Trace("topicRegisterLookupTarget == null")
if topicRegisterLookupTick.Stop() { if topicRegisterLookupTick.Stop() {
<-topicRegisterLookupTick.C <-topicRegisterLookupTick.C
} }
@ -496,7 +489,7 @@ loop:
} }
case nodes := <-topicRegisterLookupDone: case nodes := <-topicRegisterLookupDone:
debugLog("<-topicRegisterLookupDone") log.Trace("<-topicRegisterLookupDone")
net.ticketStore.registerLookupDone(topicRegisterLookupTarget, nodes, func(n *Node) []byte { net.ticketStore.registerLookupDone(topicRegisterLookupTarget, nodes, func(n *Node) []byte {
net.ping(n, n.addr()) net.ping(n, n.addr())
return n.pingEcho return n.pingEcho
@ -507,7 +500,7 @@ loop:
topicRegisterLookupDone = nil topicRegisterLookupDone = nil
case <-topicRegisterLookupTick.C: case <-topicRegisterLookupTick.C:
debugLog("<-topicRegisterLookupTick") log.Trace("<-topicRegisterLookupTick")
if (topicRegisterLookupTarget.target == common.Hash{}) { if (topicRegisterLookupTarget.target == common.Hash{}) {
target, delay := net.ticketStore.nextRegisterLookup() target, delay := net.ticketStore.nextRegisterLookup()
topicRegisterLookupTarget = target topicRegisterLookupTarget = target
@ -520,14 +513,14 @@ loop:
} }
case <-nextRegisterTime: case <-nextRegisterTime:
debugLog("<-nextRegisterTime") log.Trace("<-nextRegisterTime")
net.ticketStore.ticketRegistered(*nextTicket) net.ticketStore.ticketRegistered(*nextTicket)
//fmt.Println("sendTopicRegister", nextTicket.t.node.addr().String(), nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong) //fmt.Println("sendTopicRegister", nextTicket.t.node.addr().String(), nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong) net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
case req := <-net.topicSearchReq: case req := <-net.topicSearchReq:
if refreshDone == nil { if refreshDone == nil {
debugLog("<-net.topicSearchReq") log.Trace("<-net.topicSearchReq")
info, ok := searchInfo[req.topic] info, ok := searchInfo[req.topic]
if ok { if ok {
if req.delay == time.Duration(0) { if req.delay == time.Duration(0) {
@ -588,7 +581,7 @@ loop:
}) })
case <-statsDump.C: case <-statsDump.C:
debugLog("<-statsDump.C") log.Trace("<-statsDump.C")
/*r, ok := net.ticketStore.radius[testTopic] /*r, ok := net.ticketStore.radius[testTopic]
if !ok { if !ok {
fmt.Printf("(%x) no radius @ %v\n", net.tab.self.ID[:8], time.Now()) fmt.Printf("(%x) no radius @ %v\n", net.tab.self.ID[:8], time.Now())
@ -617,7 +610,7 @@ loop:
// Periodic / lookup-initiated bucket refresh. // Periodic / lookup-initiated bucket refresh.
case <-refreshTimer.C: case <-refreshTimer.C:
debugLog("<-refreshTimer.C") log.Trace("<-refreshTimer.C")
// TODO: ideally we would start the refresh timer after // TODO: ideally we would start the refresh timer after
// fallback nodes have been set for the first time. // fallback nodes have been set for the first time.
if refreshDone == nil { if refreshDone == nil {
@ -631,7 +624,7 @@ loop:
bucketRefreshTimer.Reset(bucketRefreshInterval) bucketRefreshTimer.Reset(bucketRefreshInterval)
}() }()
case newNursery := <-net.refreshReq: case newNursery := <-net.refreshReq:
debugLog("<-net.refreshReq") log.Trace("<-net.refreshReq")
if newNursery != nil { if newNursery != nil {
net.nursery = newNursery net.nursery = newNursery
} }
@ -641,7 +634,7 @@ loop:
} }
net.refreshResp <- refreshDone net.refreshResp <- refreshDone
case <-refreshDone: case <-refreshDone:
debugLog("<-net.refreshDone") log.Trace("<-net.refreshDone")
refreshDone = nil refreshDone = nil
list := searchReqWhenRefreshDone list := searchReqWhenRefreshDone
searchReqWhenRefreshDone = nil searchReqWhenRefreshDone = nil
@ -652,7 +645,7 @@ loop:
}() }()
} }
} }
debugLog("loop stopped") log.Trace("loop stopped")
log.Debug(fmt.Sprintf("shutting down")) log.Debug(fmt.Sprintf("shutting down"))
if net.conn != nil { if net.conn != nil {
@ -1109,14 +1102,14 @@ func (net *Network) ping(n *Node, addr *net.UDPAddr) {
//fmt.Println(" not sent") //fmt.Println(" not sent")
return return
} }
debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8])) log.Trace("Pinging remote node", "node", n.ID)
n.pingTopics = net.ticketStore.regTopicSet() n.pingTopics = net.ticketStore.regTopicSet()
n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics) n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
net.timedEvent(respTimeout, n, pongTimeout) net.timedEvent(respTimeout, n, pongTimeout)
} }
func (net *Network) handlePing(n *Node, pkt *ingressPacket) { func (net *Network) handlePing(n *Node, pkt *ingressPacket) {
debugLog(fmt.Sprintf("handlePing(node = %x)", n.ID[:8])) log.Trace("Handling remote ping", "node", n.ID)
ping := pkt.data.(*ping) ping := pkt.data.(*ping)
n.TCP = ping.From.TCP n.TCP = ping.From.TCP
t := net.topictab.getTicket(n, ping.Topics) t := net.topictab.getTicket(n, ping.Topics)
@ -1131,7 +1124,7 @@ func (net *Network) handlePing(n *Node, pkt *ingressPacket) {
} }
func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error { func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error {
debugLog(fmt.Sprintf("handleKnownPong(node = %x)", n.ID[:8])) log.Trace("Handling known pong", "node", n.ID)
net.abortTimedEvent(n, pongTimeout) net.abortTimedEvent(n, pongTimeout)
now := mclock.Now() now := mclock.Now()
ticket, err := pongToTicket(now, n.pingTopics, n, pkt) ticket, err := pongToTicket(now, n.pingTopics, n, pkt)
@ -1139,9 +1132,8 @@ func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error {
// fmt.Printf("(%x) ticket: %+v\n", net.tab.self.ID[:8], pkt.data) // fmt.Printf("(%x) ticket: %+v\n", net.tab.self.ID[:8], pkt.data)
net.ticketStore.addTicket(now, pkt.data.(*pong).ReplyTok, ticket) net.ticketStore.addTicket(now, pkt.data.(*pong).ReplyTok, ticket)
} else { } else {
debugLog(fmt.Sprintf(" error: %v", err)) log.Trace("Failed to convert pong to ticket", "err", err)
} }
n.pingEcho = nil n.pingEcho = nil
n.pingTopics = nil n.pingTopics = nil
return err return err

@ -273,6 +273,11 @@ func (n NodeID) GoString() string {
return fmt.Sprintf("discover.HexID(\"%x\")", n[:]) return fmt.Sprintf("discover.HexID(\"%x\")", n[:])
} }
// TerminalString returns a shortened hex string for terminal logging.
func (n NodeID) TerminalString() string {
return hex.EncodeToString(n[:8])
}
// HexID converts a hex string to a NodeID. // HexID converts a hex string to a NodeID.
// The string may be prefixed with 0x. // The string may be prefixed with 0x.
func HexID(in string) (NodeID, error) { func HexID(in string) (NodeID, error) {

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
) )
const ( const (
@ -128,8 +129,11 @@ type ticketStore struct {
// Contains buckets (for each absolute minute) of tickets // Contains buckets (for each absolute minute) of tickets
// that can be used in that minute. // that can be used in that minute.
// This is only set if the topic is being registered. // This is only set if the topic is being registered.
tickets map[Topic]topicTickets tickets map[Topic]*topicTickets
regtopics []Topic
regQueue []Topic // Topic registration queue for round robin attempts
regSet map[Topic]struct{} // Topic registration queue contents for fast filling
nodes map[*Node]*ticket nodes map[*Node]*ticket
nodeLastReq map[*Node]reqInfo nodeLastReq map[*Node]reqInfo
@ -152,14 +156,16 @@ type sentQuery struct {
} }
type topicTickets struct { type topicTickets struct {
buckets map[timeBucket][]ticketRef buckets map[timeBucket][]ticketRef
nextLookup, nextReg mclock.AbsTime nextLookup mclock.AbsTime
nextReg mclock.AbsTime
} }
func newTicketStore() *ticketStore { func newTicketStore() *ticketStore {
return &ticketStore{ return &ticketStore{
radius: make(map[Topic]*topicRadius), radius: make(map[Topic]*topicRadius),
tickets: make(map[Topic]topicTickets), tickets: make(map[Topic]*topicTickets),
regSet: make(map[Topic]struct{}),
nodes: make(map[*Node]*ticket), nodes: make(map[*Node]*ticket),
nodeLastReq: make(map[*Node]reqInfo), nodeLastReq: make(map[*Node]reqInfo),
searchTopicMap: make(map[Topic]searchTopic), searchTopicMap: make(map[Topic]searchTopic),
@ -169,13 +175,13 @@ func newTicketStore() *ticketStore {
// addTopic starts tracking a topic. If register is true, // addTopic starts tracking a topic. If register is true,
// the local node will register the topic and tickets will be collected. // the local node will register the topic and tickets will be collected.
func (s *ticketStore) addTopic(t Topic, register bool) { func (s *ticketStore) addTopic(topic Topic, register bool) {
debugLog(fmt.Sprintf(" addTopic(%v, %v)", t, register)) log.Trace("Adding discovery topic", "topic", topic, "register", register)
if s.radius[t] == nil { if s.radius[topic] == nil {
s.radius[t] = newTopicRadius(t) s.radius[topic] = newTopicRadius(topic)
} }
if register && s.tickets[t].buckets == nil { if register && s.tickets[topic] == nil {
s.tickets[t] = topicTickets{buckets: make(map[timeBucket][]ticketRef)} s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)}
} }
} }
@ -194,7 +200,11 @@ func (s *ticketStore) removeSearchTopic(t Topic) {
// removeRegisterTopic deletes all tickets for the given topic. // removeRegisterTopic deletes all tickets for the given topic.
func (s *ticketStore) removeRegisterTopic(topic Topic) { func (s *ticketStore) removeRegisterTopic(topic Topic) {
debugLog(fmt.Sprintf(" removeRegisterTopic(%v)", topic)) log.Trace("Removing discovery topic", "topic", topic)
if s.tickets[topic] == nil {
log.Warn("Removing non-existent discovery topic", "topic", topic)
return
}
for _, list := range s.tickets[topic].buckets { for _, list := range s.tickets[topic].buckets {
for _, ref := range list { for _, ref := range list {
ref.t.refCnt-- ref.t.refCnt--
@ -216,23 +226,35 @@ func (s *ticketStore) regTopicSet() []Topic {
} }
// nextRegisterLookup returns the target of the next lookup for ticket collection. // nextRegisterLookup returns the target of the next lookup for ticket collection.
func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Duration) { func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
debugLog("nextRegisterLookup()") // Queue up any new topics (or discarded ones), preserving iteration order
firstTopic, ok := s.iterRegTopics() for topic := range s.tickets {
for topic := firstTopic; ok; { if _, ok := s.regSet[topic]; !ok {
debugLog(fmt.Sprintf(" checking topic %v, len(s.tickets[topic]) = %d", topic, len(s.tickets[topic].buckets))) s.regQueue = append(s.regQueue, topic)
if s.tickets[topic].buckets != nil && s.needMoreTickets(topic) { s.regSet[topic] = struct{}{}
next := s.radius[topic].nextTarget(false)
debugLog(fmt.Sprintf(" %x 1s", next.target[:8]))
return next, 100 * time.Millisecond
} }
topic, ok = s.iterRegTopics() }
if topic == firstTopic { // Iterate over the set of all topics and look up the next suitable one
break // We have checked all topics. for len(s.regQueue) > 0 {
// Fetch the next topic from the queue, and ensure it still exists
topic := s.regQueue[0]
s.regQueue = s.regQueue[1:]
delete(s.regSet, topic)
if s.tickets[topic] == nil {
continue
}
// If the topic needs more tickets, return it
if s.tickets[topic].nextLookup < mclock.Now() {
next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond
log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay)
return next, delay
} }
} }
debugLog(" null, 40s") // No registration topics found or all exhausted, sleep
return lookupInfo{}, 40 * time.Second delay := 40 * time.Second
log.Trace("No topic found to register", "delay", delay)
return lookupInfo{}, delay
} }
func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
@ -246,40 +268,22 @@ func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
return target return target
} }
// iterRegTopics returns topics to register in arbitrary order. // ticketsInWindow returns the tickets of a given topic in the registration window.
// The second return value is false if there are no topics. func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
func (s *ticketStore) iterRegTopics() (Topic, bool) { // Sanity check that the topic still exists before operating on it
debugLog("iterRegTopics()") if s.tickets[topic] == nil {
if len(s.regtopics) == 0 { log.Warn("Listing non-existing discovery tickets", "topic", topic)
if len(s.tickets) == 0 { return nil
debugLog(" false")
return "", false
}
// Refill register list.
for t := range s.tickets {
s.regtopics = append(s.regtopics, t)
}
} }
topic := s.regtopics[len(s.regtopics)-1] // Gather all the tickers in the next time window
s.regtopics = s.regtopics[:len(s.regtopics)-1] var tickets []ticketRef
debugLog(" " + string(topic) + " true")
return topic, true
}
func (s *ticketStore) needMoreTickets(t Topic) bool {
return s.tickets[t].nextLookup < mclock.Now()
}
// ticketsInWindow returns the tickets of a given topic in the registration window. buckets := s.tickets[topic].buckets
func (s *ticketStore) ticketsInWindow(t Topic) []ticketRef { for idx := timeBucket(0); idx < timeWindow; idx++ {
ltBucket := s.lastBucketFetched tickets = append(tickets, buckets[s.lastBucketFetched+idx]...)
var res []ticketRef
tickets := s.tickets[t].buckets
for g := ltBucket; g < ltBucket+timeWindow; g++ {
res = append(res, tickets[g]...)
} }
debugLog(fmt.Sprintf("ticketsInWindow(%v) = %v", t, len(res))) log.Trace("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(tickets))
return res return tickets
} }
func (s *ticketStore) removeExcessTickets(t Topic) { func (s *ticketStore) removeExcessTickets(t Topic) {
@ -317,53 +321,55 @@ func (s ticketRefByWaitTime) Swap(i, j int) {
func (s *ticketStore) addTicketRef(r ticketRef) { func (s *ticketStore) addTicketRef(r ticketRef) {
topic := r.t.topics[r.idx] topic := r.t.topics[r.idx]
t := s.tickets[topic] tickets := s.tickets[topic]
if t.buckets == nil { if tickets == nil {
log.Warn("Adding ticket to non-existent topic", "topic", topic)
return return
} }
bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen)) bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen))
t.buckets[bucket] = append(t.buckets[bucket], r) tickets.buckets[bucket] = append(tickets.buckets[bucket], r)
r.t.refCnt++ r.t.refCnt++
min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt
if t.nextLookup < min { if tickets.nextLookup < min {
t.nextLookup = min tickets.nextLookup = min
} }
t.nextLookup += mclock.AbsTime(collectFrequency) tickets.nextLookup += mclock.AbsTime(collectFrequency)
s.tickets[topic] = t
//s.removeExcessTickets(topic) //s.removeExcessTickets(topic)
} }
func (s *ticketStore) nextFilteredTicket() (t *ticketRef, wait time.Duration) { func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
now := mclock.Now() now := mclock.Now()
for { for {
t, wait = s.nextRegisterableTicket() ticket, wait := s.nextRegisterableTicket()
if t == nil { if ticket == nil {
return return ticket, wait
} }
log.Trace("Found discovery ticket to register", "node", ticket.t.node, "serial", ticket.t.serial, "wait", wait)
regTime := now + mclock.AbsTime(wait) regTime := now + mclock.AbsTime(wait)
topic := t.t.topics[t.idx] topic := ticket.t.topics[ticket.idx]
if regTime >= s.tickets[topic].nextReg { if regTime >= s.tickets[topic].nextReg {
return return ticket, wait
} }
s.removeTicketRef(*t) s.removeTicketRef(*ticket)
} }
} }
func (s *ticketStore) ticketRegistered(t ticketRef) { func (s *ticketStore) ticketRegistered(ref ticketRef) {
now := mclock.Now() now := mclock.Now()
topic := t.t.topics[t.idx] topic := ref.t.topics[ref.idx]
tt := s.tickets[topic] tickets := s.tickets[topic]
min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt
if min > tt.nextReg { if min > tickets.nextReg {
tt.nextReg = min tickets.nextReg = min
} }
tt.nextReg += mclock.AbsTime(registerFrequency) tickets.nextReg += mclock.AbsTime(registerFrequency)
s.tickets[topic] = tt s.tickets[topic] = tickets
s.removeTicketRef(t) s.removeTicketRef(ref)
} }
// nextRegisterableTicket returns the next ticket that can be used // nextRegisterableTicket returns the next ticket that can be used
@ -374,16 +380,7 @@ func (s *ticketStore) ticketRegistered(t ticketRef) {
// //
// A ticket can be returned more than once with <= zero wait time in case // A ticket can be returned more than once with <= zero wait time in case
// the ticket contains multiple topics. // the ticket contains multiple topics.
func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration) { func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
defer func() {
if t == nil {
debugLog(" nil")
} else {
debugLog(fmt.Sprintf(" node = %x sn = %v wait = %v", t.t.node.ID[:8], t.t.serial, wait))
}
}()
debugLog("nextRegisterableTicket()")
now := mclock.Now() now := mclock.Now()
if s.nextTicketCached != nil { if s.nextTicketCached != nil {
return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now) return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
@ -412,9 +409,8 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration
return nil, 0 return nil, 0
} }
if nextTicket.t != nil { if nextTicket.t != nil {
wait = time.Duration(nextTicket.topicRegTime() - now)
s.nextTicketCached = &nextTicket s.nextTicketCached = &nextTicket
return &nextTicket, wait return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
} }
s.lastBucketFetched = bucket s.lastBucketFetched = bucket
} }
@ -422,14 +418,17 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration
// removeTicket removes a ticket from the ticket store // removeTicket removes a ticket from the ticket store
func (s *ticketStore) removeTicketRef(ref ticketRef) { func (s *ticketStore) removeTicketRef(ref ticketRef) {
debugLog(fmt.Sprintf("removeTicketRef(node = %x sn = %v)", ref.t.node.ID[:8], ref.t.serial)) log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial)
topic := ref.topic() topic := ref.topic()
tickets := s.tickets[topic].buckets tickets := s.tickets[topic]
if tickets == nil { if tickets == nil {
log.Warn("Removing tickets from unknown topic", "topic", topic)
return return
} }
bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen)) bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen))
list := tickets[bucket] list := tickets.buckets[bucket]
idx := -1 idx := -1
for i, bt := range list { for i, bt := range list {
if bt.t == ref.t { if bt.t == ref.t {
@ -442,9 +441,9 @@ func (s *ticketStore) removeTicketRef(ref ticketRef) {
} }
list = append(list[:idx], list[idx+1:]...) list = append(list[:idx], list[idx+1:]...)
if len(list) != 0 { if len(list) != 0 {
tickets[bucket] = list tickets.buckets[bucket] = list
} else { } else {
delete(tickets, bucket) delete(tickets.buckets, bucket)
} }
ref.t.refCnt-- ref.t.refCnt--
if ref.t.refCnt == 0 { if ref.t.refCnt == 0 {
@ -523,21 +522,21 @@ func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Has
} }
} }
func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, t *ticket) { func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) {
debugLog(fmt.Sprintf("add(node = %x sn = %v)", t.node.ID[:8], t.serial)) log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial)
lastReq, ok := s.nodeLastReq[t.node] lastReq, ok := s.nodeLastReq[ticket.node]
if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) { if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
return return
} }
s.adjustWithTicket(localTime, lastReq.lookup.target, t) s.adjustWithTicket(localTime, lastReq.lookup.target, ticket)
if lastReq.lookup.radiusLookup || s.nodes[t.node] != nil { if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil {
return return
} }
topic := lastReq.lookup.topic topic := lastReq.lookup.topic
topicIdx := t.findIdx(topic) topicIdx := ticket.findIdx(topic)
if topicIdx == -1 { if topicIdx == -1 {
return return
} }
@ -548,29 +547,29 @@ func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, t *ti
} }
if _, ok := s.tickets[topic]; ok { if _, ok := s.tickets[topic]; ok {
wait := t.regTime[topicIdx] - localTime wait := ticket.regTime[topicIdx] - localTime
rnd := rand.ExpFloat64() rnd := rand.ExpFloat64()
if rnd > 10 { if rnd > 10 {
rnd = 10 rnd = 10
} }
if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd { if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
// use the ticket to register this topic // use the ticket to register this topic
//fmt.Println("addTicket", t.node.ID[:8], t.node.addr().String(), t.serial, t.pong) //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong)
s.addTicketRef(ticketRef{t, topicIdx}) s.addTicketRef(ticketRef{ticket, topicIdx})
} }
} }
if t.refCnt > 0 { if ticket.refCnt > 0 {
s.nextTicketCached = nil s.nextTicketCached = nil
s.nodes[t.node] = t s.nodes[ticket.node] = ticket
} }
} }
func (s *ticketStore) getNodeTicket(node *Node) *ticket { func (s *ticketStore) getNodeTicket(node *Node) *ticket {
if s.nodes[node] == nil { if s.nodes[node] == nil {
debugLog(fmt.Sprintf("getNodeTicket(%x) sn = nil", node.ID[:8])) log.Trace("Retrieving node ticket", "node", node.ID, "serial", nil)
} else { } else {
debugLog(fmt.Sprintf("getNodeTicket(%x) sn = %v", node.ID[:8], s.nodes[node].serial)) log.Trace("Retrieving node ticket", "node", node.ID, "serial", s.nodes[node].serial)
} }
return s.nodes[node] return s.nodes[node]
} }

@ -24,6 +24,7 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log"
) )
const ( const (
@ -235,7 +236,7 @@ func (t *topicTable) deleteEntry(e *topicEntry) {
// It is assumed that topics and waitPeriods have the same length. // It is assumed that topics and waitPeriods have the same length.
func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) { func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
debugLog(fmt.Sprintf("useTicket %v %v %v", serialNo, topics, waitPeriods)) log.Trace("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods)
//fmt.Println("useTicket", serialNo, topics, waitPeriods) //fmt.Println("useTicket", serialNo, topics, waitPeriods)
t.collectGarbage() t.collectGarbage()

Loading…
Cancel
Save