From 7dc100714d5477296fbeef1f4f58b788d6bab738 Mon Sep 17 00:00:00 2001 From: turboboost55 <7891737+turboboost55@users.noreply.github.com> Date: Thu, 23 Mar 2023 06:13:50 -0700 Subject: [PATCH] metrics: add cpu counters (#26796) This PR adds counter metrics for the CPU system and the Geth process. Currently the only metrics available for these items are gauges. Gauges are fine when the consumer scrapes metrics data at the same interval as Geth produces new values (every 3 seconds), but it is likely that most consumers will not scrape that often. Intervals of 10, 15, or maybe even 30 seconds are probably more common. So the problem is, how does the consumer estimate what the CPU was doing in between scrapes. With a counter, it's easy ... you just subtract two successive values and divide by the time to get a nice, accurate average. But with a gauge, you can't do that. A gauge reading is an instantaneous picture of what was happening at that moment, but it gives you no idea about what was going on between scrapes. Taking an average of values is meaningless. --- metrics/counter_float64.go | 153 +++++++++++++++++++++++++++ metrics/counter_float_64_test.go | 77 ++++++++++++++ metrics/exp/exp.go | 7 ++ metrics/graphite.go | 2 + metrics/influxdb/influxdb.go | 10 ++ metrics/influxdb/influxdbv2.go | 17 +-- metrics/librato/librato.go | 11 ++ metrics/log.go | 3 + metrics/metrics.go | 19 ++-- metrics/metrics_test.go | 2 + metrics/opentsdb.go | 2 + metrics/prometheus/collector.go | 4 + metrics/prometheus/collector_test.go | 7 ++ metrics/prometheus/prometheus.go | 2 + metrics/registry.go | 4 +- metrics/syslog.go | 2 + metrics/writer.go | 3 + 17 files changed, 312 insertions(+), 13 deletions(-) create mode 100644 metrics/counter_float64.go create mode 100644 metrics/counter_float_64_test.go diff --git a/metrics/counter_float64.go b/metrics/counter_float64.go new file mode 100644 index 0000000000..f05f62fed6 --- /dev/null +++ b/metrics/counter_float64.go @@ -0,0 +1,153 @@ +package metrics + +import ( + "sync" +) + +// CounterFloat64 holds a float64 value that can be incremented and decremented. +type CounterFloat64 interface { + Clear() + Count() float64 + Dec(float64) + Inc(float64) + Snapshot() CounterFloat64 +} + +// GetOrRegisterCounterFloat64 returns an existing CounterFloat64 or constructs and registers +// a new StandardCounterFloat64. +func GetOrRegisterCounterFloat64(name string, r Registry) CounterFloat64 { + if nil == r { + r = DefaultRegistry + } + return r.GetOrRegister(name, NewCounterFloat64).(CounterFloat64) +} + +// GetOrRegisterCounterFloat64Forced returns an existing CounterFloat64 or constructs and registers a +// new CounterFloat64 no matter the global switch is enabled or not. +// Be sure to unregister the counter from the registry once it is of no use to +// allow for garbage collection. +func GetOrRegisterCounterFloat64Forced(name string, r Registry) CounterFloat64 { + if nil == r { + r = DefaultRegistry + } + return r.GetOrRegister(name, NewCounterFloat64Forced).(CounterFloat64) +} + +// NewCounterFloat64 constructs a new StandardCounterFloat64. +func NewCounterFloat64() CounterFloat64 { + if !Enabled { + return NilCounterFloat64{} + } + return &StandardCounterFloat64{count: 0.0} +} + +// NewCounterFloat64Forced constructs a new StandardCounterFloat64 and returns it no matter if +// the global switch is enabled or not. +func NewCounterFloat64Forced() CounterFloat64 { + return &StandardCounterFloat64{count: 0.0} +} + +// NewRegisteredCounterFloat64 constructs and registers a new StandardCounterFloat64. +func NewRegisteredCounterFloat64(name string, r Registry) CounterFloat64 { + c := NewCounterFloat64() + if nil == r { + r = DefaultRegistry + } + r.Register(name, c) + return c +} + +// NewRegisteredCounterFloat64Forced constructs and registers a new StandardCounterFloat64 +// and launches a goroutine no matter the global switch is enabled or not. +// Be sure to unregister the counter from the registry once it is of no use to +// allow for garbage collection. +func NewRegisteredCounterFloat64Forced(name string, r Registry) CounterFloat64 { + c := NewCounterFloat64Forced() + if nil == r { + r = DefaultRegistry + } + r.Register(name, c) + return c +} + +// CounterFloat64Snapshot is a read-only copy of another CounterFloat64. +type CounterFloat64Snapshot float64 + +// Clear panics. +func (CounterFloat64Snapshot) Clear() { + panic("Clear called on a CounterFloat64Snapshot") +} + +// Count returns the value at the time the snapshot was taken. +func (c CounterFloat64Snapshot) Count() float64 { return float64(c) } + +// Dec panics. +func (CounterFloat64Snapshot) Dec(float64) { + panic("Dec called on a CounterFloat64Snapshot") +} + +// Inc panics. +func (CounterFloat64Snapshot) Inc(float64) { + panic("Inc called on a CounterFloat64Snapshot") +} + +// Snapshot returns the snapshot. +func (c CounterFloat64Snapshot) Snapshot() CounterFloat64 { return c } + +// NilCounterFloat64 is a no-op CounterFloat64. +type NilCounterFloat64 struct{} + +// Clear is a no-op. +func (NilCounterFloat64) Clear() {} + +// Count is a no-op. +func (NilCounterFloat64) Count() float64 { return 0.0 } + +// Dec is a no-op. +func (NilCounterFloat64) Dec(i float64) {} + +// Inc is a no-op. +func (NilCounterFloat64) Inc(i float64) {} + +// Snapshot is a no-op. +func (NilCounterFloat64) Snapshot() CounterFloat64 { return NilCounterFloat64{} } + +// StandardCounterFloat64 is the standard implementation of a CounterFloat64 and uses the +// sync.Mutex package to manage a single float64 value. +type StandardCounterFloat64 struct { + mutex sync.Mutex + count float64 +} + +// Clear sets the counter to zero. +func (c *StandardCounterFloat64) Clear() { + c.mutex.Lock() + defer c.mutex.Unlock() + c.count = 0.0 +} + +// Count returns the current value. +func (c *StandardCounterFloat64) Count() float64 { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.count +} + +// Dec decrements the counter by the given amount. +func (c *StandardCounterFloat64) Dec(v float64) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.count -= v +} + +// Inc increments the counter by the given amount. +func (c *StandardCounterFloat64) Inc(v float64) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.count += v +} + +// Snapshot returns a read-only copy of the counter. +func (c *StandardCounterFloat64) Snapshot() CounterFloat64 { + return CounterFloat64Snapshot(c.Count()) +} diff --git a/metrics/counter_float_64_test.go b/metrics/counter_float_64_test.go new file mode 100644 index 0000000000..44d9b4c20c --- /dev/null +++ b/metrics/counter_float_64_test.go @@ -0,0 +1,77 @@ +package metrics + +import "testing" + +func BenchmarkCounterFloat64(b *testing.B) { + c := NewCounterFloat64() + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.Inc(1.0) + } +} + +func TestCounterFloat64Clear(t *testing.T) { + c := NewCounterFloat64() + c.Inc(1.0) + c.Clear() + if count := c.Count(); count != 0 { + t.Errorf("c.Count(): 0 != %v\n", count) + } +} + +func TestCounterFloat64Dec1(t *testing.T) { + c := NewCounterFloat64() + c.Dec(1.0) + if count := c.Count(); count != -1.0 { + t.Errorf("c.Count(): -1.0 != %v\n", count) + } +} + +func TestCounterFloat64Dec2(t *testing.T) { + c := NewCounterFloat64() + c.Dec(2.0) + if count := c.Count(); count != -2.0 { + t.Errorf("c.Count(): -2.0 != %v\n", count) + } +} + +func TestCounterFloat64Inc1(t *testing.T) { + c := NewCounterFloat64() + c.Inc(1.0) + if count := c.Count(); count != 1.0 { + t.Errorf("c.Count(): 1.0 != %v\n", count) + } +} + +func TestCounterFloat64Inc2(t *testing.T) { + c := NewCounterFloat64() + c.Inc(2.0) + if count := c.Count(); count != 2.0 { + t.Errorf("c.Count(): 2.0 != %v\n", count) + } +} + +func TestCounterFloat64Snapshot(t *testing.T) { + c := NewCounterFloat64() + c.Inc(1.0) + snapshot := c.Snapshot() + c.Inc(1.0) + if count := snapshot.Count(); count != 1.0 { + t.Errorf("c.Count(): 1.0 != %v\n", count) + } +} + +func TestCounterFloat64Zero(t *testing.T) { + c := NewCounterFloat64() + if count := c.Count(); count != 0 { + t.Errorf("c.Count(): 0 != %v\n", count) + } +} + +func TestGetOrRegisterCounterFloat64(t *testing.T) { + r := NewRegistry() + NewRegisteredCounterFloat64("foo", r).Inc(47.0) + if c := GetOrRegisterCounterFloat64("foo", r); c.Count() != 47.0 { + t.Fatal(c) + } +} diff --git a/metrics/exp/exp.go b/metrics/exp/exp.go index 3ebe8cc68a..2b04eeab27 100644 --- a/metrics/exp/exp.go +++ b/metrics/exp/exp.go @@ -100,6 +100,11 @@ func (exp *exp) publishCounter(name string, metric metrics.Counter) { v.Set(metric.Count()) } +func (exp *exp) publishCounterFloat64(name string, metric metrics.CounterFloat64) { + v := exp.getFloat(name) + v.Set(metric.Count()) +} + func (exp *exp) publishGauge(name string, metric metrics.Gauge) { v := exp.getInt(name) v.Set(metric.Value()) @@ -167,6 +172,8 @@ func (exp *exp) syncToExpvar() { switch i := i.(type) { case metrics.Counter: exp.publishCounter(name, i) + case metrics.CounterFloat64: + exp.publishCounterFloat64(name, i) case metrics.Gauge: exp.publishGauge(name, i) case metrics.GaugeFloat64: diff --git a/metrics/graphite.go b/metrics/graphite.go index 142eec86be..29f72b0c41 100644 --- a/metrics/graphite.go +++ b/metrics/graphite.go @@ -67,6 +67,8 @@ func graphite(c *GraphiteConfig) error { switch metric := i.(type) { case Counter: fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, metric.Count(), now) + case CounterFloat64: + fmt.Fprintf(w, "%s.%s.count %f %d\n", c.Prefix, name, metric.Count(), now) case Gauge: fmt.Fprintf(w, "%s.%s.value %d %d\n", c.Prefix, name, metric.Value(), now) case GaugeFloat64: diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 748c692e13..ea6c3d9dff 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -141,6 +141,16 @@ func (r *reporter) send() error { }, Time: now, }) + case metrics.CounterFloat64: + count := metric.Count() + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.count", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "value": count, + }, + Time: now, + }) case metrics.Gauge: ms := metric.Snapshot() pts = append(pts, client.Point{ diff --git a/metrics/influxdb/influxdbv2.go b/metrics/influxdb/influxdbv2.go index bfb762196c..dc8a42cd5a 100644 --- a/metrics/influxdb/influxdbv2.go +++ b/metrics/influxdb/influxdbv2.go @@ -24,8 +24,6 @@ type v2Reporter struct { client influxdb2.Client write api.WriteAPI - - cache map[string]int64 } // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags @@ -39,7 +37,6 @@ func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, to organization: organization, namespace: namespace, tags: tags, - cache: make(map[string]int64), } rep.client = influxdb2.NewClient(rep.endpoint, rep.token) @@ -86,17 +83,25 @@ func (r *v2Reporter) send() { switch metric := i.(type) { case metrics.Counter: v := metric.Count() - l := r.cache[name] measurement := fmt.Sprintf("%s%s.count", namespace, name) fields := map[string]interface{}{ - "value": v - l, + "value": v, } pt := influxdb2.NewPoint(measurement, r.tags, fields, now) r.write.WritePoint(pt) - r.cache[name] = v + case metrics.CounterFloat64: + v := metric.Count() + + measurement := fmt.Sprintf("%s%s.count", namespace, name) + fields := map[string]interface{}{ + "value": v, + } + + pt := influxdb2.NewPoint(measurement, r.tags, fields, now) + r.write.WritePoint(pt) case metrics.Gauge: ms := metric.Snapshot() diff --git a/metrics/librato/librato.go b/metrics/librato/librato.go index b16493413e..3d45f4c7be 100644 --- a/metrics/librato/librato.go +++ b/metrics/librato/librato.go @@ -107,6 +107,17 @@ func (rep *Reporter) BuildRequest(now time.Time, r metrics.Registry) (snapshot B } snapshot.Counters = append(snapshot.Counters, measurement) } + case metrics.CounterFloat64: + if m.Count() > 0 { + measurement[Name] = fmt.Sprintf("%s.%s", name, "count") + measurement[Value] = m.Count() + measurement[Attributes] = map[string]interface{}{ + DisplayUnitsLong: Operations, + DisplayUnitsShort: OperationsShort, + DisplayMin: "0", + } + snapshot.Counters = append(snapshot.Counters, measurement) + } case metrics.Gauge: measurement[Name] = name measurement[Value] = float64(m.Value()) diff --git a/metrics/log.go b/metrics/log.go index 0c8ea7c971..d1ce627a83 100644 --- a/metrics/log.go +++ b/metrics/log.go @@ -24,6 +24,9 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) { case Counter: l.Printf("counter %s\n", name) l.Printf(" count: %9d\n", metric.Count()) + case CounterFloat64: + l.Printf("counter %s\n", name) + l.Printf(" count: %f\n", metric.Count()) case Gauge: l.Printf("gauge %s\n", name) l.Printf(" value: %9d\n", metric.Value()) diff --git a/metrics/metrics.go b/metrics/metrics.go index ff7196b564..c206f16924 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -144,6 +144,9 @@ func CollectProcessMetrics(refresh time.Duration) { cpuSysLoad = GetOrRegisterGauge("system/cpu/sysload", DefaultRegistry) cpuSysWait = GetOrRegisterGauge("system/cpu/syswait", DefaultRegistry) cpuProcLoad = GetOrRegisterGauge("system/cpu/procload", DefaultRegistry) + cpuSysLoadTotal = GetOrRegisterCounterFloat64("system/cpu/sysload/total", DefaultRegistry) + cpuSysWaitTotal = GetOrRegisterCounterFloat64("system/cpu/syswait/total", DefaultRegistry) + cpuProcLoadTotal = GetOrRegisterCounterFloat64("system/cpu/procload/total", DefaultRegistry) cpuThreads = GetOrRegisterGauge("system/cpu/threads", DefaultRegistry) cpuGoroutines = GetOrRegisterGauge("system/cpu/goroutines", DefaultRegistry) cpuSchedLatency = getOrRegisterRuntimeHistogram("system/cpu/schedlatency", secondsToNs, nil) @@ -172,13 +175,17 @@ func CollectProcessMetrics(refresh time.Duration) { secondsSinceLastCollect := collectTime.Sub(lastCollectTime).Seconds() lastCollectTime = collectTime if secondsSinceLastCollect > 0 { - sysLoad := (cpustats[now].GlobalTime - cpustats[prev].GlobalTime) / secondsSinceLastCollect - sysWait := (cpustats[now].GlobalWait - cpustats[prev].GlobalWait) / secondsSinceLastCollect - procLoad := (cpustats[now].LocalTime - cpustats[prev].LocalTime) / secondsSinceLastCollect + sysLoad := cpustats[now].GlobalTime - cpustats[prev].GlobalTime + sysWait := cpustats[now].GlobalWait - cpustats[prev].GlobalWait + procLoad := cpustats[now].LocalTime - cpustats[prev].LocalTime // Convert to integer percentage. - cpuSysLoad.Update(int64(sysLoad * 100)) - cpuSysWait.Update(int64(sysWait * 100)) - cpuProcLoad.Update(int64(procLoad * 100)) + cpuSysLoad.Update(int64(sysLoad / secondsSinceLastCollect * 100)) + cpuSysWait.Update(int64(sysWait / secondsSinceLastCollect * 100)) + cpuProcLoad.Update(int64(procLoad / secondsSinceLastCollect * 100)) + // increment counters (ms) + cpuSysLoadTotal.Inc(sysLoad) + cpuSysWaitTotal.Inc(sysWait) + cpuProcLoadTotal.Inc(procLoad) } // Threads diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index e3fde1ea62..534c44139b 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -18,6 +18,7 @@ func TestReadRuntimeValues(t *testing.T) { func BenchmarkMetrics(b *testing.B) { r := NewRegistry() c := NewRegisteredCounter("counter", r) + cf := NewRegisteredCounterFloat64("counterfloat64", r) g := NewRegisteredGauge("gauge", r) gf := NewRegisteredGaugeFloat64("gaugefloat64", r) h := NewRegisteredHistogram("histogram", r, NewUniformSample(100)) @@ -71,6 +72,7 @@ func BenchmarkMetrics(b *testing.B) { //log.Println("go", i) for i := 0; i < b.N; i++ { c.Inc(1) + cf.Inc(1.0) g.Update(int64(i)) gf.Update(float64(i)) h.Update(int64(i)) diff --git a/metrics/opentsdb.go b/metrics/opentsdb.go index 3fde55454b..c9fd2e75d5 100644 --- a/metrics/opentsdb.go +++ b/metrics/opentsdb.go @@ -71,6 +71,8 @@ func openTSDB(c *OpenTSDBConfig) error { switch metric := i.(type) { case Counter: fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname) + case CounterFloat64: + fmt.Fprintf(w, "put %s.%s.count %d %f host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname) case Gauge: fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname) case GaugeFloat64: diff --git a/metrics/prometheus/collector.go b/metrics/prometheus/collector.go index e8d5e4f5d1..2bd9bf22cc 100644 --- a/metrics/prometheus/collector.go +++ b/metrics/prometheus/collector.go @@ -50,6 +50,10 @@ func (c *collector) addCounter(name string, m metrics.Counter) { c.writeGaugeCounter(name, m.Count()) } +func (c *collector) addCounterFloat64(name string, m metrics.CounterFloat64) { + c.writeGaugeCounter(name, m.Count()) +} + func (c *collector) addGauge(name string, m metrics.Gauge) { c.writeGaugeCounter(name, m.Value()) } diff --git a/metrics/prometheus/collector_test.go b/metrics/prometheus/collector_test.go index 43f2f804d3..ff87c8e765 100644 --- a/metrics/prometheus/collector_test.go +++ b/metrics/prometheus/collector_test.go @@ -20,6 +20,10 @@ func TestCollector(t *testing.T) { counter.Inc(12345) c.addCounter("test/counter", counter) + counterfloat64 := metrics.NewCounterFloat64() + counterfloat64.Inc(54321.98) + c.addCounterFloat64("test/counter_float64", counterfloat64) + gauge := metrics.NewGauge() gauge.Update(23456) c.addGauge("test/gauge", gauge) @@ -61,6 +65,9 @@ func TestCollector(t *testing.T) { const expectedOutput = `# TYPE test_counter gauge test_counter 12345 +# TYPE test_counter_float64 gauge +test_counter_float64 54321.98 + # TYPE test_gauge gauge test_gauge 23456 diff --git a/metrics/prometheus/prometheus.go b/metrics/prometheus/prometheus.go index c8408d8cab..d966fa9a86 100644 --- a/metrics/prometheus/prometheus.go +++ b/metrics/prometheus/prometheus.go @@ -45,6 +45,8 @@ func Handler(reg metrics.Registry) http.Handler { switch m := i.(type) { case metrics.Counter: c.addCounter(name, m.Snapshot()) + case metrics.CounterFloat64: + c.addCounterFloat64(name, m.Snapshot()) case metrics.Gauge: c.addGauge(name, m.Snapshot()) case metrics.GaugeFloat64: diff --git a/metrics/registry.go b/metrics/registry.go index c5435adf24..4c62248351 100644 --- a/metrics/registry.go +++ b/metrics/registry.go @@ -120,6 +120,8 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} { switch metric := i.(type) { case Counter: values["count"] = metric.Count() + case CounterFloat64: + values["count"] = metric.Count() case Gauge: values["value"] = metric.Value() case GaugeFloat64: @@ -196,7 +198,7 @@ func (r *StandardRegistry) register(name string, i interface{}) error { return DuplicateMetric(name) } switch i.(type) { - case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer: + case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer: r.metrics[name] = i } return nil diff --git a/metrics/syslog.go b/metrics/syslog.go index 551a2bd0f0..f23b07e199 100644 --- a/metrics/syslog.go +++ b/metrics/syslog.go @@ -17,6 +17,8 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) { switch metric := i.(type) { case Counter: w.Info(fmt.Sprintf("counter %s: count: %d", name, metric.Count())) + case CounterFloat64: + w.Info(fmt.Sprintf("counter %s: count: %f", name, metric.Count())) case Gauge: w.Info(fmt.Sprintf("gauge %s: value: %d", name, metric.Value())) case GaugeFloat64: diff --git a/metrics/writer.go b/metrics/writer.go index 88521a80d9..256fbd14c9 100644 --- a/metrics/writer.go +++ b/metrics/writer.go @@ -29,6 +29,9 @@ func WriteOnce(r Registry, w io.Writer) { case Counter: fmt.Fprintf(w, "counter %s\n", namedMetric.name) fmt.Fprintf(w, " count: %9d\n", metric.Count()) + case CounterFloat64: + fmt.Fprintf(w, "counter %s\n", namedMetric.name) + fmt.Fprintf(w, " count: %f\n", metric.Count()) case Gauge: fmt.Fprintf(w, "gauge %s\n", namedMetric.name) fmt.Fprintf(w, " value: %9d\n", metric.Value())