diff --git a/common/mclock/alarm.go b/common/mclock/alarm.go new file mode 100644 index 0000000000..e83810a6a0 --- /dev/null +++ b/common/mclock/alarm.go @@ -0,0 +1,106 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package mclock + +import ( + "time" +) + +// Alarm sends timed notifications on a channel. This is very similar to a regular timer, +// but is easier to use in code that needs to re-schedule the same timer over and over. +// +// When scheduling an Alarm, the channel returned by C() will receive a value no later +// than the scheduled time. An Alarm can be reused after it has fired and can also be +// canceled by calling Stop. +type Alarm struct { + ch chan struct{} + clock Clock + timer Timer + deadline AbsTime +} + +// NewAlarm creates an Alarm. +func NewAlarm(clock Clock) *Alarm { + if clock == nil { + panic("nil clock") + } + return &Alarm{ + ch: make(chan struct{}, 1), + clock: clock, + } +} + +// C returns the alarm notification channel. This channel remains identical for +// the entire lifetime of the alarm, and is never closed. +func (e *Alarm) C() <-chan struct{} { + return e.ch +} + +// Stop cancels the alarm and drains the channel. +// This method is not safe for concurrent use. +func (e *Alarm) Stop() { + // Clear timer. + if e.timer != nil { + e.timer.Stop() + } + e.deadline = 0 + + // Drain the channel. + select { + case <-e.ch: + default: + } +} + +// Schedule sets the alarm to fire no later than the given time. If the alarm was already +// scheduled but has not fired yet, it may fire earlier than the newly-scheduled time. +func (e *Alarm) Schedule(time AbsTime) { + now := e.clock.Now() + e.schedule(now, time) +} + +func (e *Alarm) schedule(now, newDeadline AbsTime) { + if e.timer != nil { + if e.deadline > now && e.deadline <= newDeadline { + // Here, the current timer can be reused because it is already scheduled to + // occur earlier than the new deadline. + // + // The e.deadline > now part of the condition is important. If the old + // deadline lies in the past, we assume the timer has already fired and needs + // to be rescheduled. + return + } + e.timer.Stop() + } + + // Set the timer. + d := time.Duration(0) + if newDeadline < now { + newDeadline = now + } else { + d = newDeadline.Sub(now) + } + e.timer = e.clock.AfterFunc(d, e.send) + e.deadline = newDeadline +} + +func (e *Alarm) send() { + select { + case e.ch <- struct{}{}: + default: + } +} diff --git a/common/mclock/alarm_test.go b/common/mclock/alarm_test.go new file mode 100644 index 0000000000..d2ad9913fd --- /dev/null +++ b/common/mclock/alarm_test.go @@ -0,0 +1,116 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package mclock + +import "testing" + +// This test checks basic functionality of Alarm. +func TestAlarm(t *testing.T) { + clk := new(Simulated) + clk.Run(20) + a := NewAlarm(clk) + + a.Schedule(clk.Now() + 10) + if recv(a.C()) { + t.Fatal("Alarm fired before scheduled deadline") + } + if ntimers := clk.ActiveTimers(); ntimers != 1 { + t.Fatal("clock has", ntimers, "active timers, want", 1) + } + clk.Run(5) + if recv(a.C()) { + t.Fatal("Alarm fired too early") + } + + clk.Run(5) + if !recv(a.C()) { + t.Fatal("Alarm did not fire") + } + if recv(a.C()) { + t.Fatal("Alarm fired twice") + } + if ntimers := clk.ActiveTimers(); ntimers != 0 { + t.Fatal("clock has", ntimers, "active timers, want", 0) + } + + a.Schedule(clk.Now() + 5) + if recv(a.C()) { + t.Fatal("Alarm fired before scheduled deadline when scheduling the second event") + } + + clk.Run(5) + if !recv(a.C()) { + t.Fatal("Alarm did not fire when scheduling the second event") + } + if recv(a.C()) { + t.Fatal("Alarm fired twice when scheduling the second event") + } +} + +// This test checks that scheduling an Alarm to an earlier time than the +// one already scheduled works properly. +func TestAlarmScheduleEarlier(t *testing.T) { + clk := new(Simulated) + clk.Run(20) + a := NewAlarm(clk) + + a.Schedule(clk.Now() + 50) + clk.Run(5) + a.Schedule(clk.Now() + 1) + clk.Run(3) + if !recv(a.C()) { + t.Fatal("Alarm did not fire") + } +} + +// This test checks that scheduling an Alarm to a later time than the +// one already scheduled works properly. +func TestAlarmScheduleLater(t *testing.T) { + clk := new(Simulated) + clk.Run(20) + a := NewAlarm(clk) + + a.Schedule(clk.Now() + 50) + clk.Run(5) + a.Schedule(clk.Now() + 100) + clk.Run(50) + if !recv(a.C()) { + t.Fatal("Alarm did not fire") + } +} + +// This test checks that scheduling an Alarm in the past makes it fire immediately. +func TestAlarmNegative(t *testing.T) { + clk := new(Simulated) + clk.Run(50) + a := NewAlarm(clk) + + a.Schedule(-1) + clk.Run(1) // needed to process timers + if !recv(a.C()) { + t.Fatal("Alarm did not fire for negative time") + } +} + +func recv(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} diff --git a/p2p/dial.go b/p2p/dial.go index 02878fae4d..134e6e2eae 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -117,9 +117,8 @@ type dialScheduler struct { staticPool []*dialTask // The dial history keeps recently dialed nodes. Members of history are not dialed. - history expHeap - historyTimer mclock.Timer - historyTimerTime mclock.AbsTime + history expHeap + historyTimer *mclock.Alarm // for logStats lastStatsLog mclock.AbsTime @@ -160,18 +159,20 @@ func (cfg dialConfig) withDefaults() dialConfig { } func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler { + cfg := config.withDefaults() d := &dialScheduler{ - dialConfig: config.withDefaults(), - setupFunc: setupFunc, - dialing: make(map[enode.ID]*dialTask), - static: make(map[enode.ID]*dialTask), - peers: make(map[enode.ID]struct{}), - doneCh: make(chan *dialTask), - nodesIn: make(chan *enode.Node), - addStaticCh: make(chan *enode.Node), - remStaticCh: make(chan *enode.Node), - addPeerCh: make(chan *conn), - remPeerCh: make(chan *conn), + dialConfig: cfg, + historyTimer: mclock.NewAlarm(cfg.clock), + setupFunc: setupFunc, + dialing: make(map[enode.ID]*dialTask), + static: make(map[enode.ID]*dialTask), + peers: make(map[enode.ID]struct{}), + doneCh: make(chan *dialTask), + nodesIn: make(chan *enode.Node), + addStaticCh: make(chan *enode.Node), + remStaticCh: make(chan *enode.Node), + addPeerCh: make(chan *conn), + remPeerCh: make(chan *conn), } d.lastStatsLog = d.clock.Now() d.ctx, d.cancel = context.WithCancel(context.Background()) @@ -222,8 +223,7 @@ func (d *dialScheduler) peerRemoved(c *conn) { // loop is the main loop of the dialer. func (d *dialScheduler) loop(it enode.Iterator) { var ( - nodesCh chan *enode.Node - historyExp = make(chan struct{}, 1) + nodesCh chan *enode.Node ) loop: @@ -236,7 +236,7 @@ loop: } else { nodesCh = nil } - d.rearmHistoryTimer(historyExp) + d.rearmHistoryTimer() d.logStats() select { @@ -297,7 +297,7 @@ loop: } } - case <-historyExp: + case <-d.historyTimer.C(): d.expireHistory() case <-d.ctx.Done(): @@ -306,7 +306,7 @@ loop: } } - d.stopHistoryTimer(historyExp) + d.historyTimer.Stop() for range d.dialing { <-d.doneCh } @@ -343,28 +343,15 @@ func (d *dialScheduler) logStats() { // rearmHistoryTimer configures d.historyTimer to fire when the // next item in d.history expires. -func (d *dialScheduler) rearmHistoryTimer(ch chan struct{}) { - if len(d.history) == 0 || d.historyTimerTime == d.history.nextExpiry() { +func (d *dialScheduler) rearmHistoryTimer() { + if len(d.history) == 0 { return } - d.stopHistoryTimer(ch) - d.historyTimerTime = d.history.nextExpiry() - timeout := time.Duration(d.historyTimerTime - d.clock.Now()) - d.historyTimer = d.clock.AfterFunc(timeout, func() { ch <- struct{}{} }) -} - -// stopHistoryTimer stops the timer and drains the channel it sends on. -func (d *dialScheduler) stopHistoryTimer(ch chan struct{}) { - if d.historyTimer != nil && !d.historyTimer.Stop() { - <-ch - } + d.historyTimer.Schedule(d.history.nextExpiry()) } // expireHistory removes expired items from d.history. func (d *dialScheduler) expireHistory() { - d.historyTimer.Stop() - d.historyTimer = nil - d.historyTimerTime = 0 d.history.expire(d.clock.Now(), func(hkey string) { var id enode.ID copy(id[:], hkey)