metrics: make meter updates lock-free (#21446)

release/1.9
Marius van der Wijden 4 years ago committed by GitHub
parent 54add42550
commit f3bafecef7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      metrics/ewma.go
  2. 65
      metrics/meter.go
  3. 5
      metrics/meter_test.go

@ -4,6 +4,7 @@ import (
"math" "math"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
// EWMAs continuously calculate an exponentially-weighted moving average // EWMAs continuously calculate an exponentially-weighted moving average
@ -85,7 +86,7 @@ type StandardEWMA struct {
func (a *StandardEWMA) Rate() float64 { func (a *StandardEWMA) Rate() float64 {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
return a.rate * float64(1e9) return a.rate * float64(time.Second)
} }
// Snapshot returns a read-only copy of the EWMA. // Snapshot returns a read-only copy of the EWMA.
@ -98,7 +99,7 @@ func (a *StandardEWMA) Snapshot() EWMA {
func (a *StandardEWMA) Tick() { func (a *StandardEWMA) Tick() {
count := atomic.LoadInt64(&a.uncounted) count := atomic.LoadInt64(&a.uncounted)
atomic.AddInt64(&a.uncounted, -count) atomic.AddInt64(&a.uncounted, -count)
instantRate := float64(count) / float64(5e9) instantRate := float64(count) / float64(5*time.Second)
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
if a.init { if a.init {

@ -2,6 +2,7 @@ package metrics
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -101,6 +102,7 @@ func NewRegisteredMeterForced(name string, r Registry) Meter {
// MeterSnapshot is a read-only copy of another Meter. // MeterSnapshot is a read-only copy of another Meter.
type MeterSnapshot struct { type MeterSnapshot struct {
count int64 count int64
temp int64
rate1, rate5, rate15, rateMean float64 rate1, rate5, rate15, rateMean float64
} }
@ -149,7 +151,7 @@ func (NilMeter) Rate1() float64 { return 0.0 }
// Rate5 is a no-op. // Rate5 is a no-op.
func (NilMeter) Rate5() float64 { return 0.0 } func (NilMeter) Rate5() float64 { return 0.0 }
// Rate15is a no-op. // Rate15 is a no-op.
func (NilMeter) Rate15() float64 { return 0.0 } func (NilMeter) Rate15() float64 { return 0.0 }
// RateMean is a no-op. // RateMean is a no-op.
@ -167,7 +169,7 @@ type StandardMeter struct {
snapshot *MeterSnapshot snapshot *MeterSnapshot
a1, a5, a15 EWMA a1, a5, a15 EWMA
startTime time.Time startTime time.Time
stopped bool stopped uint32
} }
func newStandardMeter() *StandardMeter { func newStandardMeter() *StandardMeter {
@ -182,11 +184,8 @@ func newStandardMeter() *StandardMeter {
// Stop stops the meter, Mark() will be a no-op if you use it after being stopped. // Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() { func (m *StandardMeter) Stop() {
m.lock.Lock() stopped := atomic.SwapUint32(&m.stopped, 1)
stopped := m.stopped if stopped != 1 {
m.stopped = true
m.lock.Unlock()
if !stopped {
arbiter.Lock() arbiter.Lock()
delete(arbiter.meters, m) delete(arbiter.meters, m)
arbiter.Unlock() arbiter.Unlock()
@ -194,57 +193,45 @@ func (m *StandardMeter) Stop() {
} }
// Count returns the number of events recorded. // Count returns the number of events recorded.
// It updates the meter to be as accurate as possible
func (m *StandardMeter) Count() int64 { func (m *StandardMeter) Count() int64 {
m.lock.RLock() m.lock.Lock()
count := m.snapshot.count defer m.lock.Unlock()
m.lock.RUnlock() m.updateMeter()
return count return m.snapshot.count
} }
// Mark records the occurrence of n events. // Mark records the occurrence of n events.
func (m *StandardMeter) Mark(n int64) { func (m *StandardMeter) Mark(n int64) {
m.lock.Lock() atomic.AddInt64(&m.snapshot.temp, n)
defer m.lock.Unlock()
if m.stopped {
return
}
m.snapshot.count += n
m.a1.Update(n)
m.a5.Update(n)
m.a15.Update(n)
m.updateSnapshot()
} }
// Rate1 returns the one-minute moving average rate of events per second. // Rate1 returns the one-minute moving average rate of events per second.
func (m *StandardMeter) Rate1() float64 { func (m *StandardMeter) Rate1() float64 {
m.lock.RLock() m.lock.RLock()
rate1 := m.snapshot.rate1 defer m.lock.RUnlock()
m.lock.RUnlock() return m.snapshot.rate1
return rate1
} }
// Rate5 returns the five-minute moving average rate of events per second. // Rate5 returns the five-minute moving average rate of events per second.
func (m *StandardMeter) Rate5() float64 { func (m *StandardMeter) Rate5() float64 {
m.lock.RLock() m.lock.RLock()
rate5 := m.snapshot.rate5 defer m.lock.RUnlock()
m.lock.RUnlock() return m.snapshot.rate5
return rate5
} }
// Rate15 returns the fifteen-minute moving average rate of events per second. // Rate15 returns the fifteen-minute moving average rate of events per second.
func (m *StandardMeter) Rate15() float64 { func (m *StandardMeter) Rate15() float64 {
m.lock.RLock() m.lock.RLock()
rate15 := m.snapshot.rate15 defer m.lock.RUnlock()
m.lock.RUnlock() return m.snapshot.rate15
return rate15
} }
// RateMean returns the meter's mean rate of events per second. // RateMean returns the meter's mean rate of events per second.
func (m *StandardMeter) RateMean() float64 { func (m *StandardMeter) RateMean() float64 {
m.lock.RLock() m.lock.RLock()
rateMean := m.snapshot.rateMean defer m.lock.RUnlock()
m.lock.RUnlock() return m.snapshot.rateMean
return rateMean
} }
// Snapshot returns a read-only copy of the meter. // Snapshot returns a read-only copy of the meter.
@ -264,9 +251,19 @@ func (m *StandardMeter) updateSnapshot() {
snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds() snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
} }
func (m *StandardMeter) updateMeter() {
// should only run with write lock held on m.lock
n := atomic.LoadInt64(&m.snapshot.temp)
m.snapshot.count += n
m.a1.Update(n)
m.a5.Update(n)
m.a15.Update(n)
}
func (m *StandardMeter) tick() { func (m *StandardMeter) tick() {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
m.updateMeter()
m.a1.Tick() m.a1.Tick()
m.a5.Tick() m.a5.Tick()
m.a15.Tick() m.a15.Tick()
@ -282,7 +279,7 @@ type meterArbiter struct {
ticker *time.Ticker ticker *time.Ticker
} }
var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})} var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})}
// Ticks meters on the scheduled interval // Ticks meters on the scheduled interval
func (ma *meterArbiter) tick() { func (ma *meterArbiter) tick() {

@ -17,7 +17,7 @@ func TestGetOrRegisterMeter(t *testing.T) {
r := NewRegistry() r := NewRegistry()
NewRegisteredMeter("foo", r).Mark(47) NewRegisteredMeter("foo", r).Mark(47)
if m := GetOrRegisterMeter("foo", r); m.Count() != 47 { if m := GetOrRegisterMeter("foo", r); m.Count() != 47 {
t.Fatal(m) t.Fatal(m.Count())
} }
} }
@ -29,10 +29,11 @@ func TestMeterDecay(t *testing.T) {
defer ma.ticker.Stop() defer ma.ticker.Stop()
m := newStandardMeter() m := newStandardMeter()
ma.meters[m] = struct{}{} ma.meters[m] = struct{}{}
go ma.tick()
m.Mark(1) m.Mark(1)
ma.tickMeters()
rateMean := m.RateMean() rateMean := m.RateMean()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
ma.tickMeters()
if m.RateMean() >= rateMean { if m.RateMean() >= rateMean {
t.Error("m.RateMean() didn't decrease") t.Error("m.RateMean() didn't decrease")
} }

Loading…
Cancel
Save