diff --git a/metrics/ewma.go b/metrics/ewma.go index 1d7a4f00cf..276fe9bc5b 100644 --- a/metrics/ewma.go +++ b/metrics/ewma.go @@ -3,7 +3,6 @@ package metrics import ( "math" "sync" - "sync/atomic" "time" ) @@ -11,32 +10,30 @@ type EWMASnapshot interface { Rate() float64 } -// EWMAs continuously calculate an exponentially-weighted moving average -// based on an outside source of clock ticks. +// EWMAs calculate an exponentially-weighted moving average. type EWMA interface { Snapshot() EWMASnapshot - Tick() Update(int64) } -// NewEWMA constructs a new EWMA with the given alpha. -func NewEWMA(alpha float64) EWMA { - return &StandardEWMA{alpha: alpha} +// NewEWMA constructs a new EWMA with the given alpha and sampling period. +func NewEWMA(alpha float64, period time.Duration) EWMA { + return &StandardEWMA{alpha: alpha, period: period, ts: time.Now()} } // NewEWMA1 constructs a new EWMA for a one-minute moving average. func NewEWMA1() EWMA { - return NewEWMA(1 - math.Exp(-5.0/60.0/1)) + return NewEWMA(1-math.Exp(-5.0/60.0/1), 5*time.Second) } // NewEWMA5 constructs a new EWMA for a five-minute moving average. func NewEWMA5() EWMA { - return NewEWMA(1 - math.Exp(-5.0/60.0/5)) + return NewEWMA(1-math.Exp(-5.0/60.0/5), 5*time.Second) } // NewEWMA15 constructs a new EWMA for a fifteen-minute moving average. func NewEWMA15() EWMA { - return NewEWMA(1 - math.Exp(-5.0/60.0/15)) + return NewEWMA(1-math.Exp(-5.0/60.0/15), 5*time.Second) } // ewmaSnapshot is a read-only copy of another EWMA. @@ -50,62 +47,65 @@ func (a ewmaSnapshot) Rate() float64 { return float64(a) } type NilEWMA struct{} func (NilEWMA) Snapshot() EWMASnapshot { return (*emptySnapshot)(nil) } -func (NilEWMA) Tick() {} func (NilEWMA) Update(n int64) {} -// StandardEWMA is the standard implementation of an EWMA and tracks the number -// of uncounted events and processes them on each tick. It uses the -// sync/atomic package to manage uncounted events. +// StandardEWMA is the standard implementation of an EWMA. type StandardEWMA struct { - uncounted atomic.Int64 + uncounted int64 alpha float64 - rate atomic.Uint64 - init atomic.Bool + period time.Duration + ewma float64 + ts time.Time + init bool mutex sync.Mutex } // Snapshot returns a read-only copy of the EWMA. func (a *StandardEWMA) Snapshot() EWMASnapshot { - r := math.Float64frombits(a.rate.Load()) * float64(time.Second) - return ewmaSnapshot(r) + return ewmaSnapshot(a.rate()) } -// Tick ticks the clock to update the moving average. It assumes it is called -// every five seconds. -func (a *StandardEWMA) Tick() { - // Optimization to avoid mutex locking in the hot-path. - if a.init.Load() { - a.updateRate(a.fetchInstantRate()) - return - } - // Slow-path: this is only needed on the first Tick() and preserves transactional updating - // of init and rate in the else block. The first conditional is needed below because - // a different thread could have set a.init = 1 between the time of the first atomic load and when - // the lock was acquired. +// rate returns the moving average rate of events per second. +func (a *StandardEWMA) rate() float64 { a.mutex.Lock() - if a.init.Load() { - // The fetchInstantRate() uses atomic loading, which is unnecessary in this critical section - // but again, this section is only invoked on the first successful Tick() operation. - a.updateRate(a.fetchInstantRate()) - } else { - a.init.Store(true) - a.rate.Store(math.Float64bits(a.fetchInstantRate())) + defer a.mutex.Unlock() + if time.Since(a.ts)/a.period < 1 { + return a.ewma * float64(time.Second) } - a.mutex.Unlock() + a.updateRate() + return a.ewma * float64(time.Second) } -func (a *StandardEWMA) fetchInstantRate() float64 { - count := a.uncounted.Swap(0) - return float64(count) / float64(5*time.Second) -} +func (a *StandardEWMA) updateRate() { + periods := time.Since(a.ts) / a.period + rate := float64(a.uncounted) / float64(a.period) + + a.ewma = a.alpha*(rate) + (1-a.alpha)*a.ewma + a.ts = a.ts.Add(a.period) + a.uncounted = 0 + periods -= 1 + + if !a.init { + a.ewma = rate + a.init = true + } -func (a *StandardEWMA) updateRate(instantRate float64) { - currentRate := math.Float64frombits(a.rate.Load()) - currentRate += a.alpha * (instantRate - currentRate) - a.rate.Store(math.Float64bits(currentRate)) + a.ewma = math.Pow(1-a.alpha, float64(periods)) * a.ewma + a.ts = a.ts.Add(periods * a.period) //nolint:durationcheck } // Update adds n uncounted events. func (a *StandardEWMA) Update(n int64) { - a.uncounted.Add(n) + a.mutex.Lock() + defer a.mutex.Unlock() + if time.Since(a.ts)/a.period < 1 { + a.uncounted += n + return + } + a.updateRate() +} + +// used to elapse time in unit tests. +func (a *StandardEWMA) addToTimestamp(d time.Duration) { + a.ts = a.ts.Add(d) } diff --git a/metrics/ewma_test.go b/metrics/ewma_test.go index 9a91b43db8..0a72e53f15 100644 --- a/metrics/ewma_test.go +++ b/metrics/ewma_test.go @@ -2,17 +2,17 @@ package metrics import ( "math" + "math/rand" + "sync" "testing" + "time" ) -const epsilon = 0.0000000000000001 - func BenchmarkEWMA(b *testing.B) { a := NewEWMA1() b.ResetTimer() for i := 0; i < b.N; i++ { a.Update(1) - a.Tick() } } @@ -23,67 +23,74 @@ func BenchmarkEWMAParallel(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { a.Update(1) - a.Tick() } }) } -func TestEWMA1(t *testing.T) { +// exercise race detector. +func TestEWMAConcurrency(t *testing.T) { a := NewEWMA1() - a.Update(3) - a.Tick() - for i, want := range []float64{0.6, - 0.22072766470286553, 0.08120116994196772, 0.029872241020718428, - 0.01098938333324054, 0.004042768199451294, 0.0014872513059998212, - 0.0005471291793327122, 0.00020127757674150815, 7.404588245200814e-05, - 2.7239957857491083e-05, 1.0021020474147462e-05, 3.6865274119969525e-06, - 1.3561976441886433e-06, 4.989172314621449e-07, 1.8354139230109722e-07, - } { - if rate := a.Snapshot().Rate(); math.Abs(want-rate) > epsilon { - t.Errorf("%d minute a.Snapshot().Rate(): %f != %v\n", i, want, rate) - } - elapseMinute(a) + wg := &sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func(ewma EWMA, wg *sync.WaitGroup) { + a.Update(rand.Int63()) + wg.Done() + }(a, wg) } + wg.Wait() } -func TestEWMA5(t *testing.T) { - a := NewEWMA5() - a.Update(3) - a.Tick() - for i, want := range []float64{ - 0.6, 0.49123845184678905, 0.4021920276213837, 0.32928698165641596, - 0.269597378470333, 0.2207276647028654, 0.18071652714732128, - 0.14795817836496392, 0.12113791079679326, 0.09917933293295193, - 0.08120116994196763, 0.06648189501740036, 0.05443077197364752, - 0.04456414692860035, 0.03648603757513079, 0.0298722410207183831020718428, - } { - if rate := a.Snapshot().Rate(); math.Abs(want-rate) > epsilon { - t.Errorf("%d minute a.Snapshot().Rate(): %f != %v\n", i, want, rate) - } - elapseMinute(a) +func testEWMA(t *testing.T, alpha float64) { + r := rand.New(rand.NewSource(time.Now().Unix())) + a := NewEWMA(alpha, time.Second) + + // Base case. + + if rate := a.Snapshot().Rate(); rate != 0 { + t.Errorf("(A) Base Case a.rate(): 0 != %v\n", rate) + } + a.Update(10) + if rate := a.Snapshot().Rate(); rate != 0 { + t.Errorf("(B) Base Case a.rate(): 0 != %v\n", rate) } -} -func TestEWMA15(t *testing.T) { - a := NewEWMA15() - a.Update(3) - a.Tick() - for i, want := range []float64{ - 0.6, 0.5613041910189706, 0.5251039914257684, 0.4912384518467888184678905, - 0.459557003018789, 0.4299187863442732, 0.4021920276213831, - 0.37625345116383313, 0.3519877317060185, 0.3292869816564153165641596, - 0.3080502714195546, 0.2881831806538789, 0.26959737847033216, - 0.2522102307052083, 0.23594443252115815, 0.2207276647028646247028654470286553, - } { - if rate := a.Snapshot().Rate(); math.Abs(want-rate) > epsilon { - t.Errorf("%d minute a.Snapshot().Rate(): %f != %v\n", i, want, rate) + // Recursive case. + + for i := 0; i < 100; i++ { + rnd := r.Int63n(1000) + 1 + td, _ := NewEWMA(alpha, time.Second).(*StandardEWMA) + + td.Update(10) + td.addToTimestamp(-time.Duration(rnd) * time.Second) + expect := math.Pow(1-alpha, float64(rnd-1)) * 10.00 + if rate := td.rate(); math.Abs(rate-expect) > 0.001 { + t.Fatalf("(A) Recursive Case a.rate(): %v != %v\n", + expect, rate) + } + + expect = alpha*25 + (1-alpha)*expect + td.Update(25) + td.addToTimestamp(-1e9) + + if rate := td.Snapshot().Rate(); math.Abs(rate-expect) > 0.001 { + t.Fatalf("(B) Recursive Case a.rate(): %v != %v\n", + expect, rate) } - elapseMinute(a) } } -func elapseMinute(a EWMA) { - for i := 0; i < 12; i++ { - a.Tick() - } +func TestEWMA1(t *testing.T) { + // 1-minute moving average. + testEWMA(t, 1-math.Exp(-5.0/60.0/1)) +} + +func TestEWMA5(t *testing.T) { + // 5-minute moving average. + testEWMA(t, 1-math.Exp(-5.0/60.0/5)) +} + +func TestEWMA15(t *testing.T) { + // 15-minute moving average. + testEWMA(t, 1-math.Exp(-5.0/60.0/15)) }