metrics: update EWMA rates synchronously

pull/29775/head
zeim839 6 months ago
parent 5b3e3cd2be
commit 67fa9bdda2
  1. 96
      metrics/ewma.go
  2. 111
      metrics/ewma_test.go

@ -3,7 +3,6 @@ package metrics
import ( import (
"math" "math"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -11,32 +10,30 @@ type EWMASnapshot interface {
Rate() float64 Rate() float64
} }
// EWMAs continuously calculate an exponentially-weighted moving average // EWMAs calculate an exponentially-weighted moving average.
// based on an outside source of clock ticks.
type EWMA interface { type EWMA interface {
Snapshot() EWMASnapshot Snapshot() EWMASnapshot
Tick()
Update(int64) Update(int64)
} }
// NewEWMA constructs a new EWMA with the given alpha. // NewEWMA constructs a new EWMA with the given alpha and sampling period.
func NewEWMA(alpha float64) EWMA { func NewEWMA(alpha float64, period time.Duration) EWMA {
return &StandardEWMA{alpha: alpha} return &StandardEWMA{alpha: alpha, period: period, ts: time.Now()}
} }
// NewEWMA1 constructs a new EWMA for a one-minute moving average. // NewEWMA1 constructs a new EWMA for a one-minute moving average.
func NewEWMA1() EWMA { func NewEWMA1() EWMA {
return NewEWMA(1 - math.Exp(-5.0/60.0/1)) return NewEWMA(1-math.Exp(-5.0/60.0/1), 5*time.Second)
} }
// NewEWMA5 constructs a new EWMA for a five-minute moving average. // NewEWMA5 constructs a new EWMA for a five-minute moving average.
func NewEWMA5() EWMA { func NewEWMA5() EWMA {
return NewEWMA(1 - math.Exp(-5.0/60.0/5)) return NewEWMA(1-math.Exp(-5.0/60.0/5), 5*time.Second)
} }
// NewEWMA15 constructs a new EWMA for a fifteen-minute moving average. // NewEWMA15 constructs a new EWMA for a fifteen-minute moving average.
func NewEWMA15() EWMA { func NewEWMA15() EWMA {
return NewEWMA(1 - math.Exp(-5.0/60.0/15)) return NewEWMA(1-math.Exp(-5.0/60.0/15), 5*time.Second)
} }
// ewmaSnapshot is a read-only copy of another EWMA. // ewmaSnapshot is a read-only copy of another EWMA.
@ -50,62 +47,65 @@ func (a ewmaSnapshot) Rate() float64 { return float64(a) }
type NilEWMA struct{} type NilEWMA struct{}
func (NilEWMA) Snapshot() EWMASnapshot { return (*emptySnapshot)(nil) } func (NilEWMA) Snapshot() EWMASnapshot { return (*emptySnapshot)(nil) }
func (NilEWMA) Tick() {}
func (NilEWMA) Update(n int64) {} func (NilEWMA) Update(n int64) {}
// StandardEWMA is the standard implementation of an EWMA and tracks the number // StandardEWMA is the standard implementation of an EWMA.
// of uncounted events and processes them on each tick. It uses the
// sync/atomic package to manage uncounted events.
type StandardEWMA struct { type StandardEWMA struct {
uncounted atomic.Int64 uncounted int64
alpha float64 alpha float64
rate atomic.Uint64 period time.Duration
init atomic.Bool ewma float64
ts time.Time
init bool
mutex sync.Mutex mutex sync.Mutex
} }
// Snapshot returns a read-only copy of the EWMA. // Snapshot returns a read-only copy of the EWMA.
func (a *StandardEWMA) Snapshot() EWMASnapshot { func (a *StandardEWMA) Snapshot() EWMASnapshot {
r := math.Float64frombits(a.rate.Load()) * float64(time.Second) return ewmaSnapshot(a.rate())
return ewmaSnapshot(r)
} }
// Tick ticks the clock to update the moving average. It assumes it is called // rate returns the moving average rate of events per second.
// every five seconds. func (a *StandardEWMA) rate() float64 {
func (a *StandardEWMA) Tick() {
// Optimization to avoid mutex locking in the hot-path.
if a.init.Load() {
a.updateRate(a.fetchInstantRate())
return
}
// Slow-path: this is only needed on the first Tick() and preserves transactional updating
// of init and rate in the else block. The first conditional is needed below because
// a different thread could have set a.init = 1 between the time of the first atomic load and when
// the lock was acquired.
a.mutex.Lock() a.mutex.Lock()
if a.init.Load() { defer a.mutex.Unlock()
// The fetchInstantRate() uses atomic loading, which is unnecessary in this critical section if time.Since(a.ts)/a.period < 1 {
// but again, this section is only invoked on the first successful Tick() operation. return a.ewma * float64(time.Second)
a.updateRate(a.fetchInstantRate())
} else {
a.init.Store(true)
a.rate.Store(math.Float64bits(a.fetchInstantRate()))
} }
a.mutex.Unlock() a.updateRate()
return a.ewma * float64(time.Second)
} }
func (a *StandardEWMA) fetchInstantRate() float64 { func (a *StandardEWMA) updateRate() {
count := a.uncounted.Swap(0) periods := time.Since(a.ts) / a.period
return float64(count) / float64(5*time.Second) rate := float64(a.uncounted) / float64(a.period)
}
a.ewma = a.alpha*(rate) + (1-a.alpha)*a.ewma
a.ts = a.ts.Add(a.period)
a.uncounted = 0
periods -= 1
if !a.init {
a.ewma = rate
a.init = true
}
func (a *StandardEWMA) updateRate(instantRate float64) { a.ewma = math.Pow(1-a.alpha, float64(periods)) * a.ewma
currentRate := math.Float64frombits(a.rate.Load()) a.ts = a.ts.Add(periods * a.period) //nolint:durationcheck
currentRate += a.alpha * (instantRate - currentRate)
a.rate.Store(math.Float64bits(currentRate))
} }
// Update adds n uncounted events. // Update adds n uncounted events.
func (a *StandardEWMA) Update(n int64) { func (a *StandardEWMA) Update(n int64) {
a.uncounted.Add(n) a.mutex.Lock()
defer a.mutex.Unlock()
if time.Since(a.ts)/a.period < 1 {
a.uncounted += n
return
}
a.updateRate()
}
// used to elapse time in unit tests.
func (a *StandardEWMA) addToTimestamp(d time.Duration) {
a.ts = a.ts.Add(d)
} }

