metrics, cmd/geth: change init-process of metrics (#30814)

This PR modifies how the metrics library handles `Enabled`: previously,
the package `init` decided whether to serve real metrics or just
dummy types.

This has several drawbacks: 
- During pkg init, we need to determine whether metrics are enabled or
not. So we first hacked in a check for whether certain geth-specific
command-line flags were set. Then we added a similar check for geth
environment variables. Then we almost added a rather elaborate check for
the TOML config file, which would have required TOML parsing during init.

- Using "real" types and dummy types interchangeably means that
everything has to be hidden behind interfaces. This carries a performance
penalty, and it also adds a lot of code.

This PR removes the interfaces, uses concrete types, and allows
`Enabled` to be set later in the startup sequence. It is still assumed
that `metrics.Enable()` is invoked early on.
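
To make the new flow concrete, here is a minimal sketch of the intended call order; the flag handling below is illustrative, not geth's actual wiring:

```go
package main

import (
	"flag"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// Illustrative flag; geth derives this from its own CLI/config handling.
	enable := flag.Bool("metrics", false, "enable metrics collection")
	flag.Parse()

	// Enable() is expected to run early, before meters start ticking and
	// before exporters read the registry.
	if *enable {
		metrics.Enable()
	}

	// Registration always yields concrete types now; there is no dummy
	// implementation to swap in when metrics are off.
	started := metrics.NewRegisteredCounter("example/started", nil)
	started.Inc(1)

	// Heavier machinery is only started when the flag is set.
	if metrics.Enabled() {
		go metrics.CollectProcessMetrics(3 * time.Second)
	}
	time.Sleep(time.Second) // placeholder for real work
}
```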

The somewhat 'heavy' operations, such as ticking meters and exponential
decay, now check the enable flag to prevent resource leaks.
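
As a rough illustration of that pattern (not the package's actual arbiter), a ticking loop might skip its work entirely while metrics are disabled:

```go
package metrics

import "time"

// tickAllSketch is a simplified stand-in for the internal ticker: it wakes
// every five seconds but only performs the EWMA decay work once metrics
// have been enabled, so a node running without --metrics pays almost nothing.
func tickAllSketch(avgs []*EWMA, stop <-chan struct{}) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if !Enabled() { // cheap check guards the heavy path
				continue
			}
			for _, a := range avgs {
				a.tick() // unexported tick(), as seen in the EWMA diff below
			}
		case <-stop:
			return
		}
	}
}
```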

The change may be large, but it is mostly trivial, and from the last time
I gutted the metrics package I made sure that we have fairly good test
coverage.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
Martin HS 2 months ago committed by GitHub
parent 4ecf08584c
commit 9045b79bc2
  1. cmd/geth/chaincmd.go (11)
  2. cmd/geth/config.go (24)
  3. cmd/geth/main.go (7)
  4. cmd/utils/flags.go (103)
  5. core/rawdb/freezer_table.go (12)
  6. core/state/trie_prefetcher.go (32)
  7. core/txpool/txpool.go (4)
  8. eth/downloader/queue.go (2)
  9. eth/handler.go (2)
  10. eth/protocols/eth/handler.go (2)
  11. eth/protocols/eth/handshake.go (2)
  12. eth/protocols/eth/metrics.go (12)
  13. eth/protocols/snap/handler.go (2)
  14. ethdb/leveldb/leveldb.go (30)
  15. ethdb/pebble/pebble.go (30)
  16. metrics/counter.go (94)
  17. metrics/counter_float64.go (103)
  18. metrics/counter_float_64_test.go (58)
  19. metrics/counter_test.go (38)
  20. metrics/debug.go (8)
  21. metrics/ewma.go (64)
  22. metrics/ewma_test.go (14)
  23. metrics/exp/exp.go (22)
  24. metrics/gauge.go (84)
  25. metrics/gauge_float64.go (65)
  26. metrics/gauge_info.go (50)
  27. metrics/graphite.go (117)
  28. metrics/graphite_test.go (22)
  29. metrics/healthcheck.go (44)
  30. metrics/histogram.go (23)
  31. metrics/inactive.go (48)
  32. metrics/influxdb/influxdb.go (16)
  33. metrics/influxdb/influxdb_test.go (2)
  34. metrics/init_test.go (2)
  35. metrics/log.go (18)
  36. metrics/meter.go (156)
  37. metrics/meter_test.go (12)
  38. metrics/metrics.go (57)
  39. metrics/metrics_test.go (71)
  40. metrics/opentsdb.go (14)
  41. metrics/prometheus/collector.go (22)
  42. metrics/prometheus/collector_test.go (2)
  43. metrics/registry.go (39)
  44. metrics/registry_test.go (18)
  45. metrics/resetting_sample.go (2)
  46. metrics/resetting_timer.go (90)
  47. metrics/sample.go (282)
  48. metrics/sample_test.go (14)
  49. metrics/syslog.go (16)
  50. metrics/timer.go (104)
  51. metrics/writer.go (16)
  52. p2p/discover/metrics.go (4)
  53. p2p/discover/table.go (4)
  54. p2p/metrics.go (35)
  55. p2p/peer.go (2)
  56. p2p/tracker/tracker.go (4)
  57. p2p/transport.go (2)
  58. triedb/pathdb/states.go (2)

@ -38,7 +38,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/era"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2"
)
@ -282,14 +281,12 @@ func importChain(ctx *cli.Context) error {
if ctx.Args().Len() < 1 {
utils.Fatalf("This command requires an argument.")
}
// Start metrics export if enabled
utils.SetupMetrics(ctx)
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)
stack, _ := makeConfigNode(ctx)
stack, cfg := makeConfigNode(ctx)
defer stack.Close()
// Start metrics export if enabled
utils.SetupMetrics(&cfg.Metrics)
chain, db := utils.MakeChain(ctx, stack, false)
defer db.Close()

@ -192,6 +192,9 @@ func makeFullNode(ctx *cli.Context) *node.Node {
cfg.Eth.OverrideVerkle = &v
}
// Start metrics export if enabled
utils.SetupMetrics(&cfg.Metrics)
backend, eth := utils.RegisterEthService(stack, &cfg.Eth)
// Create gauge with geth system and build information
@ -325,6 +328,27 @@ func applyMetricConfig(ctx *cli.Context, cfg *gethConfig) {
if ctx.IsSet(utils.MetricsInfluxDBOrganizationFlag.Name) {
cfg.Metrics.InfluxDBOrganization = ctx.String(utils.MetricsInfluxDBOrganizationFlag.Name)
}
// Sanity-check the commandline flags. It is fine if some unused fields are part
// of the toml-config, but we expect the commandline to only contain relevant
// arguments, otherwise it indicates an error.
var (
enableExport = ctx.Bool(utils.MetricsEnableInfluxDBFlag.Name)
enableExportV2 = ctx.Bool(utils.MetricsEnableInfluxDBV2Flag.Name)
)
if enableExport || enableExportV2 {
v1FlagIsSet := ctx.IsSet(utils.MetricsInfluxDBUsernameFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBPasswordFlag.Name)
v2FlagIsSet := ctx.IsSet(utils.MetricsInfluxDBTokenFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBOrganizationFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBBucketFlag.Name)
if enableExport && v2FlagIsSet {
utils.Fatalf("Flags --influxdb.metrics.organization, --influxdb.metrics.token, --influxdb.metrics.bucket are only available for influxdb-v2")
} else if enableExportV2 && v1FlagIsSet {
utils.Fatalf("Flags --influxdb.metrics.username, --influxdb.metrics.password are only available for influxdb-v1")
}
}
}
func setAccountManagerBackends(conf *node.Config, am *accounts.Manager, keydir string) error {

@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"go.uber.org/automaxprocs/maxprocs"
@ -325,12 +324,6 @@ func prepare(ctx *cli.Context) {
ctx.Set(utils.CacheFlag.Name, strconv.Itoa(4096))
}
}
// Start metrics export if enabled
utils.SetupMetrics(ctx)
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)
}
// geth is the main entry point into the system if no special subcommand is run.

@ -1968,67 +1968,56 @@ func RegisterFullSyncTester(stack *node.Node, eth *eth.Ethereum, target common.H
log.Info("Registered full-sync tester", "hash", target)
}
func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")
var (
enableExport = ctx.Bool(MetricsEnableInfluxDBFlag.Name)
enableExportV2 = ctx.Bool(MetricsEnableInfluxDBV2Flag.Name)
)
if enableExport || enableExportV2 {
CheckExclusive(ctx, MetricsEnableInfluxDBFlag, MetricsEnableInfluxDBV2Flag)
v1FlagIsSet := ctx.IsSet(MetricsInfluxDBUsernameFlag.Name) ||
ctx.IsSet(MetricsInfluxDBPasswordFlag.Name)
v2FlagIsSet := ctx.IsSet(MetricsInfluxDBTokenFlag.Name) ||
ctx.IsSet(MetricsInfluxDBOrganizationFlag.Name) ||
ctx.IsSet(MetricsInfluxDBBucketFlag.Name)
if enableExport && v2FlagIsSet {
Fatalf("Flags --influxdb.metrics.organization, --influxdb.metrics.token, --influxdb.metrics.bucket are only available for influxdb-v2")
} else if enableExportV2 && v1FlagIsSet {
Fatalf("Flags --influxdb.metrics.username, --influxdb.metrics.password are only available for influxdb-v1")
}
}
var (
endpoint = ctx.String(MetricsInfluxDBEndpointFlag.Name)
database = ctx.String(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.String(MetricsInfluxDBUsernameFlag.Name)
password = ctx.String(MetricsInfluxDBPasswordFlag.Name)
token = ctx.String(MetricsInfluxDBTokenFlag.Name)
bucket = ctx.String(MetricsInfluxDBBucketFlag.Name)
organization = ctx.String(MetricsInfluxDBOrganizationFlag.Name)
)
if enableExport {
tagsMap := SplitTagsFlag(ctx.String(MetricsInfluxDBTagsFlag.Name))
log.Info("Enabling metrics export to InfluxDB")
go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "geth.", tagsMap)
} else if enableExportV2 {
tagsMap := SplitTagsFlag(ctx.String(MetricsInfluxDBTagsFlag.Name))
log.Info("Enabling metrics export to InfluxDB (v2)")
go influxdb.InfluxDBV2WithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, token, bucket, organization, "geth.", tagsMap)
}
// SetupMetrics configures the metrics system.
func SetupMetrics(cfg *metrics.Config) {
if !cfg.Enabled {
return
}
log.Info("Enabling metrics collection")
metrics.Enable()
if ctx.IsSet(MetricsHTTPFlag.Name) {
address := net.JoinHostPort(ctx.String(MetricsHTTPFlag.Name), fmt.Sprintf("%d", ctx.Int(MetricsPortFlag.Name)))
log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address)
exp.Setup(address)
} else if ctx.IsSet(MetricsPortFlag.Name) {
log.Warn(fmt.Sprintf("--%s specified without --%s, metrics server will not start.", MetricsPortFlag.Name, MetricsHTTPFlag.Name))
}
// InfluxDB exporter.
var (
enableExport = cfg.EnableInfluxDB
enableExportV2 = cfg.EnableInfluxDBV2
)
if cfg.EnableInfluxDB && cfg.EnableInfluxDBV2 {
Fatalf("Flags %v can't be used at the same time", strings.Join([]string{MetricsEnableInfluxDBFlag.Name, MetricsEnableInfluxDBV2Flag.Name}, ", "))
}
var (
endpoint = cfg.InfluxDBEndpoint
database = cfg.InfluxDBDatabase
username = cfg.InfluxDBUsername
password = cfg.InfluxDBPassword
token = cfg.InfluxDBToken
bucket = cfg.InfluxDBBucket
organization = cfg.InfluxDBOrganization
tagsMap = SplitTagsFlag(cfg.InfluxDBTags)
)
if enableExport {
log.Info("Enabling metrics export to InfluxDB")
go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "geth.", tagsMap)
} else if enableExportV2 {
tagsMap := SplitTagsFlag(cfg.InfluxDBTags)
log.Info("Enabling metrics export to InfluxDB (v2)")
go influxdb.InfluxDBV2WithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, token, bucket, organization, "geth.", tagsMap)
}
// Expvar exporter.
if cfg.HTTP != "" {
address := net.JoinHostPort(cfg.HTTP, fmt.Sprintf("%d", cfg.Port))
log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address)
exp.Setup(address)
} else if cfg.HTTP == "" && cfg.Port != 0 {
log.Warn(fmt.Sprintf("--%s specified without --%s, metrics server will not start.", MetricsPortFlag.Name, MetricsHTTPFlag.Name))
}
// Enable system metrics collection.
go metrics.CollectProcessMetrics(3 * time.Second)
}
// SplitTagsFlag parses a comma-separated list of k=v metrics tags.
func SplitTagsFlag(tagsFlag string) map[string]string {
tags := strings.Split(tagsFlag, ",")
tagsMap := map[string]string{}
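
For a caller outside geth's flag plumbing, usage of the reworked entry point might look roughly like this (field values are illustrative; the field names follow the diff above):

```go
package main

import (
	"github.com/ethereum/go-ethereum/cmd/utils"
	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// Anything left unset simply disables the corresponding exporter.
	cfg := metrics.Config{
		Enabled: true,
		HTTP:    "127.0.0.1",
		Port:    6060,
	}
	// SetupMetrics flips the global switch, wires up the configured
	// exporters and starts process-metrics collection; no cli.Context
	// is involved anymore.
	utils.SetupMetrics(&cfg)

	select {} // keep the process alive so the expvar endpoint stays up
}
```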

@ -113,10 +113,10 @@ type freezerTable struct {
headId uint32 // number of the currently active head file
tailId uint32 // number of the earliest file
headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables
headBytes int64 // Number of bytes written to the head file
readMeter *metrics.Meter // Meter for measuring the effective amount of data read
writeMeter *metrics.Meter // Meter for measuring the effective amount of data written
sizeGauge *metrics.Gauge // Gauge for tracking the combined size of all freezer tables
logger log.Logger // Logger with database path and table name embedded
lock sync.RWMutex // Mutex protecting the data file descriptors
@ -124,13 +124,13 @@ type freezerTable struct {
// newFreezerTable opens the given path as a freezer table.
func newFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerTable, error) {
return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly)
return newTable(path, name, metrics.NewInactiveMeter(), metrics.NewInactiveMeter(), metrics.NewGauge(), freezerTableSize, disableSnappy, readonly)
}
// newTable opens a freezer table, creating the data and index files if they are
// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
func newTable(path string, name string, readMeter, writeMeter *metrics.Meter, sizeGauge *metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err

@ -47,21 +47,21 @@ type triePrefetcher struct {
term chan struct{} // Channel to signal interruption
noreads bool // Whether to ignore state-read-only prefetch requests
deliveryMissMeter metrics.Meter
accountLoadReadMeter metrics.Meter
accountLoadWriteMeter metrics.Meter
accountDupReadMeter metrics.Meter
accountDupWriteMeter metrics.Meter
accountDupCrossMeter metrics.Meter
accountWasteMeter metrics.Meter
storageLoadReadMeter metrics.Meter
storageLoadWriteMeter metrics.Meter
storageDupReadMeter metrics.Meter
storageDupWriteMeter metrics.Meter
storageDupCrossMeter metrics.Meter
storageWasteMeter metrics.Meter
deliveryMissMeter *metrics.Meter
accountLoadReadMeter *metrics.Meter
accountLoadWriteMeter *metrics.Meter
accountDupReadMeter *metrics.Meter
accountDupWriteMeter *metrics.Meter
accountDupCrossMeter *metrics.Meter
accountWasteMeter *metrics.Meter
storageLoadReadMeter *metrics.Meter
storageLoadWriteMeter *metrics.Meter
storageDupReadMeter *metrics.Meter
storageDupWriteMeter *metrics.Meter
storageDupCrossMeter *metrics.Meter
storageWasteMeter *metrics.Meter
}
func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
@ -111,7 +111,7 @@ func (p *triePrefetcher) terminate(async bool) {
// report aggregates the pre-fetching and usage metrics and reports them.
func (p *triePrefetcher) report() {
if !metrics.Enabled {
if !metrics.Enabled() {
return
}
for _, fetcher := range p.fetchers {

@ -126,7 +126,7 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
return ErrAlreadyReserved
}
p.reservations[addr] = subpool
if metrics.Enabled {
if metrics.Enabled() {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Inc(1)
}
@ -143,7 +143,7 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
return errors.New("address not owned")
}
delete(p.reservations, addr)
if metrics.Enabled {
if metrics.Enabled() {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Dec(1)
}

@ -882,7 +882,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, recei
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest,
reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter,
reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter,
results int, validate func(index int, header *types.Header) error,
reconstruct func(index int, result *fetchResult)) (int, error) {
// Short circuit if the data was never requested

@ -366,7 +366,7 @@ func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error
defer h.decHandlers()
if err := h.peers.registerSnapExtension(peer); err != nil {
if metrics.Enabled {
if metrics.Enabled() {
if peer.Inbound() {
snap.IngressRegistrationErrorMeter.Mark(1)
} else {

@ -190,7 +190,7 @@ func handleMessage(backend Backend, peer *Peer) error {
var handlers = eth68
// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
if metrics.Enabled() {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
defer func(start time.Time) {
sampler := func() metrics.Sample {

@ -112,7 +112,7 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H
// markError registers the error with the corresponding metric.
func markError(p *Peer, err error) {
if !metrics.Enabled {
if !metrics.Enabled() {
return
}
m := meters.get(p.Inbound())

@ -41,23 +41,23 @@ func (h *bidirectionalMeters) get(ingress bool) *hsMeters {
type hsMeters struct {
// peerError measures the number of errors related to incorrect peer
// behaviour, such as invalid message code, size, encoding, etc.
peerError metrics.Meter
peerError *metrics.Meter
// timeoutError measures the number of timeouts.
timeoutError metrics.Meter
timeoutError *metrics.Meter
// networkIDMismatch measures the number of network id mismatch errors.
networkIDMismatch metrics.Meter
networkIDMismatch *metrics.Meter
// protocolVersionMismatch measures the number of differing protocol
// versions.
protocolVersionMismatch metrics.Meter
protocolVersionMismatch *metrics.Meter
// genesisMismatch measures the number of differing genesises.
genesisMismatch metrics.Meter
genesisMismatch *metrics.Meter
// forkidRejected measures the number of differing forkids.
forkidRejected metrics.Meter
forkidRejected *metrics.Meter
}
// newHandshakeMeters registers and returns handshake meters for the given

@ -132,7 +132,7 @@ func HandleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()
start := time.Now()
// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
if metrics.Enabled() {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
defer func(start time.Time) {
sampler := func() metrics.Sample {

@ -62,21 +62,21 @@ type Database struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
memCompGauge metrics.Gauge // Gauge for tracking the number of memory compaction
level0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in level0
nonlevel0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge to track the amount of memory that has been manually allocated (not a part of runtime/GC)
levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels
compTimeMeter *metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter *metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter *metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter *metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter *metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge *metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter *metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter *metrics.Meter // Meter for measuring the effective amount of data written
memCompGauge *metrics.Gauge // Gauge for tracking the number of memory compaction
level0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in level0
nonlevel0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
seekCompGauge *metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge *metrics.Gauge // Gauge to track the amount of memory that has been manually allocated (not a part of runtime/GC)
levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels
quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database

@ -58,21 +58,21 @@ type Database struct {
fn string // filename for reporting
db *pebble.DB // Underlying pebble storage engine
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
memCompGauge metrics.Gauge // Gauge for tracking the number of memory compaction
level0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in level0
nonlevel0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels
compTimeMeter *metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter *metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter *metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter *metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter *metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge *metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter *metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter *metrics.Meter // Meter for measuring the effective amount of data written
memCompGauge *metrics.Gauge // Gauge for tracking the number of memory compaction
level0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in level0
nonlevel0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
seekCompGauge *metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge *metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database

@ -4,109 +4,55 @@ import (
"sync/atomic"
)
type CounterSnapshot interface {
Count() int64
}
// Counter hold an int64 value that can be incremented and decremented.
type Counter interface {
Clear()
Dec(int64)
Inc(int64)
Snapshot() CounterSnapshot
}
// GetOrRegisterCounter returns an existing Counter or constructs and registers
// a new StandardCounter.
func GetOrRegisterCounter(name string, r Registry) Counter {
if nil == r {
// a new Counter.
func GetOrRegisterCounter(name string, r Registry) *Counter {
if r == nil {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewCounter).(Counter)
return r.GetOrRegister(name, NewCounter).(*Counter)
}
// GetOrRegisterCounterForced returns an existing Counter or constructs and registers a
// new Counter no matter the global switch is enabled or not.
// Be sure to unregister the counter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterCounterForced(name string, r Registry) Counter {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewCounterForced).(Counter)
// NewCounter constructs a new Counter.
func NewCounter() *Counter {
return new(Counter)
}
// NewCounter constructs a new StandardCounter.
func NewCounter() Counter {
if !Enabled {
return NilCounter{}
}
return new(StandardCounter)
}
// NewCounterForced constructs a new StandardCounter and returns it no matter if
// the global switch is enabled or not.
func NewCounterForced() Counter {
return new(StandardCounter)
}
// NewRegisteredCounter constructs and registers a new StandardCounter.
func NewRegisteredCounter(name string, r Registry) Counter {
// NewRegisteredCounter constructs and registers a new Counter.
func NewRegisteredCounter(name string, r Registry) *Counter {
c := NewCounter()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// NewRegisteredCounterForced constructs and registers a new StandardCounter
// and launches a goroutine no matter the global switch is enabled or not.
// Be sure to unregister the counter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredCounterForced(name string, r Registry) Counter {
c := NewCounterForced()
if nil == r {
if r == nil {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// counterSnapshot is a read-only copy of another Counter.
type counterSnapshot int64
// CounterSnapshot is a read-only copy of a Counter.
type CounterSnapshot int64
// Count returns the count at the time the snapshot was taken.
func (c counterSnapshot) Count() int64 { return int64(c) }
// NilCounter is a no-op Counter.
type NilCounter struct{}
func (c CounterSnapshot) Count() int64 { return int64(c) }
func (NilCounter) Clear() {}
func (NilCounter) Dec(i int64) {}
func (NilCounter) Inc(i int64) {}
func (NilCounter) Snapshot() CounterSnapshot { return (*emptySnapshot)(nil) }
// StandardCounter is the standard implementation of a Counter and uses the
// sync/atomic package to manage a single int64 value.
type StandardCounter atomic.Int64
// Counter hold an int64 value that can be incremented and decremented.
type Counter atomic.Int64
// Clear sets the counter to zero.
func (c *StandardCounter) Clear() {
func (c *Counter) Clear() {
(*atomic.Int64)(c).Store(0)
}
// Dec decrements the counter by the given amount.
func (c *StandardCounter) Dec(i int64) {
func (c *Counter) Dec(i int64) {
(*atomic.Int64)(c).Add(-i)
}
// Inc increments the counter by the given amount.
func (c *StandardCounter) Inc(i int64) {
func (c *Counter) Inc(i int64) {
(*atomic.Int64)(c).Add(i)
}
// Snapshot returns a read-only copy of the counter.
func (c *StandardCounter) Snapshot() CounterSnapshot {
return counterSnapshot((*atomic.Int64)(c).Load())
func (c *Counter) Snapshot() CounterSnapshot {
return CounterSnapshot((*atomic.Int64)(c).Load())
}
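
A short usage sketch of the flattened counter API: constructors now return the concrete *Counter directly, and snapshots are plain value types:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// NewRegisteredCounter returns *metrics.Counter (an atomic.Int64 under
	// the hood) instead of an interface backed by either a real or a nil
	// implementation.
	requests := metrics.NewRegisteredCounter("example/requests", nil)
	requests.Inc(3)
	requests.Dec(1)

	// CounterSnapshot is now a simple int64-based value type.
	snap := requests.Snapshot()
	fmt.Println(snap.Count()) // 2
}
```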

@ -5,114 +5,57 @@ import (
"sync/atomic"
)
type CounterFloat64Snapshot interface {
Count() float64
}
// CounterFloat64 holds a float64 value that can be incremented and decremented.
type CounterFloat64 interface {
Clear()
Dec(float64)
Inc(float64)
Snapshot() CounterFloat64Snapshot
}
// GetOrRegisterCounterFloat64 returns an existing CounterFloat64 or constructs and registers
// a new StandardCounterFloat64.
func GetOrRegisterCounterFloat64(name string, r Registry) CounterFloat64 {
// GetOrRegisterCounterFloat64 returns an existing *CounterFloat64 or constructs and registers
// a new CounterFloat64.
func GetOrRegisterCounterFloat64(name string, r Registry) *CounterFloat64 {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewCounterFloat64).(CounterFloat64)
}
// GetOrRegisterCounterFloat64Forced returns an existing CounterFloat64 or constructs and registers a
// new CounterFloat64 no matter the global switch is enabled or not.
// Be sure to unregister the counter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterCounterFloat64Forced(name string, r Registry) CounterFloat64 {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewCounterFloat64Forced).(CounterFloat64)
}
// NewCounterFloat64 constructs a new StandardCounterFloat64.
func NewCounterFloat64() CounterFloat64 {
if !Enabled {
return NilCounterFloat64{}
}
return &StandardCounterFloat64{}
return r.GetOrRegister(name, NewCounterFloat64).(*CounterFloat64)
}
// NewCounterFloat64Forced constructs a new StandardCounterFloat64 and returns it no matter if
// the global switch is enabled or not.
func NewCounterFloat64Forced() CounterFloat64 {
return &StandardCounterFloat64{}
// NewCounterFloat64 constructs a new CounterFloat64.
func NewCounterFloat64() *CounterFloat64 {
return new(CounterFloat64)
}
// NewRegisteredCounterFloat64 constructs and registers a new StandardCounterFloat64.
func NewRegisteredCounterFloat64(name string, r Registry) CounterFloat64 {
// NewRegisteredCounterFloat64 constructs and registers a new CounterFloat64.
func NewRegisteredCounterFloat64(name string, r Registry) *CounterFloat64 {
c := NewCounterFloat64()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// NewRegisteredCounterFloat64Forced constructs and registers a new StandardCounterFloat64
// and launches a goroutine no matter the global switch is enabled or not.
// Be sure to unregister the counter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredCounterFloat64Forced(name string, r Registry) CounterFloat64 {
c := NewCounterFloat64Forced()
if nil == r {
if r == nil {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// counterFloat64Snapshot is a read-only copy of another CounterFloat64.
type counterFloat64Snapshot float64
// CounterFloat64Snapshot is a read-only copy of a float64 counter.
type CounterFloat64Snapshot float64
// Count returns the value at the time the snapshot was taken.
func (c counterFloat64Snapshot) Count() float64 { return float64(c) }
type NilCounterFloat64 struct{}
func (c CounterFloat64Snapshot) Count() float64 { return float64(c) }
func (NilCounterFloat64) Clear() {}
func (NilCounterFloat64) Count() float64 { return 0.0 }
func (NilCounterFloat64) Dec(i float64) {}
func (NilCounterFloat64) Inc(i float64) {}
func (NilCounterFloat64) Snapshot() CounterFloat64Snapshot { return NilCounterFloat64{} }
// StandardCounterFloat64 is the standard implementation of a CounterFloat64 and uses the
// atomic to manage a single float64 value.
type StandardCounterFloat64 struct {
floatBits atomic.Uint64
}
// CounterFloat64 holds a float64 value that can be incremented and decremented.
type CounterFloat64 atomic.Uint64
// Clear sets the counter to zero.
func (c *StandardCounterFloat64) Clear() {
c.floatBits.Store(0)
func (c *CounterFloat64) Clear() {
(*atomic.Uint64)(c).Store(0)
}
// Dec decrements the counter by the given amount.
func (c *StandardCounterFloat64) Dec(v float64) {
atomicAddFloat(&c.floatBits, -v)
func (c *CounterFloat64) Dec(v float64) {
atomicAddFloat((*atomic.Uint64)(c), -v)
}
// Inc increments the counter by the given amount.
func (c *StandardCounterFloat64) Inc(v float64) {
atomicAddFloat(&c.floatBits, v)
func (c *CounterFloat64) Inc(v float64) {
atomicAddFloat((*atomic.Uint64)(c), v)
}
// Snapshot returns a read-only copy of the counter.
func (c *StandardCounterFloat64) Snapshot() CounterFloat64Snapshot {
v := math.Float64frombits(c.floatBits.Load())
return counterFloat64Snapshot(v)
func (c *CounterFloat64) Snapshot() CounterFloat64Snapshot {
return CounterFloat64Snapshot(math.Float64frombits((*atomic.Uint64)(c).Load()))
}
func atomicAddFloat(fbits *atomic.Uint64, v float64) {

@ -32,61 +32,35 @@ func BenchmarkCounterFloat64Parallel(b *testing.B) {
}
}
func TestCounterFloat64Clear(t *testing.T) {
func TestCounterFloat64(t *testing.T) {
c := NewCounterFloat64()
c.Inc(1.0)
c.Clear()
if count := c.Snapshot().Count(); count != 0 {
t.Errorf("c.Count(): 0 != %v\n", count)
t.Errorf("wrong count: %v", count)
}
}
func TestCounterFloat64Dec1(t *testing.T) {
c := NewCounterFloat64()
c.Dec(1.0)
if count := c.Snapshot().Count(); count != -1.0 {
t.Errorf("c.Count(): -1.0 != %v\n", count)
t.Errorf("wrong count: %v", count)
}
}
func TestCounterFloat64Dec2(t *testing.T) {
c := NewCounterFloat64()
snapshot := c.Snapshot()
c.Dec(2.0)
if count := c.Snapshot().Count(); count != -2.0 {
t.Errorf("c.Count(): -2.0 != %v\n", count)
if count := c.Snapshot().Count(); count != -3.0 {
t.Errorf("wrong count: %v", count)
}
}
func TestCounterFloat64Inc1(t *testing.T) {
c := NewCounterFloat64()
c.Inc(1.0)
if count := c.Snapshot().Count(); count != 1.0 {
t.Errorf("c.Count(): 1.0 != %v\n", count)
if count := c.Snapshot().Count(); count != -2.0 {
t.Errorf("wrong count: %v", count)
}
}
func TestCounterFloat64Inc2(t *testing.T) {
c := NewCounterFloat64()
c.Inc(2.0)
if count := c.Snapshot().Count(); count != 2.0 {
t.Errorf("c.Count(): 2.0 != %v\n", count)
if count := c.Snapshot().Count(); count != 0.0 {
t.Errorf("wrong count: %v", count)
}
}
func TestCounterFloat64Snapshot(t *testing.T) {
c := NewCounterFloat64()
c.Inc(1.0)
snapshot := c.Snapshot()
c.Inc(1.0)
if count := snapshot.Count(); count != 1.0 {
t.Errorf("c.Count(): 1.0 != %v\n", count)
if count := snapshot.Count(); count != -1.0 {
t.Errorf("snapshot count wrong: %v", count)
}
}
func TestCounterFloat64Zero(t *testing.T) {
c := NewCounterFloat64()
if count := c.Snapshot().Count(); count != 0 {
t.Errorf("c.Count(): 0 != %v\n", count)
c.Inc(1.0)
c.Clear()
if count := c.Snapshot().Count(); count != 0.0 {
t.Errorf("wrong count: %v", count)
}
}

@ -19,35 +19,26 @@ func TestCounterClear(t *testing.T) {
}
}
func TestCounterDec1(t *testing.T) {
func TestCounter(t *testing.T) {
c := NewCounter()
if count := c.Snapshot().Count(); count != 0 {
t.Errorf("wrong count: %v", count)
}
c.Dec(1)
if count := c.Snapshot().Count(); count != -1 {
t.Errorf("c.Count(): -1 != %v\n", count)
t.Errorf("wrong count: %v", count)
}
}
func TestCounterDec2(t *testing.T) {
c := NewCounter()
c.Dec(2)
if count := c.Snapshot().Count(); count != -2 {
t.Errorf("c.Count(): -2 != %v\n", count)
if count := c.Snapshot().Count(); count != -3 {
t.Errorf("wrong count: %v", count)
}
}
func TestCounterInc1(t *testing.T) {
c := NewCounter()
c.Inc(1)
if count := c.Snapshot().Count(); count != 1 {
t.Errorf("c.Count(): 1 != %v\n", count)
if count := c.Snapshot().Count(); count != -2 {
t.Errorf("wrong count: %v", count)
}
}
func TestCounterInc2(t *testing.T) {
c := NewCounter()
c.Inc(2)
if count := c.Snapshot().Count(); count != 2 {
t.Errorf("c.Count(): 2 != %v\n", count)
if count := c.Snapshot().Count(); count != 0 {
t.Errorf("wrong count: %v", count)
}
}
@ -61,13 +52,6 @@ func TestCounterSnapshot(t *testing.T) {
}
}
func TestCounterZero(t *testing.T) {
c := NewCounter()
if count := c.Snapshot().Count(); count != 0 {
t.Errorf("c.Count(): 0 != %v\n", count)
}
}
func TestGetOrRegisterCounter(t *testing.T) {
r := NewRegistry()
NewRegisteredCounter("foo", r).Inc(47)

@ -8,13 +8,13 @@ import (
var (
debugMetrics struct {
GCStats struct {
LastGC Gauge
NumGC Gauge
LastGC *Gauge
NumGC *Gauge
Pause Histogram
//PauseQuantiles Histogram
PauseTotal Gauge
PauseTotal *Gauge
}
ReadGCStats Timer
ReadGCStats *Timer
}
gcStats debug.GCStats
)

@ -7,56 +7,36 @@ import (
"time"
)
type EWMASnapshot interface {
Rate() float64
}
// EWMASnapshot is a read-only copy of an EWMA.
type EWMASnapshot float64
// EWMAs continuously calculate an exponentially-weighted moving average
// based on an outside source of clock ticks.
type EWMA interface {
Snapshot() EWMASnapshot
Tick()
Update(int64)
}
// Rate returns the rate of events per second at the time the snapshot was
// taken.
func (a EWMASnapshot) Rate() float64 { return float64(a) }
// NewEWMA constructs a new EWMA with the given alpha.
func NewEWMA(alpha float64) EWMA {
return &StandardEWMA{alpha: alpha}
func NewEWMA(alpha float64) *EWMA {
return &EWMA{alpha: alpha}
}
// NewEWMA1 constructs a new EWMA for a one-minute moving average.
func NewEWMA1() EWMA {
func NewEWMA1() *EWMA {
return NewEWMA(1 - math.Exp(-5.0/60.0/1))
}
// NewEWMA5 constructs a new EWMA for a five-minute moving average.
func NewEWMA5() EWMA {
func NewEWMA5() *EWMA {
return NewEWMA(1 - math.Exp(-5.0/60.0/5))
}
// NewEWMA15 constructs a new EWMA for a fifteen-minute moving average.
func NewEWMA15() EWMA {
func NewEWMA15() *EWMA {
return NewEWMA(1 - math.Exp(-5.0/60.0/15))
}
// ewmaSnapshot is a read-only copy of another EWMA.
type ewmaSnapshot float64
// Rate returns the rate of events per second at the time the snapshot was
// taken.
func (a ewmaSnapshot) Rate() float64 { return float64(a) }
// NilEWMA is a no-op EWMA.
type NilEWMA struct{}
func (NilEWMA) Snapshot() EWMASnapshot { return (*emptySnapshot)(nil) }
func (NilEWMA) Tick() {}
func (NilEWMA) Update(n int64) {}
// StandardEWMA is the standard implementation of an EWMA and tracks the number
// of uncounted events and processes them on each tick. It uses the
// sync/atomic package to manage uncounted events.
type StandardEWMA struct {
// EWMA continuously calculate an exponentially-weighted moving average
// based on an outside source of clock ticks.
type EWMA struct {
uncounted atomic.Int64
alpha float64
rate atomic.Uint64
@ -65,27 +45,27 @@ type StandardEWMA struct {
}
// Snapshot returns a read-only copy of the EWMA.
func (a *StandardEWMA) Snapshot() EWMASnapshot {
func (a *EWMA) Snapshot() EWMASnapshot {
r := math.Float64frombits(a.rate.Load()) * float64(time.Second)
return ewmaSnapshot(r)
return EWMASnapshot(r)
}
// Tick ticks the clock to update the moving average. It assumes it is called
// tick ticks the clock to update the moving average. It assumes it is called
// every five seconds.
func (a *StandardEWMA) Tick() {
func (a *EWMA) tick() {
// Optimization to avoid mutex locking in the hot-path.
if a.init.Load() {
a.updateRate(a.fetchInstantRate())
return
}
// Slow-path: this is only needed on the first Tick() and preserves transactional updating
// Slow-path: this is only needed on the first tick() and preserves transactional updating
// of init and rate in the else block. The first conditional is needed below because
// a different thread could have set a.init = 1 between the time of the first atomic load and when
// the lock was acquired.
a.mutex.Lock()
if a.init.Load() {
// The fetchInstantRate() uses atomic loading, which is unnecessary in this critical section
// but again, this section is only invoked on the first successful Tick() operation.
// but again, this section is only invoked on the first successful tick() operation.
a.updateRate(a.fetchInstantRate())
} else {
a.init.Store(true)
@ -94,18 +74,18 @@ func (a *StandardEWMA) Tick() {
a.mutex.Unlock()
}
func (a *StandardEWMA) fetchInstantRate() float64 {
func (a *EWMA) fetchInstantRate() float64 {
count := a.uncounted.Swap(0)
return float64(count) / float64(5*time.Second)
}
func (a *StandardEWMA) updateRate(instantRate float64) {
func (a *EWMA) updateRate(instantRate float64) {
currentRate := math.Float64frombits(a.rate.Load())
currentRate += a.alpha * (instantRate - currentRate)
a.rate.Store(math.Float64bits(currentRate))
}
// Update adds n uncounted events.
func (a *StandardEWMA) Update(n int64) {
func (a *EWMA) Update(n int64) {
a.uncounted.Add(n)
}

@ -12,7 +12,7 @@ func BenchmarkEWMA(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
a.Update(1)
a.Tick()
a.tick()
}
}
@ -23,7 +23,7 @@ func BenchmarkEWMAParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Update(1)
a.Tick()
a.tick()
}
})
}
@ -31,7 +31,7 @@ func BenchmarkEWMAParallel(b *testing.B) {
func TestEWMA1(t *testing.T) {
a := NewEWMA1()
a.Update(3)
a.Tick()
a.tick()
for i, want := range []float64{0.6,
0.22072766470286553, 0.08120116994196772, 0.029872241020718428,
0.01098938333324054, 0.004042768199451294, 0.0014872513059998212,
@ -49,7 +49,7 @@ func TestEWMA1(t *testing.T) {
func TestEWMA5(t *testing.T) {
a := NewEWMA5()
a.Update(3)
a.Tick()
a.tick()
for i, want := range []float64{
0.6, 0.49123845184678905, 0.4021920276213837, 0.32928698165641596,
0.269597378470333, 0.2207276647028654, 0.18071652714732128,
@ -67,7 +67,7 @@ func TestEWMA5(t *testing.T) {
func TestEWMA15(t *testing.T) {
a := NewEWMA15()
a.Update(3)
a.Tick()
a.tick()
for i, want := range []float64{
0.6, 0.5613041910189706, 0.5251039914257684, 0.4912384518467888184678905,
0.459557003018789, 0.4299187863442732, 0.4021920276213831,
@ -82,8 +82,8 @@ func TestEWMA15(t *testing.T) {
}
}
func elapseMinute(a EWMA) {
func elapseMinute(a *EWMA) {
for i := 0; i < 12; i++ {
a.Tick()
a.tick()
}
}

@ -146,7 +146,7 @@ func (exp *exp) publishHistogram(name string, metric metrics.Histogram) {
exp.getFloat(name + ".999-percentile").Set(ps[4])
}
func (exp *exp) publishMeter(name string, metric metrics.Meter) {
func (exp *exp) publishMeter(name string, metric *metrics.Meter) {
m := metric.Snapshot()
exp.getInt(name + ".count").Set(m.Count())
exp.getFloat(name + ".one-minute").Set(m.Rate1())
@ -155,7 +155,7 @@ func (exp *exp) publishMeter(name string, metric metrics.Meter) {
exp.getFloat(name + ".mean").Set(m.RateMean())
}
func (exp *exp) publishTimer(name string, metric metrics.Timer) {
func (exp *exp) publishTimer(name string, metric *metrics.Timer) {
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
exp.getInt(name + ".count").Set(t.Count())
@ -174,7 +174,7 @@ func (exp *exp) publishTimer(name string, metric metrics.Timer) {
exp.getFloat(name + ".mean-rate").Set(t.RateMean())
}
func (exp *exp) publishResettingTimer(name string, metric metrics.ResettingTimer) {
func (exp *exp) publishResettingTimer(name string, metric *metrics.ResettingTimer) {
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.50, 0.75, 0.95, 0.99})
exp.getInt(name + ".count").Set(int64(t.Count()))
@ -188,23 +188,23 @@ func (exp *exp) publishResettingTimer(name string, metric metrics.ResettingTimer
func (exp *exp) syncToExpvar() {
exp.registry.Each(func(name string, i interface{}) {
switch i := i.(type) {
case metrics.Counter:
case *metrics.Counter:
exp.publishCounter(name, i.Snapshot())
case metrics.CounterFloat64:
case *metrics.CounterFloat64:
exp.publishCounterFloat64(name, i.Snapshot())
case metrics.Gauge:
case *metrics.Gauge:
exp.publishGauge(name, i.Snapshot())
case metrics.GaugeFloat64:
case *metrics.GaugeFloat64:
exp.publishGaugeFloat64(name, i.Snapshot())
case metrics.GaugeInfo:
case *metrics.GaugeInfo:
exp.publishGaugeInfo(name, i.Snapshot())
case metrics.Histogram:
exp.publishHistogram(name, i)
case metrics.Meter:
case *metrics.Meter:
exp.publishMeter(name, i)
case metrics.Timer:
case *metrics.Timer:
exp.publishTimer(name, i)
case metrics.ResettingTimer:
case *metrics.ResettingTimer:
exp.publishResettingTimer(name, i)
default:
panic(fmt.Sprintf("unsupported type for '%s': %T", name, i))

@ -2,97 +2,69 @@ package metrics
import "sync/atomic"
// GaugeSnapshot contains a readonly int64.
type GaugeSnapshot interface {
Value() int64
}
// GaugeSnapshot is a read-only copy of a Gauge.
type GaugeSnapshot int64
// Gauge holds an int64 value that can be set arbitrarily.
type Gauge interface {
Snapshot() GaugeSnapshot
Update(int64)
UpdateIfGt(int64)
Dec(int64)
Inc(int64)
}
// Value returns the value at the time the snapshot was taken.
func (g GaugeSnapshot) Value() int64 { return int64(g) }
// GetOrRegisterGauge returns an existing Gauge or constructs and registers a
// new StandardGauge.
func GetOrRegisterGauge(name string, r Registry) Gauge {
if nil == r {
// new Gauge.
func GetOrRegisterGauge(name string, r Registry) *Gauge {
if r == nil {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewGauge).(Gauge)
return r.GetOrRegister(name, NewGauge).(*Gauge)
}
// NewGauge constructs a new StandardGauge.
func NewGauge() Gauge {
if !Enabled {
return NilGauge{}
}
return &StandardGauge{}
// NewGauge constructs a new Gauge.
func NewGauge() *Gauge {
return &Gauge{}
}
// NewRegisteredGauge constructs and registers a new StandardGauge.
func NewRegisteredGauge(name string, r Registry) Gauge {
// NewRegisteredGauge constructs and registers a new Gauge.
func NewRegisteredGauge(name string, r Registry) *Gauge {
c := NewGauge()
if nil == r {
if r == nil {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// gaugeSnapshot is a read-only copy of another Gauge.
type gaugeSnapshot int64
// Value returns the value at the time the snapshot was taken.
func (g gaugeSnapshot) Value() int64 { return int64(g) }
// NilGauge is a no-op Gauge.
type NilGauge struct{}
func (NilGauge) Snapshot() GaugeSnapshot { return (*emptySnapshot)(nil) }
func (NilGauge) Update(v int64) {}
func (NilGauge) UpdateIfGt(v int64) {}
func (NilGauge) Dec(i int64) {}
func (NilGauge) Inc(i int64) {}
// StandardGauge is the standard implementation of a Gauge and uses the
// sync/atomic package to manage a single int64 value.
type StandardGauge struct {
value atomic.Int64
}
// Gauge holds an int64 value that can be set arbitrarily.
type Gauge atomic.Int64
// Snapshot returns a read-only copy of the gauge.
func (g *StandardGauge) Snapshot() GaugeSnapshot {
return gaugeSnapshot(g.value.Load())
func (g *Gauge) Snapshot() GaugeSnapshot {
return GaugeSnapshot((*atomic.Int64)(g).Load())
}
// Update updates the gauge's value.
func (g *StandardGauge) Update(v int64) {
g.value.Store(v)
func (g *Gauge) Update(v int64) {
(*atomic.Int64)(g).Store(v)
}
// UpdateIfGt updates the gauge's value if v is larger then the current value.
func (g *StandardGauge) UpdateIfGt(v int64) {
func (g *Gauge) UpdateIfGt(v int64) {
value := (*atomic.Int64)(g)
for {
exist := g.value.Load()
exist := value.Load()
if exist >= v {
break
}
if g.value.CompareAndSwap(exist, v) {
if value.CompareAndSwap(exist, v) {
break
}
}
}
// Dec decrements the gauge's current value by the given amount.
func (g *StandardGauge) Dec(i int64) {
g.value.Add(-i)
func (g *Gauge) Dec(i int64) {
(*atomic.Int64)(g).Add(-i)
}
// Inc increments the gauge's current value by the given amount.
func (g *StandardGauge) Inc(i int64) {
g.value.Add(i)
func (g *Gauge) Inc(i int64) {
(*atomic.Int64)(g).Add(i)
}

@ -5,35 +5,28 @@ import (
"sync/atomic"
)
type GaugeFloat64Snapshot interface {
Value() float64
}
// GaugeFloat64 hold a float64 value that can be set arbitrarily.
type GaugeFloat64 interface {
Snapshot() GaugeFloat64Snapshot
Update(float64)
}
// GetOrRegisterGaugeFloat64 returns an existing GaugeFloat64 or constructs and registers a
// new StandardGaugeFloat64.
func GetOrRegisterGaugeFloat64(name string, r Registry) GaugeFloat64 {
// new GaugeFloat64.
func GetOrRegisterGaugeFloat64(name string, r Registry) *GaugeFloat64 {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewGaugeFloat64()).(GaugeFloat64)
return r.GetOrRegister(name, NewGaugeFloat64()).(*GaugeFloat64)
}
// NewGaugeFloat64 constructs a new StandardGaugeFloat64.
func NewGaugeFloat64() GaugeFloat64 {
if !Enabled {
return NilGaugeFloat64{}
}
return &StandardGaugeFloat64{}
// GaugeFloat64Snapshot is a read-only copy of a GaugeFloat64.
type GaugeFloat64Snapshot float64
// Value returns the value at the time the snapshot was taken.
func (g GaugeFloat64Snapshot) Value() float64 { return float64(g) }
// NewGaugeFloat64 constructs a new GaugeFloat64.
func NewGaugeFloat64() *GaugeFloat64 {
return new(GaugeFloat64)
}
// NewRegisteredGaugeFloat64 constructs and registers a new StandardGaugeFloat64.
func NewRegisteredGaugeFloat64(name string, r Registry) GaugeFloat64 {
// NewRegisteredGaugeFloat64 constructs and registers a new GaugeFloat64.
func NewRegisteredGaugeFloat64(name string, r Registry) *GaugeFloat64 {
c := NewGaugeFloat64()
if nil == r {
r = DefaultRegistry
@ -42,32 +35,16 @@ func NewRegisteredGaugeFloat64(name string, r Registry) GaugeFloat64 {
return c
}
// gaugeFloat64Snapshot is a read-only copy of another GaugeFloat64.
type gaugeFloat64Snapshot float64
// Value returns the value at the time the snapshot was taken.
func (g gaugeFloat64Snapshot) Value() float64 { return float64(g) }
// NilGaugeFloat64 is a no-op Gauge.
type NilGaugeFloat64 struct{}
func (NilGaugeFloat64) Snapshot() GaugeFloat64Snapshot { return NilGaugeFloat64{} }
func (NilGaugeFloat64) Update(v float64) {}
func (NilGaugeFloat64) Value() float64 { return 0.0 }
// StandardGaugeFloat64 is the standard implementation of a GaugeFloat64 and uses
// atomic to manage a single float64 value.
type StandardGaugeFloat64 struct {
floatBits atomic.Uint64
}
// GaugeFloat64 hold a float64 value that can be set arbitrarily.
type GaugeFloat64 atomic.Uint64
// Snapshot returns a read-only copy of the gauge.
func (g *StandardGaugeFloat64) Snapshot() GaugeFloat64Snapshot {
v := math.Float64frombits(g.floatBits.Load())
return gaugeFloat64Snapshot(v)
func (g *GaugeFloat64) Snapshot() GaugeFloat64Snapshot {
v := math.Float64frombits((*atomic.Uint64)(g).Load())
return GaugeFloat64Snapshot(v)
}
// Update updates the gauge's value.
func (g *StandardGaugeFloat64) Update(v float64) {
g.floatBits.Store(math.Float64bits(v))
func (g *GaugeFloat64) Update(v float64) {
(*atomic.Uint64)(g).Store(math.Float64bits(v))
}

@ -5,16 +5,6 @@ import (
"sync"
)
type GaugeInfoSnapshot interface {
Value() GaugeInfoValue
}
// GaugeInfo holds a GaugeInfoValue value that can be set arbitrarily.
type GaugeInfo interface {
Update(GaugeInfoValue)
Snapshot() GaugeInfoSnapshot
}
// GaugeInfoValue is a mapping of keys to values
type GaugeInfoValue map[string]string
@ -24,26 +14,23 @@ func (val GaugeInfoValue) String() string {
}
// GetOrRegisterGaugeInfo returns an existing GaugeInfo or constructs and registers a
// new StandardGaugeInfo.
func GetOrRegisterGaugeInfo(name string, r Registry) GaugeInfo {
// new GaugeInfo.
func GetOrRegisterGaugeInfo(name string, r Registry) *GaugeInfo {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewGaugeInfo()).(GaugeInfo)
return r.GetOrRegister(name, NewGaugeInfo()).(*GaugeInfo)
}
// NewGaugeInfo constructs a new StandardGaugeInfo.
func NewGaugeInfo() GaugeInfo {
if !Enabled {
return NilGaugeInfo{}
}
return &StandardGaugeInfo{
// NewGaugeInfo constructs a new GaugeInfo.
func NewGaugeInfo() *GaugeInfo {
return &GaugeInfo{
value: GaugeInfoValue{},
}
}
// NewRegisteredGaugeInfo constructs and registers a new StandardGaugeInfo.
func NewRegisteredGaugeInfo(name string, r Registry) GaugeInfo {
// NewRegisteredGaugeInfo constructs and registers a new GaugeInfo.
func NewRegisteredGaugeInfo(name string, r Registry) *GaugeInfo {
c := NewGaugeInfo()
if nil == r {
r = DefaultRegistry
@ -53,31 +40,24 @@ func NewRegisteredGaugeInfo(name string, r Registry) GaugeInfo {
}
// gaugeInfoSnapshot is a read-only copy of another GaugeInfo.
type gaugeInfoSnapshot GaugeInfoValue
type GaugeInfoSnapshot GaugeInfoValue
// Value returns the value at the time the snapshot was taken.
func (g gaugeInfoSnapshot) Value() GaugeInfoValue { return GaugeInfoValue(g) }
type NilGaugeInfo struct{}
func (NilGaugeInfo) Snapshot() GaugeInfoSnapshot { return NilGaugeInfo{} }
func (NilGaugeInfo) Update(v GaugeInfoValue) {}
func (NilGaugeInfo) Value() GaugeInfoValue { return GaugeInfoValue{} }
func (g GaugeInfoSnapshot) Value() GaugeInfoValue { return GaugeInfoValue(g) }
// StandardGaugeInfo is the standard implementation of a GaugeInfo and uses
// sync.Mutex to manage a single string value.
type StandardGaugeInfo struct {
// GaugeInfo maintains a set of key/value mappings.
type GaugeInfo struct {
mutex sync.Mutex
value GaugeInfoValue
}
// Snapshot returns a read-only copy of the gauge.
func (g *StandardGaugeInfo) Snapshot() GaugeInfoSnapshot {
return gaugeInfoSnapshot(g.value)
func (g *GaugeInfo) Snapshot() GaugeInfoSnapshot {
return GaugeInfoSnapshot(g.value)
}
// Update updates the gauge's value.
func (g *StandardGaugeInfo) Update(v GaugeInfoValue) {
func (g *GaugeInfo) Update(v GaugeInfoValue) {
g.mutex.Lock()
defer g.mutex.Unlock()
g.value = v

@ -1,117 +0,0 @@
package metrics
import (
"bufio"
"fmt"
"log"
"net"
"strconv"
"strings"
"time"
)
// GraphiteConfig provides a container with configuration parameters for
// the Graphite exporter
type GraphiteConfig struct {
Addr *net.TCPAddr // Network address to connect to
Registry Registry // Registry to be exported
FlushInterval time.Duration // Flush interval
DurationUnit time.Duration // Time conversion unit for durations
Prefix string // Prefix to be prepended to metric names
Percentiles []float64 // Percentiles to export from timers and histograms
}
// Graphite is a blocking exporter function which reports metrics in r
// to a graphite server located at addr, flushing them every d duration
// and prepending metric names with prefix.
func Graphite(r Registry, d time.Duration, prefix string, addr *net.TCPAddr) {
GraphiteWithConfig(GraphiteConfig{
Addr: addr,
Registry: r,
FlushInterval: d,
DurationUnit: time.Nanosecond,
Prefix: prefix,
Percentiles: []float64{0.5, 0.75, 0.95, 0.99, 0.999},
})
}
// GraphiteWithConfig is a blocking exporter function just like Graphite,
// but it takes a GraphiteConfig instead.
func GraphiteWithConfig(c GraphiteConfig) {
log.Printf("WARNING: This go-metrics client has been DEPRECATED! It has been moved to https://github.com/cyberdelia/go-metrics-graphite and will be removed from rcrowley/go-metrics on August 12th 2015")
for range time.Tick(c.FlushInterval) {
if err := graphite(&c); nil != err {
log.Println(err)
}
}
}
// GraphiteOnce performs a single submission to Graphite, returning a
// non-nil error on failed connections. This can be used in a loop
// similar to GraphiteWithConfig for custom error handling.
func GraphiteOnce(c GraphiteConfig) error {
log.Printf("WARNING: This go-metrics client has been DEPRECATED! It has been moved to https://github.com/cyberdelia/go-metrics-graphite and will be removed from rcrowley/go-metrics on August 12th 2015")
return graphite(&c)
}
func graphite(c *GraphiteConfig) error {
now := time.Now().Unix()
du := float64(c.DurationUnit)
conn, err := net.DialTCP("tcp", nil, c.Addr)
if nil != err {
return err
}
defer conn.Close()
w := bufio.NewWriter(conn)
c.Registry.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, metric.Snapshot().Count(), now)
case CounterFloat64:
fmt.Fprintf(w, "%s.%s.count %f %d\n", c.Prefix, name, metric.Snapshot().Count(), now)
case Gauge:
fmt.Fprintf(w, "%s.%s.value %d %d\n", c.Prefix, name, metric.Snapshot().Value(), now)
case GaugeFloat64:
fmt.Fprintf(w, "%s.%s.value %f %d\n", c.Prefix, name, metric.Snapshot().Value(), now)
case GaugeInfo:
fmt.Fprintf(w, "%s.%s.value %s %d\n", c.Prefix, name, metric.Snapshot().Value().String(), now)
case Histogram:
h := metric.Snapshot()
ps := h.Percentiles(c.Percentiles)
fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, h.Count(), now)
fmt.Fprintf(w, "%s.%s.min %d %d\n", c.Prefix, name, h.Min(), now)
fmt.Fprintf(w, "%s.%s.max %d %d\n", c.Prefix, name, h.Max(), now)
fmt.Fprintf(w, "%s.%s.mean %.2f %d\n", c.Prefix, name, h.Mean(), now)
fmt.Fprintf(w, "%s.%s.std-dev %.2f %d\n", c.Prefix, name, h.StdDev(), now)
for psIdx, psKey := range c.Percentiles {
key := strings.Replace(strconv.FormatFloat(psKey*100.0, 'f', -1, 64), ".", "", 1)
fmt.Fprintf(w, "%s.%s.%s-percentile %.2f %d\n", c.Prefix, name, key, ps[psIdx], now)
}
case Meter:
m := metric.Snapshot()
fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, m.Count(), now)
fmt.Fprintf(w, "%s.%s.one-minute %.2f %d\n", c.Prefix, name, m.Rate1(), now)
fmt.Fprintf(w, "%s.%s.five-minute %.2f %d\n", c.Prefix, name, m.Rate5(), now)
fmt.Fprintf(w, "%s.%s.fifteen-minute %.2f %d\n", c.Prefix, name, m.Rate15(), now)
fmt.Fprintf(w, "%s.%s.mean %.2f %d\n", c.Prefix, name, m.RateMean(), now)
case Timer:
t := metric.Snapshot()
ps := t.Percentiles(c.Percentiles)
fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, t.Count(), now)
fmt.Fprintf(w, "%s.%s.min %d %d\n", c.Prefix, name, t.Min()/int64(du), now)
fmt.Fprintf(w, "%s.%s.max %d %d\n", c.Prefix, name, t.Max()/int64(du), now)
fmt.Fprintf(w, "%s.%s.mean %.2f %d\n", c.Prefix, name, t.Mean()/du, now)
fmt.Fprintf(w, "%s.%s.std-dev %.2f %d\n", c.Prefix, name, t.StdDev()/du, now)
for psIdx, psKey := range c.Percentiles {
key := strings.Replace(strconv.FormatFloat(psKey*100.0, 'f', -1, 64), ".", "", 1)
fmt.Fprintf(w, "%s.%s.%s-percentile %.2f %d\n", c.Prefix, name, key, ps[psIdx], now)
}
fmt.Fprintf(w, "%s.%s.one-minute %.2f %d\n", c.Prefix, name, t.Rate1(), now)
fmt.Fprintf(w, "%s.%s.five-minute %.2f %d\n", c.Prefix, name, t.Rate5(), now)
fmt.Fprintf(w, "%s.%s.fifteen-minute %.2f %d\n", c.Prefix, name, t.Rate15(), now)
fmt.Fprintf(w, "%s.%s.mean-rate %.2f %d\n", c.Prefix, name, t.RateMean(), now)
}
w.Flush()
})
return nil
}
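For reference, a minimal sketch of a caller-managed flush loop built on `GraphiteOnce`, as the comment above suggests (the address, prefix and interval below are placeholders, not part of this change):

```go
package main

import (
	"log"
	"net"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:2003")
	if err != nil {
		log.Fatal(err)
	}
	cfg := metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: 10 * time.Second,
		DurationUnit:  time.Millisecond,
		Prefix:        "geth",
		Percentiles:   []float64{0.5, 0.95, 0.99},
	}
	for range time.Tick(cfg.FlushInterval) {
		// GraphiteOnce returns the connection error instead of just logging it,
		// so the caller decides how failures are handled.
		if err := metrics.GraphiteOnce(cfg); err != nil {
			log.Printf("graphite flush failed: %v", err)
		}
	}
}
```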

@ -1,22 +0,0 @@
package metrics
import (
"net"
"time"
)
func ExampleGraphite() {
addr, _ := net.ResolveTCPAddr("net", ":2003")
go Graphite(DefaultRegistry, 1*time.Second, "some.prefix", addr)
}
func ExampleGraphiteWithConfig() {
addr, _ := net.ResolveTCPAddr("net", ":2003")
go GraphiteWithConfig(GraphiteConfig{
Addr: addr,
Registry: DefaultRegistry,
FlushInterval: 1 * time.Second,
DurationUnit: time.Millisecond,
Percentiles: []float64{0.5, 0.75, 0.99, 0.999},
})
}

@ -1,61 +1,35 @@
package metrics
// Healthcheck holds an error value describing an arbitrary up/down status.
type Healthcheck interface {
Check()
Error() error
Healthy()
Unhealthy(error)
}
// NewHealthcheck constructs a new Healthcheck which will use the given
// function to update its status.
func NewHealthcheck(f func(Healthcheck)) Healthcheck {
if !Enabled {
return NilHealthcheck{}
}
return &StandardHealthcheck{nil, f}
func NewHealthcheck(f func(*Healthcheck)) *Healthcheck {
return &Healthcheck{nil, f}
}
// NilHealthcheck is a no-op.
type NilHealthcheck struct{}
// Check is a no-op.
func (NilHealthcheck) Check() {}
// Error is a no-op.
func (NilHealthcheck) Error() error { return nil }
// Healthy is a no-op.
func (NilHealthcheck) Healthy() {}
// Unhealthy is a no-op.
func (NilHealthcheck) Unhealthy(error) {}
// StandardHealthcheck is the standard implementation of a Healthcheck and
// Healthcheck is the standard implementation of a Healthcheck and
// stores the status and a function to call to update the status.
type StandardHealthcheck struct {
type Healthcheck struct {
err error
f func(Healthcheck)
f func(*Healthcheck)
}
// Check runs the healthcheck function to update the healthcheck's status.
func (h *StandardHealthcheck) Check() {
func (h *Healthcheck) Check() {
h.f(h)
}
// Error returns the healthcheck's status, which will be nil if it is healthy.
func (h *StandardHealthcheck) Error() error {
func (h *Healthcheck) Error() error {
return h.err
}
// Healthy marks the healthcheck as healthy.
func (h *StandardHealthcheck) Healthy() {
func (h *Healthcheck) Healthy() {
h.err = nil
}
// Unhealthy marks the healthcheck as unhealthy. The error is stored and
// may be retrieved by the Error method.
func (h *StandardHealthcheck) Unhealthy(err error) {
func (h *Healthcheck) Unhealthy(err error) {
h.err = err
}
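A minimal sketch of the concrete `*Healthcheck` in use (the check function and metric name are illustrative only):

```go
package main

import (
	"errors"
	"log"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	healthy := false // placeholder for a real probe

	hc := metrics.NewHealthcheck(func(h *metrics.Healthcheck) {
		if !healthy {
			h.Unhealthy(errors.New("backend unreachable"))
			return
		}
		h.Healthy()
	})
	metrics.DefaultRegistry.Register("example/health", hc)

	// Check runs the supplied function; Error then reports the stored status.
	hc.Check()
	if err := hc.Error(); err != nil {
		log.Println("unhealthy:", err)
	}
}
```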

@ -1,7 +1,16 @@
package metrics
type HistogramSnapshot interface {
SampleSnapshot
Count() int64
Max() int64
Mean() float64
Min() int64
Percentile(float64) float64
Percentiles([]float64) []float64
Size() int
StdDev() float64
Sum() int64
Variance() float64
}
// Histogram calculates distribution statistics from a series of int64 values.
@ -31,10 +40,7 @@ func GetOrRegisterHistogramLazy(name string, r Registry, s func() Sample) Histog
// NewHistogram constructs a new StandardHistogram from a Sample.
func NewHistogram(s Sample) Histogram {
if !Enabled {
return NilHistogram{}
}
return &StandardHistogram{sample: s}
return &StandardHistogram{s}
}
// NewRegisteredHistogram constructs and registers a new StandardHistogram from
@ -48,13 +54,6 @@ func NewRegisteredHistogram(name string, r Registry, s Sample) Histogram {
return c
}
// NilHistogram is a no-op Histogram.
type NilHistogram struct{}
func (NilHistogram) Clear() {}
func (NilHistogram) Snapshot() HistogramSnapshot { return (*emptySnapshot)(nil) }
func (NilHistogram) Update(v int64) {}
// StandardHistogram is the standard implementation of a Histogram and uses a
// Sample to bound its memory use.
type StandardHistogram struct {

@ -1,48 +0,0 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package metrics
// compile-time checks that interfaces are implemented.
var (
_ SampleSnapshot = (*emptySnapshot)(nil)
_ HistogramSnapshot = (*emptySnapshot)(nil)
_ CounterSnapshot = (*emptySnapshot)(nil)
_ GaugeSnapshot = (*emptySnapshot)(nil)
_ MeterSnapshot = (*emptySnapshot)(nil)
_ EWMASnapshot = (*emptySnapshot)(nil)
_ TimerSnapshot = (*emptySnapshot)(nil)
)
type emptySnapshot struct{}
func (*emptySnapshot) Count() int64 { return 0 }
func (*emptySnapshot) Max() int64 { return 0 }
func (*emptySnapshot) Mean() float64 { return 0.0 }
func (*emptySnapshot) Min() int64 { return 0 }
func (*emptySnapshot) Percentile(p float64) float64 { return 0.0 }
func (*emptySnapshot) Percentiles(ps []float64) []float64 { return make([]float64, len(ps)) }
func (*emptySnapshot) Size() int { return 0 }
func (*emptySnapshot) StdDev() float64 { return 0.0 }
func (*emptySnapshot) Sum() int64 { return 0 }
func (*emptySnapshot) Values() []int64 { return []int64{} }
func (*emptySnapshot) Variance() float64 { return 0.0 }
func (*emptySnapshot) Value() int64 { return 0 }
func (*emptySnapshot) Rate() float64 { return 0.0 }
func (*emptySnapshot) Rate1() float64 { return 0.0 }
func (*emptySnapshot) Rate5() float64 { return 0.0 }
func (*emptySnapshot) Rate15() float64 { return 0.0 }
func (*emptySnapshot) RateMean() float64 { return 0.0 }

@ -8,31 +8,31 @@ import (
func readMeter(namespace, name string, i interface{}) (string, map[string]interface{}) {
switch metric := i.(type) {
case metrics.Counter:
case *metrics.Counter:
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": metric.Snapshot().Count(),
}
return measurement, fields
case metrics.CounterFloat64:
case *metrics.CounterFloat64:
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": metric.Snapshot().Count(),
}
return measurement, fields
case metrics.Gauge:
case *metrics.Gauge:
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": metric.Snapshot().Value(),
}
return measurement, fields
case metrics.GaugeFloat64:
case *metrics.GaugeFloat64:
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": metric.Snapshot().Value(),
}
return measurement, fields
case metrics.GaugeInfo:
case *metrics.GaugeInfo:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
@ -62,7 +62,7 @@ func readMeter(namespace, name string, i interface{}) (string, map[string]interf
"p9999": ps[6],
}
return measurement, fields
case metrics.Meter:
case *metrics.Meter:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.meter", namespace, name)
fields := map[string]interface{}{
@ -73,7 +73,7 @@ func readMeter(namespace, name string, i interface{}) (string, map[string]interf
"mean": ms.RateMean(),
}
return measurement, fields
case metrics.Timer:
case *metrics.Timer:
ms := metric.Snapshot()
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
@ -97,7 +97,7 @@ func readMeter(namespace, name string, i interface{}) (string, map[string]interf
"meanrate": ms.RateMean(),
}
return measurement, fields
case metrics.ResettingTimer:
case *metrics.ResettingTimer:
ms := metric.Snapshot()
if ms.Count() == 0 {
break

@ -33,7 +33,7 @@ import (
)
func TestMain(m *testing.M) {
metrics.Enabled = true
metrics.Enable()
os.Exit(m.Run())
}

@ -1,5 +1,5 @@
package metrics
func init() {
Enabled = true
metricsEnabled = true
}

@ -21,25 +21,21 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) {
for range time.Tick(freq) {
r.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
case *Counter:
l.Printf("counter %s\n", name)
l.Printf(" count: %9d\n", metric.Snapshot().Count())
case CounterFloat64:
case *CounterFloat64:
l.Printf("counter %s\n", name)
l.Printf(" count: %f\n", metric.Snapshot().Count())
case Gauge:
case *Gauge:
l.Printf("gauge %s\n", name)
l.Printf(" value: %9d\n", metric.Snapshot().Value())
case GaugeFloat64:
case *GaugeFloat64:
l.Printf("gauge %s\n", name)
l.Printf(" value: %f\n", metric.Snapshot().Value())
case GaugeInfo:
case *GaugeInfo:
l.Printf("gauge %s\n", name)
l.Printf(" value: %s\n", metric.Snapshot().Value())
case Healthcheck:
metric.Check()
l.Printf("healthcheck %s\n", name)
l.Printf(" error: %v\n", metric.Error())
case Histogram:
h := metric.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
@ -54,7 +50,7 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) {
l.Printf(" 95%%: %12.2f\n", ps[2])
l.Printf(" 99%%: %12.2f\n", ps[3])
l.Printf(" 99.9%%: %12.2f\n", ps[4])
case Meter:
case *Meter:
m := metric.Snapshot()
l.Printf("meter %s\n", name)
l.Printf(" count: %9d\n", m.Count())
@ -62,7 +58,7 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) {
l.Printf(" 5-min rate: %12.2f\n", m.Rate5())
l.Printf(" 15-min rate: %12.2f\n", m.Rate15())
l.Printf(" mean rate: %12.2f\n", m.RateMean())
case Timer:
case *Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
l.Printf("timer %s\n", name)

@ -7,114 +7,78 @@ import (
"time"
)
type MeterSnapshot interface {
Count() int64
Rate1() float64
Rate5() float64
Rate15() float64
RateMean() float64
}
// Meters count events to produce exponentially-weighted moving average rates
// at one-, five-, and fifteen-minutes and a mean rate.
type Meter interface {
Mark(int64)
Snapshot() MeterSnapshot
Stop()
}
// GetOrRegisterMeter returns an existing Meter or constructs and registers a
// new StandardMeter.
// new Meter.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterMeter(name string, r Registry) Meter {
if nil == r {
func GetOrRegisterMeter(name string, r Registry) *Meter {
if r == nil {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewMeter).(Meter)
return r.GetOrRegister(name, NewMeter).(*Meter)
}
// NewMeter constructs a new StandardMeter and launches a goroutine.
// NewMeter constructs a new Meter and launches a goroutine.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
func NewMeter() Meter {
if !Enabled {
return NilMeter{}
}
m := newStandardMeter()
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters[m] = struct{}{}
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
}
func NewMeter() *Meter {
m := newMeter()
arbiter.add(m)
return m
}
// NewInactiveMeter returns a meter but does not start any goroutines. This
// method is mainly intended for testing.
func NewInactiveMeter() Meter {
if !Enabled {
return NilMeter{}
}
m := newStandardMeter()
return m
func NewInactiveMeter() *Meter {
return newMeter()
}
// NewRegisteredMeter constructs and registers a new StandardMeter
// NewRegisteredMeter constructs and registers a new Meter
// and launches a goroutine.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeter(name string, r Registry) Meter {
func NewRegisteredMeter(name string, r Registry) *Meter {
return GetOrRegisterMeter(name, r)
}
// meterSnapshot is a read-only copy of the meter's internal values.
type meterSnapshot struct {
// MeterSnapshot is a read-only copy of the meter's internal values.
type MeterSnapshot struct {
count int64
rate1, rate5, rate15, rateMean float64
}
// Count returns the count of events at the time the snapshot was taken.
func (m *meterSnapshot) Count() int64 { return m.count }
func (m *MeterSnapshot) Count() int64 { return m.count }
// Rate1 returns the one-minute moving average rate of events per second at the
// time the snapshot was taken.
func (m *meterSnapshot) Rate1() float64 { return m.rate1 }
func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
// Rate5 returns the five-minute moving average rate of events per second at
// the time the snapshot was taken.
func (m *meterSnapshot) Rate5() float64 { return m.rate5 }
func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
// Rate15 returns the fifteen-minute moving average rate of events per second
// at the time the snapshot was taken.
func (m *meterSnapshot) Rate15() float64 { return m.rate15 }
func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
// RateMean returns the meter's mean rate of events per second at the time the
// snapshot was taken.
func (m *meterSnapshot) RateMean() float64 { return m.rateMean }
// NilMeter is a no-op Meter.
type NilMeter struct{}
func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
func (NilMeter) Count() int64 { return 0 }
func (NilMeter) Mark(n int64) {}
func (NilMeter) Snapshot() MeterSnapshot { return (*emptySnapshot)(nil) }
func (NilMeter) Stop() {}
// StandardMeter is the standard implementation of a Meter.
type StandardMeter struct {
// Meter counts events to produce exponentially-weighted moving average rates
// at one-, five-, and fifteen-minutes and a mean rate.
type Meter struct {
count atomic.Int64
uncounted atomic.Int64 // not yet added to the EWMAs
rateMean atomic.Uint64
a1, a5, a15 EWMA
a1, a5, a15 *EWMA
startTime time.Time
stopped atomic.Bool
}
func newStandardMeter() *StandardMeter {
return &StandardMeter{
func newMeter() *Meter {
return &Meter{
a1: NewEWMA1(),
a5: NewEWMA5(),
a15: NewEWMA15(),
@ -123,22 +87,20 @@ func newStandardMeter() *StandardMeter {
}
// Stop stops the meter; Mark() becomes a no-op once the meter has been stopped.
func (m *StandardMeter) Stop() {
func (m *Meter) Stop() {
if stopped := m.stopped.Swap(true); !stopped {
arbiter.Lock()
delete(arbiter.meters, m)
arbiter.Unlock()
arbiter.remove(m)
}
}
// Mark records the occurrence of n events.
func (m *StandardMeter) Mark(n int64) {
func (m *Meter) Mark(n int64) {
m.uncounted.Add(n)
}
// Snapshot returns a read-only copy of the meter.
func (m *StandardMeter) Snapshot() MeterSnapshot {
return &meterSnapshot{
func (m *Meter) Snapshot() *MeterSnapshot {
return &MeterSnapshot{
count: m.count.Load() + m.uncounted.Load(),
rate1: m.a1.Snapshot().Rate(),
rate5: m.a5.Snapshot().Rate(),
@ -147,7 +109,7 @@ func (m *StandardMeter) Snapshot() MeterSnapshot {
}
}
func (m *StandardMeter) tick() {
func (m *Meter) tick() {
// Take the uncounted values, add to count
n := m.uncounted.Swap(0)
count := m.count.Add(n)
@ -157,33 +119,51 @@ func (m *StandardMeter) tick() {
m.a5.Update(n)
m.a15.Update(n)
// And trigger them to calculate the rates
m.a1.Tick()
m.a5.Tick()
m.a15.Tick()
m.a1.tick()
m.a5.tick()
m.a15.tick()
}
// meterArbiter ticks meters every 5s from a single goroutine.
var arbiter = meterTicker{meters: make(map[*Meter]struct{})}
// meterTicker ticks meters every 5s from a single goroutine.
// meters are referenced in a set for future stopping.
type meterArbiter struct {
sync.RWMutex
type meterTicker struct {
mu sync.RWMutex
started bool
meters map[*StandardMeter]struct{}
ticker *time.Ticker
meters map[*Meter]struct{}
}
var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})}
// tick meters on the scheduled interval
func (ma *meterArbiter) tick() {
for range ma.ticker.C {
ma.tickMeters()
// add adds another *Meter to the arbiter, and starts the ticking loop if it is not already running.
func (ma *meterTicker) add(m *Meter) {
ma.mu.Lock()
defer ma.mu.Unlock()
ma.meters[m] = struct{}{}
if !ma.started {
ma.started = true
go ma.loop()
}
}
func (ma *meterArbiter) tickMeters() {
ma.RLock()
defer ma.RUnlock()
for meter := range ma.meters {
meter.tick()
// remove removes a meter from the set of ticked meters.
func (ma *meterTicker) remove(m *Meter) {
ma.mu.Lock()
delete(ma.meters, m)
ma.mu.Unlock()
}
// loop ticks meters on a 5 second interval.
func (ma *meterTicker) loop() {
ticker := time.NewTicker(5 * time.Second)
for range ticker.C {
if !metricsEnabled {
continue
}
ma.mu.RLock()
for meter := range ma.meters {
meter.tick()
}
ma.mu.RUnlock()
}
}
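A minimal sketch of the concrete `*Meter` lifecycle (names are illustrative; a meter has to be stopped to drop out of the shared ticker):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enable() // otherwise the 5s ticker skips the EWMA updates

	m := metrics.NewRegisteredMeter("example/requests", nil) // nil registry -> DefaultRegistry
	defer m.Stop()                                           // removes the meter from the arbiter

	m.Mark(3)
	snap := m.Snapshot()
	fmt.Println(snap.Count(), snap.Rate1(), snap.RateMean())
}
```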

@ -28,18 +28,12 @@ func TestGetOrRegisterMeter(t *testing.T) {
}
func TestMeterDecay(t *testing.T) {
ma := meterArbiter{
ticker: time.NewTicker(time.Millisecond),
meters: make(map[*StandardMeter]struct{}),
}
defer ma.ticker.Stop()
m := newStandardMeter()
ma.meters[m] = struct{}{}
m := newMeter()
m.Mark(1)
ma.tickMeters()
m.tick()
rateMean := m.Snapshot().RateMean()
time.Sleep(100 * time.Millisecond)
ma.tickMeters()
m.tick()
if m.Snapshot().RateMean() >= rateMean {
t.Error("m.RateMean() didn't decrease")
}

@ -6,52 +6,29 @@
package metrics
import (
"os"
"runtime/metrics"
"runtime/pprof"
"strconv"
"strings"
"syscall"
"time"
)
"github.com/ethereum/go-ethereum/log"
var (
metricsEnabled = false
)
// Enabled is checked by the constructor functions for all of the
// standard metrics. If it is true, the metric returned is a stub.
// Enabled is checked by functions that are deemed 'expensive', e.g. if a
// meter-type does locking and/or non-trivial math operations during update.
func Enabled() bool {
return metricsEnabled
}
// Enable enables the metrics system.
// The Enabled-flag is expected to be set, once, during startup, but toggling off and on
// is not supported.
//
// This global kill-switch helps quantify the observer effect and makes
// for less cluttered pprof profiles.
var Enabled = false
// enablerFlags is the CLI flag names to use to enable metrics collections.
var enablerFlags = []string{"metrics"}
// enablerEnvVars is the env var names to use to enable metrics collections.
var enablerEnvVars = []string{"GETH_METRICS"}
// init enables or disables the metrics system. Since we need this to run before
// any other code gets to create meters and timers, we'll actually do an ugly hack
// and peek into the command line args for the metrics flag.
func init() {
for _, enabler := range enablerEnvVars {
if val, found := syscall.Getenv(enabler); found && !Enabled {
if enable, _ := strconv.ParseBool(val); enable { // ignore error, flag parser will choke on it later
log.Info("Enabling metrics collection")
Enabled = true
}
}
}
for _, arg := range os.Args {
flag := strings.TrimLeft(arg, "-")
for _, enabler := range enablerFlags {
if !Enabled && flag == enabler {
log.Info("Enabling metrics collection")
Enabled = true
}
}
}
// Enable is not safe to call concurrently. You need to call this as early as possible in
// the program, before any metrics collection will happen.
func Enable() {
metricsEnabled = true
}
var threadCreateProfile = pprof.Lookup("threadcreate")
@ -128,7 +105,7 @@ func readRuntimeStats(v *runtimeStats) {
// CollectProcessMetrics periodically collects various metrics about the running process.
func CollectProcessMetrics(refresh time.Duration) {
// Short circuit if the metrics system is disabled
if !Enabled {
if !metricsEnabled {
return
}
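A sketch of the intended call pattern: `Enable` once during startup, with `Enabled()` available to guard metrics-only work (the timer name and handler below are placeholders):

```go
package main

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

// Constructing metrics before Enable is fine; they are concrete types either way.
var requestTimer = metrics.NewRegisteredTimer("example/request", nil)

func main() {
	// Call once, as early as possible; toggling back off is not supported.
	metrics.Enable()

	start := time.Now()
	handleRequest()
	requestTimer.UpdateSince(start)

	if metrics.Enabled() {
		// Expensive, metrics-only bookkeeping can still be gated explicitly.
	}
}

func handleRequest() { time.Sleep(10 * time.Millisecond) }
```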

@ -7,8 +7,6 @@ import (
"time"
)
const FANOUT = 128
func TestReadRuntimeValues(t *testing.T) {
var v runtimeStats
readRuntimeStats(&v)
@ -16,60 +14,23 @@ func TestReadRuntimeValues(t *testing.T) {
}
func BenchmarkMetrics(b *testing.B) {
r := NewRegistry()
c := NewRegisteredCounter("counter", r)
cf := NewRegisteredCounterFloat64("counterfloat64", r)
g := NewRegisteredGauge("gauge", r)
gf := NewRegisteredGaugeFloat64("gaugefloat64", r)
h := NewRegisteredHistogram("histogram", r, NewUniformSample(100))
m := NewRegisteredMeter("meter", r)
t := NewRegisteredTimer("timer", r)
var (
r = NewRegistry()
c = NewRegisteredCounter("counter", r)
cf = NewRegisteredCounterFloat64("counterfloat64", r)
g = NewRegisteredGauge("gauge", r)
gf = NewRegisteredGaugeFloat64("gaugefloat64", r)
h = NewRegisteredHistogram("histogram", r, NewUniformSample(100))
m = NewRegisteredMeter("meter", r)
t = NewRegisteredTimer("timer", r)
)
RegisterDebugGCStats(r)
b.ResetTimer()
ch := make(chan bool)
wgD := &sync.WaitGroup{}
/*
wgD.Add(1)
go func() {
defer wgD.Done()
//log.Println("go CaptureDebugGCStats")
for {
select {
case <-ch:
//log.Println("done CaptureDebugGCStats")
return
default:
CaptureDebugGCStatsOnce(r)
}
}
}()
//*/
wgW := &sync.WaitGroup{}
/*
wgW.Add(1)
var wg sync.WaitGroup
wg.Add(128)
for i := 0; i < 128; i++ {
go func() {
defer wgW.Done()
//log.Println("go Write")
for {
select {
case <-ch:
//log.Println("done Write")
return
default:
WriteOnce(r, io.Discard)
}
}
}()
//*/
wg := &sync.WaitGroup{}
wg.Add(FANOUT)
for i := 0; i < FANOUT; i++ {
go func(i int) {
defer wg.Done()
//log.Println("go", i)
for i := 0; i < b.N; i++ {
c.Inc(1)
cf.Inc(1.0)
@ -79,13 +40,9 @@ func BenchmarkMetrics(b *testing.B) {
m.Mark(1)
t.Update(1)
}
//log.Println("done", i)
}(i)
}()
}
wg.Wait()
close(ch)
wgD.Wait()
wgW.Wait()
}
func Example() {

@ -64,15 +64,15 @@ func (c *OpenTSDBConfig) writeRegistry(w io.Writer, now int64, shortHostname str
c.Registry.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
case *Counter:
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Snapshot().Count(), shortHostname)
case CounterFloat64:
case *CounterFloat64:
fmt.Fprintf(w, "put %s.%s.count %d %f host=%s\n", c.Prefix, name, now, metric.Snapshot().Count(), shortHostname)
case Gauge:
case *Gauge:
fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Snapshot().Value(), shortHostname)
case GaugeFloat64:
case *GaugeFloat64:
fmt.Fprintf(w, "put %s.%s.value %d %f host=%s\n", c.Prefix, name, now, metric.Snapshot().Value(), shortHostname)
case GaugeInfo:
case *GaugeInfo:
fmt.Fprintf(w, "put %s.%s.value %d %s host=%s\n", c.Prefix, name, now, metric.Snapshot().Value().String(), shortHostname)
case Histogram:
h := metric.Snapshot()
@ -87,14 +87,14 @@ func (c *OpenTSDBConfig) writeRegistry(w io.Writer, now int64, shortHostname str
fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2], shortHostname)
fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3], shortHostname)
fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4], shortHostname)
case Meter:
case *Meter:
m := metric.Snapshot()
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, m.Count(), shortHostname)
fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate1(), shortHostname)
fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate5(), shortHostname)
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate15(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, m.RateMean(), shortHostname)
case Timer:
case *Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, t.Count(), shortHostname)

@ -51,23 +51,23 @@ func newCollector() *collector {
// metric type is not supported/known.
func (c *collector) Add(name string, i any) error {
switch m := i.(type) {
case metrics.Counter:
case *metrics.Counter:
c.addCounter(name, m.Snapshot())
case metrics.CounterFloat64:
case *metrics.CounterFloat64:
c.addCounterFloat64(name, m.Snapshot())
case metrics.Gauge:
case *metrics.Gauge:
c.addGauge(name, m.Snapshot())
case metrics.GaugeFloat64:
case *metrics.GaugeFloat64:
c.addGaugeFloat64(name, m.Snapshot())
case metrics.GaugeInfo:
case *metrics.GaugeInfo:
c.addGaugeInfo(name, m.Snapshot())
case metrics.Histogram:
c.addHistogram(name, m.Snapshot())
case metrics.Meter:
case *metrics.Meter:
c.addMeter(name, m.Snapshot())
case metrics.Timer:
case *metrics.Timer:
c.addTimer(name, m.Snapshot())
case metrics.ResettingTimer:
case *metrics.ResettingTimer:
c.addResettingTimer(name, m.Snapshot())
default:
return fmt.Errorf("unknown prometheus metric type %T", i)
@ -106,11 +106,11 @@ func (c *collector) addHistogram(name string, m metrics.HistogramSnapshot) {
c.buff.WriteRune('\n')
}
func (c *collector) addMeter(name string, m metrics.MeterSnapshot) {
func (c *collector) addMeter(name string, m *metrics.MeterSnapshot) {
c.writeGaugeCounter(name, m.Count())
}
func (c *collector) addTimer(name string, m metrics.TimerSnapshot) {
func (c *collector) addTimer(name string, m *metrics.TimerSnapshot) {
pv := []float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}
ps := m.Percentiles(pv)
c.writeSummaryCounter(name, m.Count())
@ -121,7 +121,7 @@ func (c *collector) addTimer(name string, m metrics.TimerSnapshot) {
c.buff.WriteRune('\n')
}
func (c *collector) addResettingTimer(name string, m metrics.ResettingTimerSnapshot) {
func (c *collector) addResettingTimer(name string, m *metrics.ResettingTimerSnapshot) {
if m.Count() <= 0 {
return
}

@ -27,7 +27,7 @@ import (
)
func TestMain(m *testing.M) {
metrics.Enabled = true
metrics.Enable()
os.Exit(m.Run())
}

@ -1,6 +1,7 @@
package metrics
import (
"errors"
"fmt"
"reflect"
"sort"
@ -8,14 +9,10 @@ import (
"sync"
)
// DuplicateMetric is the error returned by Registry. Register when a metric
// ErrDuplicateMetric is the error returned by Registry.Register when a metric
// already exists. If you mean to Register that metric you must first
// Unregister the existing metric.
type DuplicateMetric string
func (err DuplicateMetric) Error() string {
return fmt.Sprintf("duplicate metric: %s", string(err))
}
var ErrDuplicateMetric = errors.New("duplicate metric")
// A Registry holds references to a set of metrics by name and can iterate
// over them, calling callback functions provided by the user.
@ -114,13 +111,13 @@ func (r *StandardRegistry) GetOrRegister(name string, i interface{}) interface{}
return item
}
// Register the given metric under the given name. Returns a DuplicateMetric
// Register the given metric under the given name. Returns an ErrDuplicateMetric
// if a metric by the given name is already registered.
func (r *StandardRegistry) Register(name string, i interface{}) error {
// fast path
_, ok := r.metrics.Load(name)
if ok {
return DuplicateMetric(name)
return fmt.Errorf("%w: %v", ErrDuplicateMetric, name)
}
if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
@ -128,7 +125,7 @@ func (r *StandardRegistry) Register(name string, i interface{}) error {
}
_, loaded, _ := r.loadOrRegister(name, i)
if loaded {
return DuplicateMetric(name)
return fmt.Errorf("%w: %v", ErrDuplicateMetric, name)
}
return nil
}
@ -136,7 +133,7 @@ func (r *StandardRegistry) Register(name string, i interface{}) error {
// RunHealthchecks run all registered healthchecks.
func (r *StandardRegistry) RunHealthchecks() {
r.metrics.Range(func(key, value any) bool {
if h, ok := value.(Healthcheck); ok {
if h, ok := value.(*Healthcheck); ok {
h.Check()
}
return true
@ -149,15 +146,15 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} {
r.Each(func(name string, i interface{}) {
values := make(map[string]interface{})
switch metric := i.(type) {
case Counter:
case *Counter:
values["count"] = metric.Snapshot().Count()
case CounterFloat64:
case *CounterFloat64:
values["count"] = metric.Snapshot().Count()
case Gauge:
case *Gauge:
values["value"] = metric.Snapshot().Value()
case GaugeFloat64:
case *GaugeFloat64:
values["value"] = metric.Snapshot().Value()
case Healthcheck:
case *Healthcheck:
values["error"] = nil
metric.Check()
if err := metric.Error(); nil != err {
@ -176,14 +173,14 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} {
values["95%"] = ps[2]
values["99%"] = ps[3]
values["99.9%"] = ps[4]
case Meter:
case *Meter:
m := metric.Snapshot()
values["count"] = m.Count()
values["1m.rate"] = m.Rate1()
values["5m.rate"] = m.Rate5()
values["15m.rate"] = m.Rate15()
values["mean.rate"] = m.RateMean()
case Timer:
case *Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
values["count"] = t.Count()
@ -214,7 +211,7 @@ func (r *StandardRegistry) Unregister(name string) {
func (r *StandardRegistry) loadOrRegister(name string, i interface{}) (interface{}, bool, bool) {
switch i.(type) {
case Counter, CounterFloat64, Gauge, GaugeFloat64, GaugeInfo, Healthcheck, Histogram, Meter, Timer, ResettingTimer:
case *Counter, *CounterFloat64, *Gauge, *GaugeFloat64, *GaugeInfo, *Healthcheck, Histogram, *Meter, *Timer, *ResettingTimer:
default:
return nil, false, false
}
@ -326,9 +323,7 @@ func (r *PrefixedRegistry) Unregister(name string) {
}
var (
DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry()
AccountingRegistry = NewRegistry() // registry used in swarm
DefaultRegistry = NewRegistry()
)
// Each call the given function for each registered metric.
@ -347,7 +342,7 @@ func GetOrRegister(name string, i interface{}) interface{} {
return DefaultRegistry.GetOrRegister(name, i)
}
// Register the given metric under the given name. Returns a DuplicateMetric
// Register the given metric under the given name. Returns an ErrDuplicateMetric
// if a metric by the given name is already registered.
func Register(name string, i interface{}) error {
return DefaultRegistry.Register(name, i)

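Since the duplicate error is now a wrapped sentinel, callers can match it with `errors.Is` (a sketch; the metric name is illustrative):

```go
package main

import (
	"errors"
	"log"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	c := metrics.NewCounter()
	if err := metrics.Register("example/counter", c); err != nil {
		if errors.Is(err, metrics.ErrDuplicateMetric) {
			// Someone registered the name first: reuse that instance instead.
			c = metrics.DefaultRegistry.Get("example/counter").(*metrics.Counter)
		} else {
			log.Fatal(err)
		}
	}
	c.Inc(1)
}
```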
@ -47,7 +47,7 @@ func TestRegistry(t *testing.T) {
if name != "foo" {
t.Fatal(name)
}
if _, ok := iface.(Counter); !ok {
if _, ok := iface.(*Counter); !ok {
t.Fatal(iface)
}
})
@ -73,7 +73,7 @@ func TestRegistryDuplicate(t *testing.T) {
i := 0
r.Each(func(name string, iface interface{}) {
i++
if _, ok := iface.(Counter); !ok {
if _, ok := iface.(*Counter); !ok {
t.Fatal(iface)
}
})
@ -85,11 +85,11 @@ func TestRegistryDuplicate(t *testing.T) {
func TestRegistryGet(t *testing.T) {
r := NewRegistry()
r.Register("foo", NewCounter())
if count := r.Get("foo").(Counter).Snapshot().Count(); count != 0 {
if count := r.Get("foo").(*Counter).Snapshot().Count(); count != 0 {
t.Fatal(count)
}
r.Get("foo").(Counter).Inc(1)
if count := r.Get("foo").(Counter).Snapshot().Count(); count != 1 {
r.Get("foo").(*Counter).Inc(1)
if count := r.Get("foo").(*Counter).Snapshot().Count(); count != 1 {
t.Fatal(count)
}
}
@ -100,7 +100,7 @@ func TestRegistryGetOrRegister(t *testing.T) {
// First metric wins with GetOrRegister
_ = r.GetOrRegister("foo", NewCounter())
m := r.GetOrRegister("foo", NewGauge())
if _, ok := m.(Counter); !ok {
if _, ok := m.(*Counter); !ok {
t.Fatal(m)
}
@ -110,7 +110,7 @@ func TestRegistryGetOrRegister(t *testing.T) {
if name != "foo" {
t.Fatal(name)
}
if _, ok := iface.(Counter); !ok {
if _, ok := iface.(*Counter); !ok {
t.Fatal(iface)
}
})
@ -125,7 +125,7 @@ func TestRegistryGetOrRegisterWithLazyInstantiation(t *testing.T) {
// First metric wins with GetOrRegister
_ = r.GetOrRegister("foo", NewCounter)
m := r.GetOrRegister("foo", NewGauge)
if _, ok := m.(Counter); !ok {
if _, ok := m.(*Counter); !ok {
t.Fatal(m)
}
@ -135,7 +135,7 @@ func TestRegistryGetOrRegisterWithLazyInstantiation(t *testing.T) {
if name != "foo" {
t.Fatal(name)
}
if _, ok := iface.(Counter); !ok {
if _, ok := iface.(*Counter); !ok {
t.Fatal(iface)
}
})

@ -17,7 +17,7 @@ type resettingSample struct {
}
// Snapshot returns a read-only copy of the sample with the original reset.
func (rs *resettingSample) Snapshot() SampleSnapshot {
func (rs *resettingSample) Snapshot() *sampleSnapshot {
s := rs.Sample.Snapshot()
rs.Sample.Clear()
return s

@ -5,36 +5,17 @@ import (
"time"
)
// Initial slice capacity for the values stored in a ResettingTimer
const InitialResettingTimerSliceCap = 10
type ResettingTimerSnapshot interface {
Count() int
Mean() float64
Max() int64
Min() int64
Percentiles([]float64) []float64
}
// ResettingTimer is used for storing aggregated values for timers, which are reset on every flush interval.
type ResettingTimer interface {
Snapshot() ResettingTimerSnapshot
Time(func())
Update(time.Duration)
UpdateSince(time.Time)
}
// GetOrRegisterResettingTimer returns an existing ResettingTimer or constructs and registers a
// new StandardResettingTimer.
func GetOrRegisterResettingTimer(name string, r Registry) ResettingTimer {
// new ResettingTimer.
func GetOrRegisterResettingTimer(name string, r Registry) *ResettingTimer {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewResettingTimer).(ResettingTimer)
return r.GetOrRegister(name, NewResettingTimer).(*ResettingTimer)
}
// NewRegisteredResettingTimer constructs and registers a new StandardResettingTimer.
func NewRegisteredResettingTimer(name string, r Registry) ResettingTimer {
// NewRegisteredResettingTimer constructs and registers a new ResettingTimer.
func NewRegisteredResettingTimer(name string, r Registry) *ResettingTimer {
c := NewResettingTimer()
if nil == r {
r = DefaultRegistry
@ -43,33 +24,15 @@ func NewRegisteredResettingTimer(name string, r Registry) ResettingTimer {
return c
}
// NewResettingTimer constructs a new StandardResettingTimer
func NewResettingTimer() ResettingTimer {
if !Enabled {
return NilResettingTimer{}
}
return &StandardResettingTimer{
values: make([]int64, 0, InitialResettingTimerSliceCap),
// NewResettingTimer constructs a new ResettingTimer
func NewResettingTimer() *ResettingTimer {
return &ResettingTimer{
values: make([]int64, 0, 10),
}
}
// NilResettingTimer is a no-op ResettingTimer.
type NilResettingTimer struct{}
func (NilResettingTimer) Values() []int64 { return nil }
func (n NilResettingTimer) Snapshot() ResettingTimerSnapshot { return n }
func (NilResettingTimer) Time(f func()) { f() }
func (NilResettingTimer) Update(time.Duration) {}
func (NilResettingTimer) Percentiles([]float64) []float64 { return nil }
func (NilResettingTimer) Mean() float64 { return 0.0 }
func (NilResettingTimer) Max() int64 { return 0 }
func (NilResettingTimer) Min() int64 { return 0 }
func (NilResettingTimer) UpdateSince(time.Time) {}
func (NilResettingTimer) Count() int { return 0 }
// StandardResettingTimer is the standard implementation of a ResettingTimer.
// and Meter.
type StandardResettingTimer struct {
// ResettingTimer is used for storing aggregated values for timers, which are reset on every flush interval.
type ResettingTimer struct {
values []int64
sum int64 // sum is a running count of the total sum, used later to calculate mean
@ -77,28 +40,31 @@ type StandardResettingTimer struct {
}
// Snapshot resets the timer and returns a read-only copy of its contents.
func (t *StandardResettingTimer) Snapshot() ResettingTimerSnapshot {
func (t *ResettingTimer) Snapshot() *ResettingTimerSnapshot {
t.mutex.Lock()
defer t.mutex.Unlock()
snapshot := &resettingTimerSnapshot{}
snapshot := &ResettingTimerSnapshot{}
if len(t.values) > 0 {
snapshot.mean = float64(t.sum) / float64(len(t.values))
snapshot.values = t.values
t.values = make([]int64, 0, InitialResettingTimerSliceCap)
t.values = make([]int64, 0, 10)
}
t.sum = 0
return snapshot
}
// Record the duration of the execution of the given function.
func (t *StandardResettingTimer) Time(f func()) {
func (t *ResettingTimer) Time(f func()) {
ts := time.Now()
f()
t.Update(time.Since(ts))
}
// Record the duration of an event.
func (t *StandardResettingTimer) Update(d time.Duration) {
func (t *ResettingTimer) Update(d time.Duration) {
if !metricsEnabled {
return
}
t.mutex.Lock()
defer t.mutex.Unlock()
t.values = append(t.values, int64(d))
@ -106,12 +72,12 @@ func (t *StandardResettingTimer) Update(d time.Duration) {
}
// Record the duration of an event that started at a time and ends now.
func (t *StandardResettingTimer) UpdateSince(ts time.Time) {
func (t *ResettingTimer) UpdateSince(ts time.Time) {
t.Update(time.Since(ts))
}
// resettingTimerSnapshot is a point-in-time copy of another ResettingTimer.
type resettingTimerSnapshot struct {
// ResettingTimerSnapshot is a point-in-time copy of another ResettingTimer.
type ResettingTimerSnapshot struct {
values []int64
mean float64
max int64
@ -121,20 +87,20 @@ type resettingTimerSnapshot struct {
}
// Count returns the length of the values from the snapshot.
func (t *resettingTimerSnapshot) Count() int {
func (t *ResettingTimerSnapshot) Count() int {
return len(t.values)
}
// Percentiles returns the boundaries for the input percentiles.
// note: this method is not thread safe
func (t *resettingTimerSnapshot) Percentiles(percentiles []float64) []float64 {
func (t *ResettingTimerSnapshot) Percentiles(percentiles []float64) []float64 {
t.calc(percentiles)
return t.thresholdBoundaries
}
// Mean returns the mean of the snapshotted values
// note: this method is not thread safe
func (t *resettingTimerSnapshot) Mean() float64 {
func (t *ResettingTimerSnapshot) Mean() float64 {
if !t.calculated {
t.calc(nil)
}
@ -144,7 +110,7 @@ func (t *resettingTimerSnapshot) Mean() float64 {
// Max returns the max of the snapshotted values
// note: this method is not thread safe
func (t *resettingTimerSnapshot) Max() int64 {
func (t *ResettingTimerSnapshot) Max() int64 {
if !t.calculated {
t.calc(nil)
}
@ -153,14 +119,14 @@ func (t *resettingTimerSnapshot) Max() int64 {
// Min returns the min of the snapshotted values
// note: this method is not thread safe
func (t *resettingTimerSnapshot) Min() int64 {
func (t *ResettingTimerSnapshot) Min() int64 {
if !t.calculated {
t.calc(nil)
}
return t.min
}
func (t *resettingTimerSnapshot) calc(percentiles []float64) {
func (t *ResettingTimerSnapshot) calc(percentiles []float64) {
scores := CalculatePercentiles(t.values, percentiles)
t.thresholdBoundaries = scores
if len(t.values) == 0 {

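A minimal sketch of the concrete `*ResettingTimer`, whose `Snapshot` drains the collected values (name and workload are placeholders):

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enable() // Update is a no-op while metrics are disabled

	rt := metrics.NewRegisteredResettingTimer("example/op", nil)

	start := time.Now()
	time.Sleep(5 * time.Millisecond) // stand-in for real work
	rt.UpdateSince(start)

	// Snapshot hands back the accumulated values and resets the timer.
	snap := rt.Snapshot()
	fmt.Println(snap.Count(), snap.Mean(), snap.Percentiles([]float64{0.5, 0.99}))
}
```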
@ -10,27 +10,123 @@ import (
const rescaleThreshold = time.Hour
type SampleSnapshot interface {
Count() int64
Max() int64
Mean() float64
Min() int64
Percentile(float64) float64
Percentiles([]float64) []float64
Size() int
StdDev() float64
Sum() int64
Variance() float64
}
// Samples maintain a statistically-significant selection of values from
// Sample maintains a statistically-significant selection of values from
// a stream.
type Sample interface {
Snapshot() SampleSnapshot
Snapshot() *sampleSnapshot
Clear()
Update(int64)
}
var (
_ Sample = (*ExpDecaySample)(nil)
_ Sample = (*UniformSample)(nil)
_ Sample = (*resettingSample)(nil)
)
// sampleSnapshot is a read-only copy of a Sample.
type sampleSnapshot struct {
count int64
values []int64
max int64
min int64
mean float64
sum int64
variance float64
}
// newSampleSnapshotPrecalculated creates a read-only sampleSnapshot, using
// precalculated sums to avoid iterating the values
func newSampleSnapshotPrecalculated(count int64, values []int64, min, max, sum int64) *sampleSnapshot {
if len(values) == 0 {
return &sampleSnapshot{
count: count,
values: values,
}
}
return &sampleSnapshot{
count: count,
values: values,
max: max,
min: min,
mean: float64(sum) / float64(len(values)),
sum: sum,
}
}
// newSampleSnapshot creates a read-only sampleSnapshot, and calculates some
// numbers.
func newSampleSnapshot(count int64, values []int64) *sampleSnapshot {
var (
max int64 = math.MinInt64
min int64 = math.MaxInt64
sum int64
)
for _, v := range values {
sum += v
if v > max {
max = v
}
if v < min {
min = v
}
}
return newSampleSnapshotPrecalculated(count, values, min, max, sum)
}
// Count returns the count of inputs at the time the snapshot was taken.
func (s *sampleSnapshot) Count() int64 { return s.count }
// Max returns the maximal value at the time the snapshot was taken.
func (s *sampleSnapshot) Max() int64 { return s.max }
// Mean returns the mean value at the time the snapshot was taken.
func (s *sampleSnapshot) Mean() float64 { return s.mean }
// Min returns the minimal value at the time the snapshot was taken.
func (s *sampleSnapshot) Min() int64 { return s.min }
// Percentile returns an arbitrary percentile of values at the time the
// snapshot was taken.
func (s *sampleSnapshot) Percentile(p float64) float64 {
return SamplePercentile(s.values, p)
}
// Percentiles returns a slice of arbitrary percentiles of values at the time
// the snapshot was taken.
func (s *sampleSnapshot) Percentiles(ps []float64) []float64 {
return CalculatePercentiles(s.values, ps)
}
// Size returns the size of the sample at the time the snapshot was taken.
func (s *sampleSnapshot) Size() int { return len(s.values) }
// StdDev returns the standard deviation of values at the time the snapshot was
// taken.
func (s *sampleSnapshot) StdDev() float64 {
if s.variance == 0.0 {
s.variance = SampleVariance(s.mean, s.values)
}
return math.Sqrt(s.variance)
}
// Sum returns the sum of values at the time the snapshot was taken.
func (s *sampleSnapshot) Sum() int64 { return s.sum }
// Values returns a copy of the values in the sample.
func (s *sampleSnapshot) Values() []int64 {
return slices.Clone(s.values)
}
// Variance returns the variance of values at the time the snapshot was taken.
func (s *sampleSnapshot) Variance() float64 {
if s.variance == 0.0 {
s.variance = SampleVariance(s.mean, s.values)
}
return s.variance
}
// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
// priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
// Decay Model for Streaming Systems".
@ -49,9 +145,6 @@ type ExpDecaySample struct {
// NewExpDecaySample constructs a new exponentially-decaying sample with the
// given reservoir size and alpha.
func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
if !Enabled {
return NilSample{}
}
s := &ExpDecaySample{
alpha: alpha,
reservoirSize: reservoirSize,
@ -79,7 +172,7 @@ func (s *ExpDecaySample) Clear() {
}
// Snapshot returns a read-only copy of the sample.
func (s *ExpDecaySample) Snapshot() SampleSnapshot {
func (s *ExpDecaySample) Snapshot() *sampleSnapshot {
s.mutex.Lock()
defer s.mutex.Unlock()
var (
@ -105,6 +198,9 @@ func (s *ExpDecaySample) Snapshot() SampleSnapshot {
// Update samples a new value.
func (s *ExpDecaySample) Update(v int64) {
if !metricsEnabled {
return
}
s.update(time.Now(), v)
}
@ -140,13 +236,6 @@ func (s *ExpDecaySample) update(t time.Time, v int64) {
}
}
// NilSample is a no-op Sample.
type NilSample struct{}
func (NilSample) Clear() {}
func (NilSample) Snapshot() SampleSnapshot { return (*emptySnapshot)(nil) }
func (NilSample) Update(v int64) {}
// SamplePercentile returns an arbitrary percentile of the slice of int64.
func SamplePercentile(values []int64, p float64) float64 {
return CalculatePercentiles(values, []float64{p})[0]
@ -181,114 +270,6 @@ func CalculatePercentiles(values []int64, ps []float64) []float64 {
return scores
}
// sampleSnapshot is a read-only copy of another Sample.
type sampleSnapshot struct {
count int64
values []int64
max int64
min int64
mean float64
sum int64
variance float64
}
// newSampleSnapshotPrecalculated creates a read-only sampleSnapShot, using
// precalculated sums to avoid iterating the values
func newSampleSnapshotPrecalculated(count int64, values []int64, min, max, sum int64) *sampleSnapshot {
if len(values) == 0 {
return &sampleSnapshot{
count: count,
values: values,
}
}
return &sampleSnapshot{
count: count,
values: values,
max: max,
min: min,
mean: float64(sum) / float64(len(values)),
sum: sum,
}
}
// newSampleSnapshot creates a read-only sampleSnapShot, and calculates some
// numbers.
func newSampleSnapshot(count int64, values []int64) *sampleSnapshot {
var (
max int64 = math.MinInt64
min int64 = math.MaxInt64
sum int64
)
for _, v := range values {
sum += v
if v > max {
max = v
}
if v < min {
min = v
}
}
return newSampleSnapshotPrecalculated(count, values, min, max, sum)
}
// Count returns the count of inputs at the time the snapshot was taken.
func (s *sampleSnapshot) Count() int64 { return s.count }
// Max returns the maximal value at the time the snapshot was taken.
func (s *sampleSnapshot) Max() int64 { return s.max }
// Mean returns the mean value at the time the snapshot was taken.
func (s *sampleSnapshot) Mean() float64 { return s.mean }
// Min returns the minimal value at the time the snapshot was taken.
func (s *sampleSnapshot) Min() int64 { return s.min }
// Percentile returns an arbitrary percentile of values at the time the
// snapshot was taken.
func (s *sampleSnapshot) Percentile(p float64) float64 {
return SamplePercentile(s.values, p)
}
// Percentiles returns a slice of arbitrary percentiles of values at the time
// the snapshot was taken.
func (s *sampleSnapshot) Percentiles(ps []float64) []float64 {
return CalculatePercentiles(s.values, ps)
}
// Size returns the size of the sample at the time the snapshot was taken.
func (s *sampleSnapshot) Size() int { return len(s.values) }
// Snapshot returns the snapshot.
func (s *sampleSnapshot) Snapshot() SampleSnapshot { return s }
// StdDev returns the standard deviation of values at the time the snapshot was
// taken.
func (s *sampleSnapshot) StdDev() float64 {
if s.variance == 0.0 {
s.variance = SampleVariance(s.mean, s.values)
}
return math.Sqrt(s.variance)
}
// Sum returns the sum of values at the time the snapshot was taken.
func (s *sampleSnapshot) Sum() int64 { return s.sum }
// Values returns a copy of the values in the sample.
func (s *sampleSnapshot) Values() []int64 {
values := make([]int64, len(s.values))
copy(values, s.values)
return values
}
// Variance returns the variance of values at the time the snapshot was taken.
func (s *sampleSnapshot) Variance() float64 {
if s.variance == 0.0 {
s.variance = SampleVariance(s.mean, s.values)
}
return s.variance
}
// SampleVariance returns the variance of the slice of int64.
func SampleVariance(mean float64, values []int64) float64 {
if len(values) == 0 {
@ -302,7 +283,7 @@ func SampleVariance(mean float64, values []int64) float64 {
return sum / float64(len(values))
}
// A uniform sample using Vitter's Algorithm R.
// UniformSample implements a uniform sample using Vitter's Algorithm R.
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
type UniformSample struct {
@ -316,9 +297,6 @@ type UniformSample struct {
// NewUniformSample constructs a new uniform sample with the given reservoir
// size.
func NewUniformSample(reservoirSize int) Sample {
if !Enabled {
return NilSample{}
}
return &UniformSample{
reservoirSize: reservoirSize,
values: make([]int64, 0, reservoirSize),
@ -336,14 +314,13 @@ func (s *UniformSample) Clear() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.count = 0
s.values = make([]int64, 0, s.reservoirSize)
clear(s.values)
}
// Snapshot returns a read-only copy of the sample.
func (s *UniformSample) Snapshot() SampleSnapshot {
func (s *UniformSample) Snapshot() *sampleSnapshot {
s.mutex.Lock()
values := make([]int64, len(s.values))
copy(values, s.values)
values := slices.Clone(s.values)
count := s.count
s.mutex.Unlock()
return newSampleSnapshot(count, values)
@ -351,21 +328,24 @@ func (s *UniformSample) Snapshot() SampleSnapshot {
// Update samples a new value.
func (s *UniformSample) Update(v int64) {
if !metricsEnabled {
return
}
s.mutex.Lock()
defer s.mutex.Unlock()
s.count++
if len(s.values) < s.reservoirSize {
s.values = append(s.values, v)
return
}
var r int64
if s.rand != nil {
r = s.rand.Int63n(s.count)
} else {
var r int64
if s.rand != nil {
r = s.rand.Int63n(s.count)
} else {
r = rand.Int63n(s.count)
}
if r < int64(len(s.values)) {
s.values[int(r)] = v
}
r = rand.Int63n(s.count)
}
if r < int64(len(s.values)) {
s.values[int(r)] = v
}
}
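A sketch of plugging the two sample types into histograms (sizes and names are illustrative; 1028/0.015 mirrors what `NewTimer` uses):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enable()

	// Uniform reservoir (Vitter's Algorithm R) vs. forward-decaying reservoir.
	uni := metrics.NewRegisteredHistogram("example/uniform", nil, metrics.NewUniformSample(1024))
	dec := metrics.NewRegisteredHistogram("example/decay", nil, metrics.NewExpDecaySample(1028, 0.015))

	for i := int64(0); i < 1000; i++ {
		uni.Update(i)
		dec.Update(i)
	}
	fmt.Println(uni.Snapshot().Mean(), dec.Snapshot().Percentile(0.95))
}
```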

@ -86,7 +86,7 @@ func TestExpDecaySample(t *testing.T) {
if have, want := snap.Size(), min(tc.updates, tc.reservoirSize); have != want {
t.Errorf("unexpected size: have %d want %d", have, want)
}
values := snap.(*sampleSnapshot).values
values := snap.values
if have, want := len(values), min(tc.updates, tc.reservoirSize); have != want {
t.Errorf("unexpected values length: have %d want %d", have, want)
}
@ -111,8 +111,7 @@ func TestExpDecaySampleNanosecondRegression(t *testing.T) {
for i := 0; i < 1000; i++ {
sw.Update(20)
}
s := sw.Snapshot()
v := s.(*sampleSnapshot).values
v := sw.Snapshot().values
avg := float64(0)
for i := 0; i < len(v); i++ {
avg += float64(v[i])
@ -166,7 +165,7 @@ func TestUniformSample(t *testing.T) {
if size := s.Size(); size != 100 {
t.Errorf("s.Size(): 100 != %v\n", size)
}
values := s.(*sampleSnapshot).values
values := s.values
if l := len(values); l != 100 {
t.Errorf("len(s.Values()): 100 != %v\n", l)
@ -184,8 +183,7 @@ func TestUniformSampleIncludesTail(t *testing.T) {
for i := 0; i < max; i++ {
sw.Update(int64(i))
}
s := sw.Snapshot()
v := s.(*sampleSnapshot).values
v := sw.Snapshot().values
sum := 0
exp := (max - 1) * max / 2
for i := 0; i < len(v); i++ {
@ -220,7 +218,7 @@ func benchmarkSample(b *testing.B, s Sample) {
}
}
func testExpDecaySampleStatistics(t *testing.T, s SampleSnapshot) {
func testExpDecaySampleStatistics(t *testing.T, s *sampleSnapshot) {
if sum := s.Sum(); sum != 496598 {
t.Errorf("s.Sum(): 496598 != %v\n", sum)
}
@ -251,7 +249,7 @@ func testExpDecaySampleStatistics(t *testing.T, s SampleSnapshot) {
}
}
func testUniformSampleStatistics(t *testing.T, s SampleSnapshot) {
func testUniformSampleStatistics(t *testing.T, s *sampleSnapshot) {
if count := s.Count(); count != 10000 {
t.Errorf("s.Count(): 10000 != %v\n", count)
}

@ -15,17 +15,17 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) {
for range time.Tick(d) {
r.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
case *Counter:
w.Info(fmt.Sprintf("counter %s: count: %d", name, metric.Snapshot().Count()))
case CounterFloat64:
case *CounterFloat64:
w.Info(fmt.Sprintf("counter %s: count: %f", name, metric.Snapshot().Count()))
case Gauge:
case *Gauge:
w.Info(fmt.Sprintf("gauge %s: value: %d", name, metric.Snapshot().Value()))
case GaugeFloat64:
case *GaugeFloat64:
w.Info(fmt.Sprintf("gauge %s: value: %f", name, metric.Snapshot().Value()))
case GaugeInfo:
case *GaugeInfo:
w.Info(fmt.Sprintf("gauge %s: value: %s", name, metric.Snapshot().Value()))
case Healthcheck:
case *Healthcheck:
metric.Check()
w.Info(fmt.Sprintf("healthcheck %s: error: %v", name, metric.Error()))
case Histogram:
@ -45,7 +45,7 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) {
ps[3],
ps[4],
))
case Meter:
case *Meter:
m := metric.Snapshot()
w.Info(fmt.Sprintf(
"meter %s: count: %d 1-min: %.2f 5-min: %.2f 15-min: %.2f mean: %.2f",
@ -56,7 +56,7 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) {
m.Rate15(),
m.RateMean(),
))
case Timer:
case *Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
w.Info(fmt.Sprintf(

@ -5,47 +5,30 @@ import (
"time"
)
type TimerSnapshot interface {
HistogramSnapshot
MeterSnapshot
}
// Timer capture the duration and rate of events.
type Timer interface {
Snapshot() TimerSnapshot
Stop()
Time(func())
UpdateSince(time.Time)
Update(time.Duration)
}
// GetOrRegisterTimer returns an existing Timer or constructs and registers a
// new StandardTimer.
// new Timer.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterTimer(name string, r Registry) Timer {
func GetOrRegisterTimer(name string, r Registry) *Timer {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewTimer).(Timer)
return r.GetOrRegister(name, NewTimer).(*Timer)
}
// NewCustomTimer constructs a new StandardTimer from a Histogram and a Meter.
// NewCustomTimer constructs a new Timer from a Histogram and a Meter.
// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
func NewCustomTimer(h Histogram, m Meter) Timer {
if !Enabled {
return NilTimer{}
}
return &StandardTimer{
func NewCustomTimer(h Histogram, m *Meter) *Timer {
return &Timer{
histogram: h,
meter: m,
}
}
// NewRegisteredTimer constructs and registers a new StandardTimer.
// NewRegisteredTimer constructs and registers a new Timer.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredTimer(name string, r Registry) Timer {
func NewRegisteredTimer(name string, r Registry) *Timer {
c := NewTimer()
if nil == r {
r = DefaultRegistry
@ -54,60 +37,47 @@ func NewRegisteredTimer(name string, r Registry) Timer {
return c
}
// NewTimer constructs a new StandardTimer using an exponentially-decaying
// NewTimer constructs a new Timer using an exponentially-decaying
// sample with the same reservoir size and alpha as UNIX load averages.
// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
func NewTimer() Timer {
if !Enabled {
return NilTimer{}
}
return &StandardTimer{
func NewTimer() *Timer {
return &Timer{
histogram: NewHistogram(NewExpDecaySample(1028, 0.015)),
meter: NewMeter(),
}
}
// NilTimer is a no-op Timer.
type NilTimer struct{}
func (NilTimer) Snapshot() TimerSnapshot { return (*emptySnapshot)(nil) }
func (NilTimer) Stop() {}
func (NilTimer) Time(f func()) { f() }
func (NilTimer) Update(time.Duration) {}
func (NilTimer) UpdateSince(time.Time) {}
// StandardTimer is the standard implementation of a Timer and uses a Histogram
// and Meter.
type StandardTimer struct {
// Timer captures the duration and rate of events, using a Histogram and a Meter.
type Timer struct {
histogram Histogram
meter Meter
meter *Meter
mutex sync.Mutex
}
// Snapshot returns a read-only copy of the timer.
func (t *StandardTimer) Snapshot() TimerSnapshot {
func (t *Timer) Snapshot() *TimerSnapshot {
t.mutex.Lock()
defer t.mutex.Unlock()
return &timerSnapshot{
return &TimerSnapshot{
histogram: t.histogram.Snapshot(),
meter: t.meter.Snapshot(),
}
}
// Stop stops the meter.
func (t *StandardTimer) Stop() {
func (t *Timer) Stop() {
t.meter.Stop()
}
// Time records the duration of the execution of the given function.
func (t *StandardTimer) Time(f func()) {
func (t *Timer) Time(f func()) {
ts := time.Now()
f()
t.Update(time.Since(ts))
}
// Update the duration of an event, in nanoseconds.
func (t *StandardTimer) Update(d time.Duration) {
func (t *Timer) Update(d time.Duration) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.histogram.Update(d.Nanoseconds())
@ -116,67 +86,67 @@ func (t *StandardTimer) Update(d time.Duration) {
// UpdateSince updates the duration of an event that started at a time and ends now.
// The record uses nanoseconds.
func (t *StandardTimer) UpdateSince(ts time.Time) {
func (t *Timer) UpdateSince(ts time.Time) {
t.Update(time.Since(ts))
}
// timerSnapshot is a read-only copy of another Timer.
type timerSnapshot struct {
// TimerSnapshot is a read-only copy of another Timer.
type TimerSnapshot struct {
histogram HistogramSnapshot
meter MeterSnapshot
meter *MeterSnapshot
}
// Count returns the number of events recorded at the time the snapshot was
// taken.
func (t *timerSnapshot) Count() int64 { return t.histogram.Count() }
func (t *TimerSnapshot) Count() int64 { return t.histogram.Count() }
// Max returns the maximum value at the time the snapshot was taken.
func (t *timerSnapshot) Max() int64 { return t.histogram.Max() }
func (t *TimerSnapshot) Max() int64 { return t.histogram.Max() }
// Size returns the size of the sample at the time the snapshot was taken.
func (t *timerSnapshot) Size() int { return t.histogram.Size() }
func (t *TimerSnapshot) Size() int { return t.histogram.Size() }
// Mean returns the mean value at the time the snapshot was taken.
func (t *timerSnapshot) Mean() float64 { return t.histogram.Mean() }
func (t *TimerSnapshot) Mean() float64 { return t.histogram.Mean() }
// Min returns the minimum value at the time the snapshot was taken.
func (t *timerSnapshot) Min() int64 { return t.histogram.Min() }
func (t *TimerSnapshot) Min() int64 { return t.histogram.Min() }
// Percentile returns an arbitrary percentile of sampled values at the time the
// snapshot was taken.
func (t *timerSnapshot) Percentile(p float64) float64 {
func (t *TimerSnapshot) Percentile(p float64) float64 {
return t.histogram.Percentile(p)
}
// Percentiles returns a slice of arbitrary percentiles of sampled values at
// the time the snapshot was taken.
func (t *timerSnapshot) Percentiles(ps []float64) []float64 {
func (t *TimerSnapshot) Percentiles(ps []float64) []float64 {
return t.histogram.Percentiles(ps)
}
// Rate1 returns the one-minute moving average rate of events per second at the
// time the snapshot was taken.
func (t *timerSnapshot) Rate1() float64 { return t.meter.Rate1() }
func (t *TimerSnapshot) Rate1() float64 { return t.meter.Rate1() }
// Rate5 returns the five-minute moving average rate of events per second at
// the time the snapshot was taken.
func (t *timerSnapshot) Rate5() float64 { return t.meter.Rate5() }
func (t *TimerSnapshot) Rate5() float64 { return t.meter.Rate5() }
// Rate15 returns the fifteen-minute moving average rate of events per second
// at the time the snapshot was taken.
func (t *timerSnapshot) Rate15() float64 { return t.meter.Rate15() }
func (t *TimerSnapshot) Rate15() float64 { return t.meter.Rate15() }
// RateMean returns the meter's mean rate of events per second at the time the
// snapshot was taken.
func (t *timerSnapshot) RateMean() float64 { return t.meter.RateMean() }
func (t *TimerSnapshot) RateMean() float64 { return t.meter.RateMean() }
// StdDev returns the standard deviation of the values at the time the snapshot
// was taken.
func (t *timerSnapshot) StdDev() float64 { return t.histogram.StdDev() }
func (t *TimerSnapshot) StdDev() float64 { return t.histogram.StdDev() }
// Sum returns the sum at the time the snapshot was taken.
func (t *timerSnapshot) Sum() int64 { return t.histogram.Sum() }
func (t *TimerSnapshot) Sum() int64 { return t.histogram.Sum() }
// Variance returns the variance of the values at the time the snapshot was
// taken.
func (t *timerSnapshot) Variance() float64 { return t.histogram.Variance() }
func (t *TimerSnapshot) Variance() float64 { return t.histogram.Variance() }
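
Not part of the diff: a minimal usage sketch of the concrete *Timer and *TimerSnapshot types after this change; the metric name and the sleep are purely illustrative.

package main

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enable() // per the PR, enabling must still happen early

	t := metrics.NewRegisteredTimer("example/request", nil) // illustrative name
	start := time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for the measured work
	t.UpdateSince(start)              // feeds both the histogram and the meter

	snap := t.Snapshot() // *TimerSnapshot: a read-only copy
	_ = snap.Percentile(0.95)
	_ = snap.Rate1()
}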

@@ -26,22 +26,22 @@ func WriteOnce(r Registry, w io.Writer) {
slices.SortFunc(namedMetrics, namedMetric.cmp)
for _, namedMetric := range namedMetrics {
switch metric := namedMetric.m.(type) {
case Counter:
case *Counter:
fmt.Fprintf(w, "counter %s\n", namedMetric.name)
fmt.Fprintf(w, " count: %9d\n", metric.Snapshot().Count())
case CounterFloat64:
case *CounterFloat64:
fmt.Fprintf(w, "counter %s\n", namedMetric.name)
fmt.Fprintf(w, " count: %f\n", metric.Snapshot().Count())
case Gauge:
case *Gauge:
fmt.Fprintf(w, "gauge %s\n", namedMetric.name)
fmt.Fprintf(w, " value: %9d\n", metric.Snapshot().Value())
case GaugeFloat64:
case *GaugeFloat64:
fmt.Fprintf(w, "gauge %s\n", namedMetric.name)
fmt.Fprintf(w, " value: %f\n", metric.Snapshot().Value())
case GaugeInfo:
case *GaugeInfo:
fmt.Fprintf(w, "gauge %s\n", namedMetric.name)
fmt.Fprintf(w, " value: %s\n", metric.Snapshot().Value().String())
case Healthcheck:
case *Healthcheck:
metric.Check()
fmt.Fprintf(w, "healthcheck %s\n", namedMetric.name)
fmt.Fprintf(w, " error: %v\n", metric.Error())
@@ -59,7 +59,7 @@ func WriteOnce(r Registry, w io.Writer) {
fmt.Fprintf(w, " 95%%: %12.2f\n", ps[2])
fmt.Fprintf(w, " 99%%: %12.2f\n", ps[3])
fmt.Fprintf(w, " 99.9%%: %12.2f\n", ps[4])
case Meter:
case *Meter:
m := metric.Snapshot()
fmt.Fprintf(w, "meter %s\n", namedMetric.name)
fmt.Fprintf(w, " count: %9d\n", m.Count())
@@ -67,7 +67,7 @@ func WriteOnce(r Registry, w io.Writer) {
fmt.Fprintf(w, " 5-min rate: %12.2f\n", m.Rate5())
fmt.Fprintf(w, " 15-min rate: %12.2f\n", m.Rate15())
fmt.Fprintf(w, " mean rate: %12.2f\n", m.RateMean())
case Timer:
case *Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "timer %s\n", namedMetric.name)
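
Not part of the diff: downstream code that walks a registry now type-switches on the concrete pointer types rather than the removed interfaces; dumpCounts below is a hypothetical helper mirroring the WriteOnce pattern above.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

// dumpCounts prints a few metric kinds by switching on the concrete pointer types.
func dumpCounts(r metrics.Registry) {
	r.Each(func(name string, m any) {
		switch v := m.(type) {
		case *metrics.Counter:
			fmt.Printf("%s: %d\n", name, v.Snapshot().Count())
		case *metrics.Meter:
			fmt.Printf("%s: %.2f/s\n", name, v.Snapshot().Rate1())
		case *metrics.Timer:
			fmt.Printf("%s: p95=%.2f\n", name, v.Snapshot().Percentile(0.95))
		}
	})
}

func main() {
	metrics.NewRegisteredCounter("example/hits", nil).Inc(1) // illustrative metric
	dumpCounts(metrics.DefaultRegistry)
}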

@@ -34,7 +34,7 @@ const (
)
var (
bucketsCounter []metrics.Counter
bucketsCounter []*metrics.Counter
ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil)
egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil)
)
@@ -53,7 +53,7 @@ type meteredUdpConn struct {
func newMeteredConn(conn UDPConn) UDPConn {
// Short circuit if metrics are disabled
if !metrics.Enabled {
if !metrics.Enabled() {
return conn
}
return &meteredUdpConn{udpConn: conn}
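
Not part of the diff: Enabled is now a function backed by a flag set at runtime, so the pattern is to call metrics.Enable() once at startup and keep a cheap Enabled() guard at the call sites. The wrapper type below is a hypothetical stand-in for meteredUdpConn.

package main

import (
	"net"

	"github.com/ethereum/go-ethereum/metrics"
)

type instrumentedConn struct{ net.Conn } // hypothetical wrapper, analogous to meteredUdpConn

func wrapConn(conn net.Conn) net.Conn {
	if !metrics.Enabled() { // skip the wrapper (and its bookkeeping) when metrics are off
		return conn
	}
	return &instrumentedConn{Conn: conn}
}

func main() {
	metrics.Enable() // set the flag early, before meters start ticking
	var c net.Conn   // in real code this would be an accepted or dialed connection
	_ = wrapConn(c)
}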

@@ -570,7 +570,7 @@ func (tab *Table) nodeAdded(b *bucket, n *tableNode) {
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(b, n)
}
if metrics.Enabled {
if metrics.Enabled() {
bucketsCounter[b.index].Inc(1)
}
}
@@ -580,7 +580,7 @@ func (tab *Table) nodeRemoved(b *bucket, n *tableNode) {
if tab.nodeRemovedHook != nil {
tab.nodeRemovedHook(b, n)
}
if metrics.Enabled {
if metrics.Enabled() {
bucketsCounter[b.index].Dec(1)
}
}

@@ -37,19 +37,19 @@ const (
)
var (
activePeerGauge metrics.Gauge = metrics.NilGauge{}
activeInboundPeerGauge metrics.Gauge = metrics.NilGauge{}
activeOutboundPeerGauge metrics.Gauge = metrics.NilGauge{}
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil)
activeInboundPeerGauge = metrics.NewRegisteredGauge("p2p/peers/inbound", nil)
activeOutboundPeerGauge = metrics.NewRegisteredGauge("p2p/peers/outbound", nil)
ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/ingress", nil)
egressTrafficMeter = metrics.NewRegisteredMeter("p2p/egress", nil)
// general ingress/egress connection meters
serveMeter metrics.Meter = metrics.NilMeter{}
serveSuccessMeter metrics.Meter = metrics.NilMeter{}
dialMeter metrics.Meter = metrics.NilMeter{}
dialSuccessMeter metrics.Meter = metrics.NilMeter{}
dialConnectionError metrics.Meter = metrics.NilMeter{}
serveMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
serveSuccessMeter = metrics.NewRegisteredMeter("p2p/serves/success", nil)
dialMeter = metrics.NewRegisteredMeter("p2p/dials", nil)
dialSuccessMeter = metrics.NewRegisteredMeter("p2p/dials/success", nil)
dialConnectionError = metrics.NewRegisteredMeter("p2p/dials/error/connection", nil)
// handshake error meters
dialTooManyPeers = metrics.NewRegisteredMeter("p2p/dials/error/saturated", nil)
@@ -61,25 +61,10 @@ var (
dialProtoHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/proto", nil)
)
func init() {
if !metrics.Enabled {
return
}
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil)
activeInboundPeerGauge = metrics.NewRegisteredGauge("p2p/peers/inbound", nil)
activeOutboundPeerGauge = metrics.NewRegisteredGauge("p2p/peers/outbound", nil)
serveMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
serveSuccessMeter = metrics.NewRegisteredMeter("p2p/serves/success", nil)
dialMeter = metrics.NewRegisteredMeter("p2p/dials", nil)
dialSuccessMeter = metrics.NewRegisteredMeter("p2p/dials/success", nil)
dialConnectionError = metrics.NewRegisteredMeter("p2p/dials/error/connection", nil)
}
// markDialError matches errors that occur while setting up a dial connection
// to the corresponding meter.
func markDialError(err error) {
if !metrics.Enabled {
if !metrics.Enabled() {
return
}
if err2 := errors.Unwrap(err); err2 != nil {
@@ -113,7 +98,7 @@ type meteredConn struct {
// connection meter and also increases the metered peer count. If the metrics
// system is disabled, the function returns the original connection.
func newMeteredConn(conn net.Conn) net.Conn {
if !metrics.Enabled {
if !metrics.Enabled() {
return conn
}
return &meteredConn{Conn: conn}
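
Not part of the diff: with the Nil* fallbacks gone, meters and gauges are registered unconditionally at package level; per the PR description, the heavy periodic work only runs once metrics.Enable() has been called. The metric names below are illustrative.

package main

import "github.com/ethereum/go-ethereum/metrics"

// Illustrative package-level metrics (assumed names, not from the codebase).
var (
	dialAttempts = metrics.NewRegisteredMeter("example/dials", nil)
	livePeers    = metrics.NewRegisteredGauge("example/peers", nil)
)

func onPeerConnected() {
	dialAttempts.Mark(1)
	livePeers.Inc(1)
}

func onPeerDropped() {
	livePeers.Dec(1)
}

func main() {
	metrics.Enable()
	onPeerConnected()
	onPeerDropped()
}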

@@ -357,7 +357,7 @@ func (p *Peer) handle(msg Msg) error {
if err != nil {
return fmt.Errorf("msg code out of range: %v", msg.Code)
}
if metrics.Enabled {
if metrics.Enabled() {
m := fmt.Sprintf("%s/%s/%d/%#02x", ingressMeterName, proto.Name, proto.Version, msg.Code-proto.offset)
metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1)
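
Not part of the diff: per-message meters are still looked up lazily by name, with the (now function-valued) Enabled() guard keeping the disabled path cheap. The helper and name layout below are illustrative only.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

// markIngress is a hypothetical helper mirroring the Peer.handle metering above.
func markIngress(proto string, version uint, code uint64, size int64) {
	if !metrics.Enabled() {
		return
	}
	name := fmt.Sprintf("example/ingress/%s/%d/%#02x", proto, version, code)
	metrics.GetOrRegisterMeter(name, nil).Mark(size)         // payload bytes
	metrics.GetOrRegisterMeter(name+"/packets", nil).Mark(1) // packet count
}

func main() {
	metrics.Enable()
	markIngress("eth", 68, 0x00, 1024)
}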

@@ -84,7 +84,7 @@ func New(protocol string, timeout time.Duration) *Tracker {
// Track adds a network request to the tracker to wait for a response to arrive
// or until the request is cancelled or times out.
func (t *Tracker) Track(peer string, version uint, reqCode uint64, resCode uint64, id uint64) {
if !metrics.Enabled {
if !metrics.Enabled() {
return
}
t.lock.Lock()
@@ -163,7 +163,7 @@ func (t *Tracker) schedule() {
// Fulfil fills a pending request, if any is available, reporting on various metrics.
func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) {
if !metrics.Enabled {
if !metrics.Enabled() {
return
}
t.lock.Lock()

@@ -98,7 +98,7 @@ func (t *rlpxTransport) WriteMsg(msg Msg) error {
// Set metrics.
msg.meterSize = size
if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
if metrics.Enabled() && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode)
metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1)

@@ -43,7 +43,7 @@ func (c *counter) add(size int) {
}
// report uploads the cached statistics to meters.
func (c *counter) report(count metrics.Meter, size metrics.Meter) {
func (c *counter) report(count, size *metrics.Meter) {
count.Mark(int64(c.n))
size.Mark(int64(c.size))
}
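
Not part of the diff: helpers now accept the concrete *metrics.Meter pointer instead of the old Meter interface, dropping the interface indirection the PR description calls out. A trimmed-down sketch with assumed names:

package main

import "github.com/ethereum/go-ethereum/metrics"

// tally is a hypothetical stand-in for the counter type above.
type tally struct{ n, size int }

// report marks the cached statistics on concrete *metrics.Meter values.
func (t *tally) report(count, size *metrics.Meter) {
	count.Mark(int64(t.n))
	size.Mark(int64(t.size))
}

func main() {
	metrics.Enable()
	c := &tally{n: 3, size: 4096}
	c.report(
		metrics.GetOrRegisterMeter("example/count", nil),
		metrics.GetOrRegisterMeter("example/size", nil),
	)
}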
