swarm/metrics: Send the accounting registry to InfluxDB (#18470)

pull/18523/head
Jerzy Lasyk 6 years ago committed by Anton Evangelatov
parent 2abeb35d54
commit f28da4f602
  1. 5
      metrics/registry.go
  2. 35
      p2p/protocols/accounting.go
  3. 28
      p2p/protocols/reporter_test.go
  4. 25
      swarm/metrics/flags.go

@ -312,8 +312,9 @@ func (r *PrefixedRegistry) UnregisterAll() {
} }
var ( var (
DefaultRegistry = NewRegistry() DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry() EphemeralRegistry = NewRegistry()
AccountingRegistry = NewRegistry() // registry used in swarm
) )
// Call the given function for each registered metric. // Call the given function for each registered metric.

@ -27,23 +27,21 @@ var (
// All metrics are cumulative // All metrics are cumulative
// total amount of units credited // total amount of units credited
mBalanceCredit metrics.Counter mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", metrics.AccountingRegistry)
// total amount of units debited // total amount of units debited
mBalanceDebit metrics.Counter mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", metrics.AccountingRegistry)
// total amount of bytes credited // total amount of bytes credited
mBytesCredit metrics.Counter mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", metrics.AccountingRegistry)
// total amount of bytes debited // total amount of bytes debited
mBytesDebit metrics.Counter mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", metrics.AccountingRegistry)
// total amount of credited messages // total amount of credited messages
mMsgCredit metrics.Counter mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", metrics.AccountingRegistry)
// total amount of debited messages // total amount of debited messages
mMsgDebit metrics.Counter mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", metrics.AccountingRegistry)
// how many times local node had to drop remote peers // how many times local node had to drop remote peers
mPeerDrops metrics.Counter mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", metrics.AccountingRegistry)
// how many times local node overdrafted and dropped // how many times local node overdrafted and dropped
mSelfDrops metrics.Counter mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", metrics.AccountingRegistry)
MetricsRegistry metrics.Registry
) )
// Prices defines how prices are being passed on to the accounting instance // Prices defines how prices are being passed on to the accounting instance
@ -110,24 +108,13 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah return ah
} }
// SetupAccountingMetrics creates a separate registry for p2p accounting metrics; // SetupAccountingMetrics uses a separate registry for p2p accounting metrics;
// this registry should be independent of any other metrics as it persists at different endpoints. // this registry should be independent of any other metrics as it persists at different endpoints.
// It also instantiates the given metrics and starts the persisting go-routine which // It also starts the persisting go-routine which
// at the passed interval writes the metrics to a LevelDB // at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
// create an empty registry
MetricsRegistry = metrics.NewRegistry()
// instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
// create the DB and start persisting // create the DB and start persisting
return NewAccountingMetrics(MetricsRegistry, reportInterval, path) return NewAccountingMetrics(metrics.AccountingRegistry, reportInterval, path)
} }
// Send takes a peer, a size and a msg and // Send takes a peer, a size and a msg and

