From 8a995708076b83581da677d8f7f80f8c992c0557 Mon Sep 17 00:00:00 2001 From: zeim839 Date: Tue, 14 May 2024 06:32:11 -0400 Subject: [PATCH] metrics: rateMean is updated only after sampling period elapses --- metrics/meter.go | 122 ++++++++++++++++------------------------------- 1 file changed, 41 insertions(+), 81 deletions(-) diff --git a/metrics/meter.go b/metrics/meter.go index 432838f4ef..043327c8f6 100644 --- a/metrics/meter.go +++ b/metrics/meter.go @@ -1,9 +1,7 @@ package metrics import ( - "math" "sync" - "sync/atomic" "time" ) @@ -20,13 +18,10 @@ type MeterSnapshot interface { type Meter interface { Mark(int64) Snapshot() MeterSnapshot - Stop() } // GetOrRegisterMeter returns an existing Meter or constructs and registers a // new StandardMeter. -// Be sure to unregister the meter from the registry once it is of no use to -// allow for garbage collection. func GetOrRegisterMeter(name string, r Registry) Meter { if nil == r { r = DefaultRegistry @@ -35,36 +30,15 @@ func GetOrRegisterMeter(name string, r Registry) Meter { } // NewMeter constructs a new StandardMeter and launches a goroutine. -// Be sure to call Stop() once the meter is of no use to allow for garbage collection. func NewMeter() Meter { if !Enabled { return NilMeter{} } - m := newStandardMeter() - arbiter.Lock() - defer arbiter.Unlock() - arbiter.meters[m] = struct{}{} - if !arbiter.started { - arbiter.started = true - go arbiter.tick() - } - return m -} - -// NewInactiveMeter returns a meter but does not start any goroutines. This -// method is mainly intended for testing. -func NewInactiveMeter() Meter { - if !Enabled { - return NilMeter{} - } - m := newStandardMeter() - return m + return newStandardMeter() } // NewRegisteredMeter constructs and registers a new StandardMeter // and launches a goroutine. -// Be sure to unregister the meter from the registry once it is of no use to -// allow for garbage collection. func NewRegisteredMeter(name string, r Registry) Meter { return GetOrRegisterMeter(name, r) } @@ -104,13 +78,12 @@ func (NilMeter) Stop() {} // StandardMeter is the standard implementation of a Meter. type StandardMeter struct { - count atomic.Int64 - uncounted atomic.Int64 // not yet added to the EWMAs - rateMean atomic.Uint64 - + count int64 + uncounted int64 a1, a5, a15 EWMA startTime time.Time - stopped atomic.Bool + lastMark time.Time + mutex sync.Mutex } func newStandardMeter() *StandardMeter { @@ -119,71 +92,58 @@ func newStandardMeter() *StandardMeter { a5: NewEWMA5(), a15: NewEWMA15(), startTime: time.Now(), - } -} - -// Stop stops the meter, Mark() will be a no-op if you use it after being stopped. -func (m *StandardMeter) Stop() { - if stopped := m.stopped.Swap(true); !stopped { - arbiter.Lock() - delete(arbiter.meters, m) - arbiter.Unlock() + lastMark: time.Now(), } } // Mark records the occurrence of n events. func (m *StandardMeter) Mark(n int64) { - m.uncounted.Add(n) + m.mutex.Lock() + defer m.mutex.Unlock() + + // Synchronize rateMean so that it's only updated after + // a sampling period elapses. + if elapsed := time.Since(m.lastMark); elapsed >= 5*time.Second { + m.lastMark = m.lastMark.Add(elapsed) + m.count += m.uncounted + m.uncounted = 0 + } + + m.uncounted += n + m.a1.Update(n) + m.a5.Update(n) + m.a15.Update(n) } // Snapshot returns a read-only copy of the meter. func (m *StandardMeter) Snapshot() MeterSnapshot { + m.mutex.Lock() + defer m.mutex.Unlock() + if elapsed := time.Since(m.lastMark); elapsed >= 5*time.Second { + m.lastMark = m.lastMark.Add(elapsed) + m.count += m.uncounted + m.uncounted = 0 + } + rateMean := float64(m.count) / (1 + time.Since(m.startTime).Seconds()) return &meterSnapshot{ - count: m.count.Load() + m.uncounted.Load(), + count: m.count + m.uncounted, rate1: m.a1.Snapshot().Rate(), rate5: m.a5.Snapshot().Rate(), rate15: m.a15.Snapshot().Rate(), - rateMean: math.Float64frombits(m.rateMean.Load()), + rateMean: rateMean, } } -func (m *StandardMeter) tick() { - // Take the uncounted values, add to count - n := m.uncounted.Swap(0) - count := m.count.Add(n) - m.rateMean.Store(math.Float64bits(float64(count) / time.Since(m.startTime).Seconds())) - // Update the EWMA's internal state - m.a1.Update(n) - m.a5.Update(n) - m.a15.Update(n) - // And trigger them to calculate the rates - m.a1.Tick() - m.a5.Tick() - m.a15.Tick() -} +// used to elapse time in unit tests. +func (m *StandardMeter) addToTimestamp(d time.Duration) { + m.startTime = m.startTime.Add(d) + m.lastMark = m.lastMark.Add(d) + a1, _ := m.a1.(*StandardEWMA) + a1.addToTimestamp(d) -// meterArbiter ticks meters every 5s from a single goroutine. -// meters are references in a set for future stopping. -type meterArbiter struct { - sync.RWMutex - started bool - meters map[*StandardMeter]struct{} - ticker *time.Ticker -} - -var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})} - -// tick meters on the scheduled interval -func (ma *meterArbiter) tick() { - for range ma.ticker.C { - ma.tickMeters() - } -} + a5, _ := m.a5.(*StandardEWMA) + a5.addToTimestamp(d) -func (ma *meterArbiter) tickMeters() { - ma.RLock() - defer ma.RUnlock() - for meter := range ma.meters { - meter.tick() - } + a15, _ := m.a15.(*StandardEWMA) + a15.addToTimestamp(d) }