metrics: rateMean is updated only after sampling period elapses

pull/29775/head
zeim839 4 months ago
parent 7be876ec5d
commit 8a99570807
  1. 122
      metrics/meter.go

@ -1,9 +1,7 @@
package metrics
import (
"math"
"sync"
"sync/atomic"
"time"
)
@ -20,13 +18,10 @@ type MeterSnapshot interface {
type Meter interface {
Mark(int64)
Snapshot() MeterSnapshot
Stop()
}
// GetOrRegisterMeter returns an existing Meter or constructs and registers a
// new StandardMeter.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterMeter(name string, r Registry) Meter {
if nil == r {
r = DefaultRegistry
@ -35,36 +30,15 @@ func GetOrRegisterMeter(name string, r Registry) Meter {
}
// NewMeter constructs a new StandardMeter and launches a goroutine.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
func NewMeter() Meter {
if !Enabled {
return NilMeter{}
}
m := newStandardMeter()
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters[m] = struct{}{}
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
}
return m
}
// NewInactiveMeter returns a meter but does not start any goroutines. This
// method is mainly intended for testing.
func NewInactiveMeter() Meter {
if !Enabled {
return NilMeter{}
}
m := newStandardMeter()
return m
return newStandardMeter()
}
// NewRegisteredMeter constructs and registers a new StandardMeter
// and launches a goroutine.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeter(name string, r Registry) Meter {
return GetOrRegisterMeter(name, r)
}
@ -104,13 +78,12 @@ func (NilMeter) Stop() {}
// StandardMeter is the standard implementation of a Meter.
type StandardMeter struct {
count atomic.Int64
uncounted atomic.Int64 // not yet added to the EWMAs
rateMean atomic.Uint64
count int64
uncounted int64
a1, a5, a15 EWMA
startTime time.Time
stopped atomic.Bool
lastMark time.Time
mutex sync.Mutex
}
func newStandardMeter() *StandardMeter {
@ -119,71 +92,58 @@ func newStandardMeter() *StandardMeter {
a5: NewEWMA5(),
a15: NewEWMA15(),
startTime: time.Now(),
}
}
// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() {
if stopped := m.stopped.Swap(true); !stopped {
arbiter.Lock()
delete(arbiter.meters, m)
arbiter.Unlock()
lastMark: time.Now(),
}
}
// Mark records the occurrence of n events.
func (m *StandardMeter) Mark(n int64) {
m.uncounted.Add(n)
m.mutex.Lock()
defer m.mutex.Unlock()
// Synchronize rateMean so that it's only updated after
// a sampling period elapses.
if elapsed := time.Since(m.lastMark); elapsed >= 5*time.Second {
m.lastMark = m.lastMark.Add(elapsed)
m.count += m.uncounted
m.uncounted = 0
}
m.uncounted += n
m.a1.Update(n)
m.a5.Update(n)
m.a15.Update(n)
}
// Snapshot returns a read-only copy of the meter.
func (m *StandardMeter) Snapshot() MeterSnapshot {
m.mutex.Lock()
defer m.mutex.Unlock()
if elapsed := time.Since(m.lastMark); elapsed >= 5*time.Second {
m.lastMark = m.lastMark.Add(elapsed)
m.count += m.uncounted
m.uncounted = 0
}
rateMean := float64(m.count) / (1 + time.Since(m.startTime).Seconds())
return &meterSnapshot{
count: m.count.Load() + m.uncounted.Load(),
count: m.count + m.uncounted,
rate1: m.a1.Snapshot().Rate(),
rate5: m.a5.Snapshot().Rate(),
rate15: m.a15.Snapshot().Rate(),
rateMean: math.Float64frombits(m.rateMean.Load()),
rateMean: rateMean,
}
}
func (m *StandardMeter) tick() {
// Take the uncounted values, add to count
n := m.uncounted.Swap(0)
count := m.count.Add(n)
m.rateMean.Store(math.Float64bits(float64(count) / time.Since(m.startTime).Seconds()))
// Update the EWMA's internal state
m.a1.Update(n)
m.a5.Update(n)
m.a15.Update(n)
// And trigger them to calculate the rates
m.a1.Tick()
m.a5.Tick()
m.a15.Tick()
}
// used to elapse time in unit tests.
func (m *StandardMeter) addToTimestamp(d time.Duration) {
m.startTime = m.startTime.Add(d)
m.lastMark = m.lastMark.Add(d)
a1, _ := m.a1.(*StandardEWMA)
a1.addToTimestamp(d)
// meterArbiter ticks meters every 5s from a single goroutine.
// meters are references in a set for future stopping.
type meterArbiter struct {
sync.RWMutex
started bool
meters map[*StandardMeter]struct{}
ticker *time.Ticker
}
var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})}
// tick meters on the scheduled interval
func (ma *meterArbiter) tick() {
for range ma.ticker.C {
ma.tickMeters()
}
}
a5, _ := m.a5.(*StandardEWMA)
a5.addToTimestamp(d)
func (ma *meterArbiter) tickMeters() {
ma.RLock()
defer ma.RUnlock()
for meter := range ma.meters {
meter.tick()
}
a15, _ := m.a15.(*StandardEWMA)
a15.addToTimestamp(d)
}

Loading…
Cancel
Save