@ -2,17 +2,17 @@ package metrics
import ( import (
"math" "math"
"math/rand"
"sync"
"testing" "testing"
"time"
) )
const epsilon = 0.0000000000000001
func BenchmarkEWMA(b *testing.B) { func BenchmarkEWMA(b *testing.B) {
a := NewEWMA1() a := NewEWMA1()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
a.Update(1) a.Update(1)
a.Tick()
} }
} }
@ -23,67 +23,74 @@ func BenchmarkEWMAParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
a.Update(1) a.Update(1)
a.Tick()
} }
}) })
} }
func TestEWMA1(t *testing.T) { // exercise race detector.
func TestEWMAConcurrency(t *testing.T) {
a := NewEWMA1() a := NewEWMA1()
a.Update(3) wg := &sync.WaitGroup{}
a.Tick() for i := 0; i < 100; i++ {
for i, want := range []float64{0.6, wg.Add(1)
0.22072766470286553, 0.08120116994196772, 0.029872241020718428, go func(ewma EWMA, wg *sync.WaitGroup) {
0.01098938333324054, 0.004042768199451294, 0.0014872513059998212, a.Update(rand.Int63())
0.0005471291793327122, 0.00020127757674150815, 7.404588245200814e-05, wg.Done()
2.7239957857491083e-05, 1.0021020474147462e-05, 3.6865274119969525e-06, }(a, wg)
1.3561976441886433e-06, 4.989172314621449e-07, 1.8354139230109722e-07,
} {
if rate := a.Snapshot().Rate(); math.Abs(want-rate) > epsilon {
t.Errorf("%d minute a.Snapshot().Rate(): %f != %v\n", i, want, rate)
}
elapseMinute(a)
} }
wg.Wait()
} }
func TestEWMA5(t *testing.T) { func testEWMA(t *testing.T, alpha float64) {
a := NewEWMA5() r := rand.New(rand.NewSource(time.Now().Unix()))
a.Update(3) a := NewEWMA(alpha, time.Second)
a.Tick()
for i, want := range []float64{ // Base case.
0.6, 0.49123845184678905, 0.4021920276213837, 0.32928698165641596,
0.269597378470333, 0.2207276647028654, 0.18071652714732128, if rate := a.Snapshot().Rate(); rate != 0 {
0.14795817836496392, 0.12113791079679326, 0.09917933293295193, t.Errorf("(A) Base Case a.rate(): 0 != %v\n", rate)
0.08120116994196763, 0.06648189501740036, 0.05443077197364752, }
0.04456414692860035, 0.03648603757513079, 0.0298722410207183831020718428, a.Update(10)
} { if rate := a.Snapshot().Rate(); rate != 0 {
if rate := a.Snapshot().Rate(); math.Abs(want-rate) > epsilon { t.Errorf("(B) Base Case a.rate(): 0 != %v\n", rate)
t.Errorf("%d minute a.Snapshot().Rate(): %f != %v\n", i, want, rate)
}
elapseMinute(a)
} }
}
func TestEWMA15(t *testing.T) { // Recursive case.
a := NewEWMA15()
a.Update(3) for i := 0; i < 100; i++ {
a.Tick() rnd := r.Int63n(1000) + 1
for i, want := range []float64{ td, _ := NewEWMA(alpha, time.Second).(*StandardEWMA)
0.6, 0.5613041910189706, 0.5251039914257684, 0.4912384518467888184678905,
0.459557003018789, 0.4299187863442732, 0.4021920276213831, td.Update(10)
0.37625345116383313, 0.3519877317060185, 0.3292869816564153165641596, td.addToTimestamp(-time.Duration(rnd) * time.Second)
0.3080502714195546, 0.2881831806538789, 0.26959737847033216, expect := math.Pow(1-alpha, float64(rnd-1)) * 10.00
0.2522102307052083, 0.23594443252115815, 0.2207276647028646247028654470286553, if rate := td.rate(); math.Abs(rate-expect) > 0.001 {
} { t.Fatalf("(A) Recursive Case a.rate(): %v != %v\n",
if rate := a.Snapshot().Rate(); math.Abs(want-rate) > epsilon { expect, rate)
t.Errorf("%d minute a.Snapshot().Rate(): %f != %v\n", i, want, rate) }
expect = alpha*25 + (1-alpha)*expect
td.Update(25)
td.addToTimestamp(-1e9)
if rate := td.Snapshot().Rate(); math.Abs(rate-expect) > 0.001 {
t.Fatalf("(B) Recursive Case a.rate(): %v != %v\n",
expect, rate)
} }
elapseMinute(a)
} }
} }
func elapseMinute(a EWMA) { func TestEWMA1(t *testing.T) {
for i := 0; i < 12; i++ { // 1-minute moving average.
a.Tick() testEWMA(t, 1-math.Exp(-5.0/60.0/1))
} }
func TestEWMA5(t *testing.T) {
// 5-minute moving average.
testEWMA(t, 1-math.Exp(-5.0/60.0/5))
}
func TestEWMA15(t *testing.T) {
// 15-minute moving average.
testEWMA(t, 1-math.Exp(-5.0/60.0/15))
} }

Loading…
Cancel
Save