@ -43,21 +43,27 @@ func TestReporter(t *testing.T) {
metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
log.Debug("Done.") log.Debug("Done.")
//do some metrics //change metrics
mBalanceCredit.Inc(12) mBalanceCredit.Inc(12)
mBytesCredit.Inc(34) mBytesCredit.Inc(34)
mMsgDebit.Inc(9) mMsgDebit.Inc(9)
//store expected metrics
expectedBalanceCredit := mBalanceCredit.Count()
expectedBytesCredit := mBytesCredit.Count()
expectedMsgDebit := mMsgDebit.Count()
//give the reporter time to write the metrics to DB //give the reporter time to write the metrics to DB
time.Sleep(20 * time.Millisecond) time.Sleep(20 * time.Millisecond)
//set the metrics to nil - this effectively simulates the node having shut down...
mBalanceCredit = nil
mBytesCredit = nil
mMsgDebit = nil
//close the DB also, or we can't create a new one //close the DB also, or we can't create a new one
metrics.Close() metrics.Close()
//clear the metrics - this effectively simulates the node having shut down...
mBalanceCredit.Clear()
mBytesCredit.Clear()
mMsgDebit.Clear()
//setup the metrics again //setup the metrics again
log.Debug("Setting up metrics second time") log.Debug("Setting up metrics second time")
metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
@ -65,13 +71,13 @@ func TestReporter(t *testing.T) {
log.Debug("Done.") log.Debug("Done.")
//now check the metrics, they should have the same value as before "shutdown" //now check the metrics, they should have the same value as before "shutdown"
if mBalanceCredit.Count() != 12 { if mBalanceCredit.Count() != expectedBalanceCredit {
t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count()) t.Fatalf("Expected counter to be %d, but is %d", expectedBalanceCredit, mBalanceCredit.Count())
} }
if mBytesCredit.Count() != 34 { if mBytesCredit.Count() != expectedBytesCredit {
t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count()) t.Fatalf("Expected counter to be %d, but is %d", expectedBytesCredit, mBytesCredit.Count())
} }
if mMsgDebit.Count() != 9 { if mMsgDebit.Count() != expectedMsgDebit {
t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count()) t.Fatalf("Expected counter to be %d, but is %d", expectedMsgDebit, mMsgDebit.Count())
} }
} }

@ -31,6 +31,10 @@ var (
Name: "metrics.influxdb.export", Name: "metrics.influxdb.export",
Usage: "Enable metrics export/push to an external InfluxDB database", Usage: "Enable metrics export/push to an external InfluxDB database",
} }
MetricsEnableInfluxDBAccountingExportFlag = cli.BoolFlag{
Name: "metrics.influxdb.accounting",
Usage: "Enable accounting metrics export/push to an external InfluxDB database",
}
MetricsInfluxDBEndpointFlag = cli.StringFlag{ MetricsInfluxDBEndpointFlag = cli.StringFlag{
Name: "metrics.influxdb.endpoint", Name: "metrics.influxdb.endpoint",
Usage: "Metrics InfluxDB endpoint", Usage: "Metrics InfluxDB endpoint",
@ -66,6 +70,7 @@ var (
var Flags = []cli.Flag{ var Flags = []cli.Flag{
utils.MetricsEnabledFlag, utils.MetricsEnabledFlag,
MetricsEnableInfluxDBExportFlag, MetricsEnableInfluxDBExportFlag,
MetricsEnableInfluxDBAccountingExportFlag,
MetricsInfluxDBEndpointFlag, MetricsInfluxDBEndpointFlag,
MetricsInfluxDBDatabaseFlag, MetricsInfluxDBDatabaseFlag,
MetricsInfluxDBUsernameFlag, MetricsInfluxDBUsernameFlag,
@ -77,12 +82,13 @@ func Setup(ctx *cli.Context) {
if gethmetrics.Enabled { if gethmetrics.Enabled {
log.Info("Enabling swarm metrics collection") log.Info("Enabling swarm metrics collection")
var ( var (
enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name) enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name)
endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name) enableAccountingExport = ctx.GlobalBool(MetricsEnableInfluxDBAccountingExportFlag.Name)
database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name) endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name)
username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name) database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name)
password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name) username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name)
hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name) password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name)
hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name)
) )
// Start system runtime metrics collection // Start system runtime metrics collection
@ -94,5 +100,12 @@ func Setup(ctx *cli.Context) {
"host": hosttag, "host": hosttag,
}) })
} }
if enableAccountingExport {
log.Info("Exporting accounting metrics to InfluxDB")
go influxdb.InfluxDBWithTags(gethmetrics.AccountingRegistry, 10*time.Second, endpoint, database, username, password, "accounting.", map[string]string{
"host": hosttag,
})
}
} }
} }

Loading…
Cancel
Save