From 72d5a27a397f4397082df91dd1650251e828a251 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 10 Sep 2019 14:39:07 +0300 Subject: [PATCH] core, metrics, p2p: switch some invalid counters to gauges --- core/rawdb/freezer.go | 8 ++--- core/rawdb/freezer_table.go | 22 ++++++------ core/rawdb/freezer_table_test.go | 62 ++++++++++++++++---------------- core/tx_pool.go | 38 ++++++++++---------- metrics/gauge.go | 38 ++++++++++++++++++++ p2p/metrics.go | 10 +++--- 6 files changed, 108 insertions(+), 70 deletions(-) diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 41677fbba..5497c59d4 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -80,9 +80,9 @@ type freezer struct { func newFreezer(datadir string, namespace string) (*freezer, error) { // Create the initial freezer object var ( - readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) - writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) - sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil) + readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) + writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) + sizeGauge = metrics.NewRegisteredGauge(namespace+"ancient/size", nil) ) // Ensure the datadir is not a symbolic link if it exists. if info, err := os.Lstat(datadir); !os.IsNotExist(err) { @@ -103,7 +103,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { instanceLock: lock, } for name, disableSnappy := range freezerNoSnappy { - table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy) + table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy) if err != nil { for _, table := range freezer.tables { table.Close() diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 61804f1f2..9fb341f02 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -94,18 +94,18 @@ type freezerTable struct { // to count how many historic items have gone missing. itemOffset uint32 // Offset (number of discarded items) - headBytes uint32 // Number of bytes written to the head file - readMeter metrics.Meter // Meter for measuring the effective amount of data read - writeMeter metrics.Meter // Meter for measuring the effective amount of data written - sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables + headBytes uint32 // Number of bytes written to the head file + readMeter metrics.Meter // Meter for measuring the effective amount of data read + writeMeter metrics.Meter // Meter for measuring the effective amount of data written + sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables logger log.Logger // Logger with database path and table name ambedded lock sync.RWMutex // Mutex protecting the data file descriptors } // newTable opens a freezer table with default settings - 2G files -func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) { - return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy) +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, disableSnappy bool) (*freezerTable, error) { + return newCustomTable(path, name, readMeter, writeMeter, sizeGauge, 2*1000*1000*1000, disableSnappy) } // openFreezerFileForAppend opens a freezer table file and seeks to the end @@ -149,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error { // newCustomTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. -func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) { +func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err @@ -172,7 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, - sizeCounter: sizeCounter, + sizeGauge: sizeGauge, name: name, path: path, logger: log.New("database", path, "table", name), @@ -189,7 +189,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete tab.Close() return nil, err } - tab.sizeCounter.Inc(int64(size)) + tab.sizeGauge.Inc(int64(size)) return tab, nil } @@ -378,7 +378,7 @@ func (t *freezerTable) truncate(items uint64) error { if err != nil { return err } - t.sizeCounter.Dec(int64(oldSize - newSize)) + t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } @@ -510,7 +510,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.index.Write(idx.marshallBinary()) t.writeMeter.Mark(int64(bLen + indexEntrySize)) - t.sizeCounter.Inc(int64(bLen + indexEntrySize)) + t.sizeGauge.Inc(int64(bLen + indexEntrySize)) atomic.AddUint64(&t.items, 1) return nil diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 61ba7a17e..7de108151 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -56,7 +56,7 @@ func TestFreezerBasics(t *testing.T) { // set cutoff at 50 bytes f, err := newCustomTable(os.TempDir(), fmt.Sprintf("unittest-%d", rand.Uint64()), - metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter(), 50, true) + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true) if err != nil { t.Fatal(err) } @@ -99,11 +99,11 @@ func TestFreezerBasicsClosing(t *testing.T) { // set cutoff at 50 bytes var ( fname = fmt.Sprintf("basics-close-%d", rand.Uint64()) - rm, wm, sc = metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg = metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() f *freezerTable err error ) - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -112,7 +112,7 @@ func TestFreezerBasicsClosing(t *testing.T) { data := getChunk(15, x) f.Append(uint64(x), data) f.Close() - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -129,7 +129,7 @@ func TestFreezerBasicsClosing(t *testing.T) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) } f.Close() - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -139,11 +139,11 @@ func TestFreezerBasicsClosing(t *testing.T) { // TestFreezerRepairDanglingHead tests that we can recover if index entries are removed func TestFreezerRepairDanglingHead(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -172,7 +172,7 @@ func TestFreezerRepairDanglingHead(t *testing.T) { idxFile.Close() // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -190,11 +190,11 @@ func TestFreezerRepairDanglingHead(t *testing.T) { // TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -222,7 +222,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { idxFile.Close() // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -243,7 +243,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { } // And if we open it, we should now be able to read all of them (new values) { - f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) for y := 1; y < 255; y++ { exp := getChunk(15, ^y) got, err := f.Retrieve(uint64(y)) @@ -260,11 +260,11 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { // TestSnappyDetection tests that we fail to open a snappy database and vice versa func TestSnappyDetection(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("snappytest-%d", rand.Uint64()) // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -277,7 +277,7 @@ func TestSnappyDetection(t *testing.T) { } // Open without snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, false) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, false) if err != nil { t.Fatal(err) } @@ -289,7 +289,7 @@ func TestSnappyDetection(t *testing.T) { // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -317,11 +317,11 @@ func assertFileSize(f string, size int64) error { // the index is repaired func TestFreezerRepairDanglingIndex(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64()) { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -357,7 +357,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { // 45, 45, 15 // with 3+3+1 items { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -374,11 +374,11 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { func TestFreezerTruncate(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("truncation-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -395,7 +395,7 @@ func TestFreezerTruncate(t *testing.T) { } // Reopen, truncate { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -417,10 +417,10 @@ func TestFreezerTruncate(t *testing.T) { // That will rewind the index, and _should_ truncate the head file func TestFreezerRepairFirstFile(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -448,7 +448,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { } // Reopen { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -473,10 +473,10 @@ func TestFreezerRepairFirstFile(t *testing.T) { // - check that we did not keep the rdonly file descriptors func TestFreezerReadAndTruncate(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("read_truncate-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -493,7 +493,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { } // Reopen and read all files { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -519,10 +519,10 @@ func TestFreezerReadAndTruncate(t *testing.T) { func TestOffset(t *testing.T) { t.Parallel() - rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("offset-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } @@ -578,7 +578,7 @@ func TestOffset(t *testing.T) { } // Now open again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } diff --git a/core/tx_pool.go b/core/tx_pool.go index a49b42261..161489cc5 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -101,9 +101,9 @@ var ( invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil) underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil) - pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil) - queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil) - localCounter = metrics.NewRegisteredCounter("txpool/local", nil) + pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) + queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) + localGauge = metrics.NewRegisteredGauge("txpool/local", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -628,7 +628,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e } } if local || pool.locals.contains(from) { - localCounter.Inc(1) + localGauge.Inc(1) } pool.journalTx(from, tx) @@ -658,7 +658,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er queuedReplaceMeter.Mark(1) } else { // Nothing was replaced, bump the queued counter - queuedCounter.Inc(1) + queuedGauge.Inc(1) } if pool.all.Get(hash) == nil { pool.all.Add(tx) @@ -707,7 +707,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pendingReplaceMeter.Mark(1) } else { // Nothing was replaced, bump the pending counter - pendingCounter.Inc(1) + pendingGauge.Inc(1) } // Failsafe to work around direct pending inserts (tests) if pool.all.Get(hash) == nil { @@ -841,7 +841,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { pool.priced.Removed(1) } if pool.locals.contains(addr) { - localCounter.Dec(1) + localGauge.Dec(1) } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { @@ -858,7 +858,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) // Reduce the pending counter - pendingCounter.Dec(int64(1 + len(invalids))) + pendingGauge.Dec(int64(1 + len(invalids))) return } } @@ -866,7 +866,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { if future := pool.queue[addr]; future != nil { if removed, _ := future.Remove(tx); removed { // Reduce the queued counter - queuedCounter.Dec(1) + queuedGauge.Dec(1) } if future.Empty() { delete(pool.queue, addr) @@ -1164,7 +1164,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans promoted = append(promoted, tx) } } - queuedCounter.Dec(int64(len(readies))) + queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit var caps types.Transactions @@ -1179,9 +1179,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans } // Mark all the items dropped as removed pool.priced.Removed(len(forwards) + len(drops) + len(caps)) - queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) + queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) + localGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) } // Delete the entire queue entry if it became empty. if list.Empty() { @@ -1240,9 +1240,9 @@ func (pool *TxPool) truncatePending() { log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) - pendingCounter.Dec(int64(len(caps))) + pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(offenders[i]) { - localCounter.Dec(int64(len(caps))) + localGauge.Dec(int64(len(caps))) } pending-- } @@ -1267,9 +1267,9 @@ func (pool *TxPool) truncatePending() { log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) - pendingCounter.Dec(int64(len(caps))) + pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(caps))) + localGauge.Dec(int64(len(caps))) } pending-- } @@ -1353,9 +1353,9 @@ func (pool *TxPool) demoteUnexecutables() { log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } - pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) } // If there's a gap in front, alert (should never happen) and postpone all transactions if list.Len() > 0 && list.txs.Get(nonce) == nil { @@ -1365,7 +1365,7 @@ func (pool *TxPool) demoteUnexecutables() { log.Error("Demoting invalidated transaction", "hash", hash) pool.enqueueTx(hash, tx) } - pendingCounter.Dec(int64(len(gapped))) + pendingGauge.Dec(int64(len(gapped))) } // Delete the entire queue entry if it became empty. if list.Empty() { diff --git a/metrics/gauge.go b/metrics/gauge.go index 0fbfdb860..b6b2758b0 100644 --- a/metrics/gauge.go +++ b/metrics/gauge.go @@ -6,6 +6,8 @@ import "sync/atomic" type Gauge interface { Snapshot() Gauge Update(int64) + Dec(int64) + Inc(int64) Value() int64 } @@ -65,6 +67,16 @@ func (GaugeSnapshot) Update(int64) { panic("Update called on a GaugeSnapshot") } +// Dec panics. +func (GaugeSnapshot) Dec(int64) { + panic("Dec called on a GaugeSnapshot") +} + +// Inc panics. +func (GaugeSnapshot) Inc(int64) { + panic("Inc called on a GaugeSnapshot") +} + // Value returns the value at the time the snapshot was taken. func (g GaugeSnapshot) Value() int64 { return int64(g) } @@ -77,6 +89,12 @@ func (NilGauge) Snapshot() Gauge { return NilGauge{} } // Update is a no-op. func (NilGauge) Update(v int64) {} +// Dec is a no-op. +func (NilGauge) Dec(i int64) {} + +// Inc is a no-op. +func (NilGauge) Inc(i int64) {} + // Value is a no-op. func (NilGauge) Value() int64 { return 0 } @@ -101,6 +119,16 @@ func (g *StandardGauge) Value() int64 { return atomic.LoadInt64(&g.value) } +// Dec decrements the gauge's current value by the given amount. +func (g *StandardGauge) Dec(i int64) { + atomic.AddInt64(&g.value, -i) +} + +// Inc increments the gauge's current value by the given amount. +func (g *StandardGauge) Inc(i int64) { + atomic.AddInt64(&g.value, i) +} + // FunctionalGauge returns value from given function type FunctionalGauge struct { value func() int64 @@ -118,3 +146,13 @@ func (g FunctionalGauge) Snapshot() Gauge { return GaugeSnapshot(g.Value()) } func (FunctionalGauge) Update(int64) { panic("Update called on a FunctionalGauge") } + +// Dec panics. +func (FunctionalGauge) Dec(int64) { + panic("Dec called on a FunctionalGauge") +} + +// Inc panics. +func (FunctionalGauge) Inc(int64) { + panic("Inc called on a FunctionalGauge") +} diff --git a/p2p/metrics.go b/p2p/metrics.go index c04e5ab4c..8b29efdcd 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -45,7 +45,7 @@ var ( ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic - activePeerCounter = metrics.NewRegisteredCounter("p2p/peers", nil) // Gauge tracking the current peer count + activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) // Gauge tracking the current peer count PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress @@ -124,7 +124,7 @@ func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn { } else { egressConnectMeter.Mark(1) } - activePeerCounter.Inc(1) + activePeerGauge.Inc(1) return &meteredConn{ Conn: conn, @@ -200,7 +200,7 @@ func (c *meteredConn) Close() error { IP: c.ip, Elapsed: time.Since(c.connected), }) - activePeerCounter.Dec(1) + activePeerGauge.Dec(1) return err } id := c.id @@ -212,7 +212,7 @@ func (c *meteredConn) Close() error { IP: c.ip, ID: id, }) - activePeerCounter.Dec(1) + activePeerGauge.Dec(1) return err } ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()) @@ -233,6 +233,6 @@ func (c *meteredConn) Close() error { Ingress: ingress, Egress: egress, }) - activePeerCounter.Dec(1) + activePeerGauge.Dec(1) return err }