diff --git a/metrics/ewma.go b/metrics/ewma.go index 57c949e7d..039286493 100644 --- a/metrics/ewma.go +++ b/metrics/ewma.go @@ -4,6 +4,7 @@ import ( "math" "sync" "sync/atomic" + "time" ) // EWMAs continuously calculate an exponentially-weighted moving average @@ -85,7 +86,7 @@ type StandardEWMA struct { func (a *StandardEWMA) Rate() float64 { a.mutex.Lock() defer a.mutex.Unlock() - return a.rate * float64(1e9) + return a.rate * float64(time.Second) } // Snapshot returns a read-only copy of the EWMA. @@ -98,7 +99,7 @@ func (a *StandardEWMA) Snapshot() EWMA { func (a *StandardEWMA) Tick() { count := atomic.LoadInt64(&a.uncounted) atomic.AddInt64(&a.uncounted, -count) - instantRate := float64(count) / float64(5e9) + instantRate := float64(count) / float64(5*time.Second) a.mutex.Lock() defer a.mutex.Unlock() if a.init { diff --git a/metrics/meter.go b/metrics/meter.go index 58d170fae..7d2a2f530 100644 --- a/metrics/meter.go +++ b/metrics/meter.go @@ -2,6 +2,7 @@ package metrics import ( "sync" + "sync/atomic" "time" ) @@ -101,6 +102,7 @@ func NewRegisteredMeterForced(name string, r Registry) Meter { // MeterSnapshot is a read-only copy of another Meter. type MeterSnapshot struct { count int64 + temp int64 rate1, rate5, rate15, rateMean float64 } @@ -149,7 +151,7 @@ func (NilMeter) Rate1() float64 { return 0.0 } // Rate5 is a no-op. func (NilMeter) Rate5() float64 { return 0.0 } -// Rate15is a no-op. +// Rate15 is a no-op. func (NilMeter) Rate15() float64 { return 0.0 } // RateMean is a no-op. @@ -167,7 +169,7 @@ type StandardMeter struct { snapshot *MeterSnapshot a1, a5, a15 EWMA startTime time.Time - stopped bool + stopped uint32 } func newStandardMeter() *StandardMeter { @@ -182,11 +184,8 @@ func newStandardMeter() *StandardMeter { // Stop stops the meter, Mark() will be a no-op if you use it after being stopped. func (m *StandardMeter) Stop() { - m.lock.Lock() - stopped := m.stopped - m.stopped = true - m.lock.Unlock() - if !stopped { + stopped := atomic.SwapUint32(&m.stopped, 1) + if stopped != 1 { arbiter.Lock() delete(arbiter.meters, m) arbiter.Unlock() @@ -194,57 +193,45 @@ func (m *StandardMeter) Stop() { } // Count returns the number of events recorded. +// It updates the meter to be as accurate as possible func (m *StandardMeter) Count() int64 { - m.lock.RLock() - count := m.snapshot.count - m.lock.RUnlock() - return count + m.lock.Lock() + defer m.lock.Unlock() + m.updateMeter() + return m.snapshot.count } // Mark records the occurrence of n events. func (m *StandardMeter) Mark(n int64) { - m.lock.Lock() - defer m.lock.Unlock() - if m.stopped { - return - } - m.snapshot.count += n - m.a1.Update(n) - m.a5.Update(n) - m.a15.Update(n) - m.updateSnapshot() + atomic.AddInt64(&m.snapshot.temp, n) } // Rate1 returns the one-minute moving average rate of events per second. func (m *StandardMeter) Rate1() float64 { m.lock.RLock() - rate1 := m.snapshot.rate1 - m.lock.RUnlock() - return rate1 + defer m.lock.RUnlock() + return m.snapshot.rate1 } // Rate5 returns the five-minute moving average rate of events per second. func (m *StandardMeter) Rate5() float64 { m.lock.RLock() - rate5 := m.snapshot.rate5 - m.lock.RUnlock() - return rate5 + defer m.lock.RUnlock() + return m.snapshot.rate5 } // Rate15 returns the fifteen-minute moving average rate of events per second. func (m *StandardMeter) Rate15() float64 { m.lock.RLock() - rate15 := m.snapshot.rate15 - m.lock.RUnlock() - return rate15 + defer m.lock.RUnlock() + return m.snapshot.rate15 } // RateMean returns the meter's mean rate of events per second. func (m *StandardMeter) RateMean() float64 { m.lock.RLock() - rateMean := m.snapshot.rateMean - m.lock.RUnlock() - return rateMean + defer m.lock.RUnlock() + return m.snapshot.rateMean } // Snapshot returns a read-only copy of the meter. @@ -264,9 +251,19 @@ func (m *StandardMeter) updateSnapshot() { snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds() } +func (m *StandardMeter) updateMeter() { + // should only run with write lock held on m.lock + n := atomic.LoadInt64(&m.snapshot.temp) + m.snapshot.count += n + m.a1.Update(n) + m.a5.Update(n) + m.a15.Update(n) +} + func (m *StandardMeter) tick() { m.lock.Lock() defer m.lock.Unlock() + m.updateMeter() m.a1.Tick() m.a5.Tick() m.a15.Tick() @@ -282,7 +279,7 @@ type meterArbiter struct { ticker *time.Ticker } -var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})} +var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})} // Ticks meters on the scheduled interval func (ma *meterArbiter) tick() { diff --git a/metrics/meter_test.go b/metrics/meter_test.go index 28472253e..9c43b6156 100644 --- a/metrics/meter_test.go +++ b/metrics/meter_test.go @@ -17,7 +17,7 @@ func TestGetOrRegisterMeter(t *testing.T) { r := NewRegistry() NewRegisteredMeter("foo", r).Mark(47) if m := GetOrRegisterMeter("foo", r); m.Count() != 47 { - t.Fatal(m) + t.Fatal(m.Count()) } } @@ -29,10 +29,11 @@ func TestMeterDecay(t *testing.T) { defer ma.ticker.Stop() m := newStandardMeter() ma.meters[m] = struct{}{} - go ma.tick() m.Mark(1) + ma.tickMeters() rateMean := m.RateMean() time.Sleep(100 * time.Millisecond) + ma.tickMeters() if m.RateMean() >= rateMean { t.Error("m.RateMean() didn't decrease") }