From f28da4f602fcd17624cf6d40d070253dd6663121 Mon Sep 17 00:00:00 2001 From: Jerzy Lasyk Date: Thu, 24 Jan 2019 18:57:20 +0100 Subject: [PATCH] swarm/metrics: Send the accounting registry to InfluxDB (#18470) --- metrics/registry.go | 5 +++-- p2p/protocols/accounting.go | 35 +++++++++++----------------------- p2p/protocols/reporter_test.go | 28 ++++++++++++++++----------- swarm/metrics/flags.go | 25 ++++++++++++++++++------ 4 files changed, 50 insertions(+), 43 deletions(-) diff --git a/metrics/registry.go b/metrics/registry.go index c1cf7906ce..c5435adf24 100644 --- a/metrics/registry.go +++ b/metrics/registry.go @@ -312,8 +312,9 @@ func (r *PrefixedRegistry) UnregisterAll() { } var ( - DefaultRegistry = NewRegistry() - EphemeralRegistry = NewRegistry() + DefaultRegistry = NewRegistry() + EphemeralRegistry = NewRegistry() + AccountingRegistry = NewRegistry() // registry used in swarm ) // Call the given function for each registered metric. diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go index bdc490e591..558247254a 100644 --- a/p2p/protocols/accounting.go +++ b/p2p/protocols/accounting.go @@ -27,23 +27,21 @@ var ( // All metrics are cumulative // total amount of units credited - mBalanceCredit metrics.Counter + mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", metrics.AccountingRegistry) // total amount of units debited - mBalanceDebit metrics.Counter + mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", metrics.AccountingRegistry) // total amount of bytes credited - mBytesCredit metrics.Counter + mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", metrics.AccountingRegistry) // total amount of bytes debited - mBytesDebit metrics.Counter + mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", metrics.AccountingRegistry) // total amount of credited messages - mMsgCredit metrics.Counter + mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", metrics.AccountingRegistry) // total amount of debited messages - mMsgDebit metrics.Counter + mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", metrics.AccountingRegistry) // how many times local node had to drop remote peers - mPeerDrops metrics.Counter + mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", metrics.AccountingRegistry) // how many times local node overdrafted and dropped - mSelfDrops metrics.Counter - - MetricsRegistry metrics.Registry + mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", metrics.AccountingRegistry) ) // Prices defines how prices are being passed on to the accounting instance @@ -110,24 +108,13 @@ func NewAccounting(balance Balance, po Prices) *Accounting { return ah } -// SetupAccountingMetrics creates a separate registry for p2p accounting metrics; +// SetupAccountingMetrics uses a separate registry for p2p accounting metrics; // this registry should be independent of any other metrics as it persists at different endpoints. -// It also instantiates the given metrics and starts the persisting go-routine which +// It also starts the persisting go-routine which // at the passed interval writes the metrics to a LevelDB func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { - // create an empty registry - MetricsRegistry = metrics.NewRegistry() - // instantiate the metrics - mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry) - mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry) - mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry) - mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry) - mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry) - mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry) - mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry) - mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry) // create the DB and start persisting - return NewAccountingMetrics(MetricsRegistry, reportInterval, path) + return NewAccountingMetrics(metrics.AccountingRegistry, reportInterval, path) } // Send takes a peer, a size and a msg and diff --git a/p2p/protocols/reporter_test.go b/p2p/protocols/reporter_test.go index 8f27d07e89..9b0da09b79 100644 --- a/p2p/protocols/reporter_test.go +++ b/p2p/protocols/reporter_test.go @@ -43,21 +43,27 @@ func TestReporter(t *testing.T) { metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) log.Debug("Done.") - //do some metrics + //change metrics mBalanceCredit.Inc(12) mBytesCredit.Inc(34) mMsgDebit.Inc(9) + //store expected metrics + expectedBalanceCredit := mBalanceCredit.Count() + expectedBytesCredit := mBytesCredit.Count() + expectedMsgDebit := mMsgDebit.Count() + //give the reporter time to write the metrics to DB time.Sleep(20 * time.Millisecond) - //set the metrics to nil - this effectively simulates the node having shut down... - mBalanceCredit = nil - mBytesCredit = nil - mMsgDebit = nil //close the DB also, or we can't create a new one metrics.Close() + //clear the metrics - this effectively simulates the node having shut down... + mBalanceCredit.Clear() + mBytesCredit.Clear() + mMsgDebit.Clear() + //setup the metrics again log.Debug("Setting up metrics second time") metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) @@ -65,13 +71,13 @@ func TestReporter(t *testing.T) { log.Debug("Done.") //now check the metrics, they should have the same value as before "shutdown" - if mBalanceCredit.Count() != 12 { - t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count()) + if mBalanceCredit.Count() != expectedBalanceCredit { + t.Fatalf("Expected counter to be %d, but is %d", expectedBalanceCredit, mBalanceCredit.Count()) } - if mBytesCredit.Count() != 34 { - t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count()) + if mBytesCredit.Count() != expectedBytesCredit { + t.Fatalf("Expected counter to be %d, but is %d", expectedBytesCredit, mBytesCredit.Count()) } - if mMsgDebit.Count() != 9 { - t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count()) + if mMsgDebit.Count() != expectedMsgDebit { + t.Fatalf("Expected counter to be %d, but is %d", expectedMsgDebit, mMsgDebit.Count()) } } diff --git a/swarm/metrics/flags.go b/swarm/metrics/flags.go index 7c12120a60..38d30d9970 100644 --- a/swarm/metrics/flags.go +++ b/swarm/metrics/flags.go @@ -31,6 +31,10 @@ var ( Name: "metrics.influxdb.export", Usage: "Enable metrics export/push to an external InfluxDB database", } + MetricsEnableInfluxDBAccountingExportFlag = cli.BoolFlag{ + Name: "metrics.influxdb.accounting", + Usage: "Enable accounting metrics export/push to an external InfluxDB database", + } MetricsInfluxDBEndpointFlag = cli.StringFlag{ Name: "metrics.influxdb.endpoint", Usage: "Metrics InfluxDB endpoint", @@ -66,6 +70,7 @@ var ( var Flags = []cli.Flag{ utils.MetricsEnabledFlag, MetricsEnableInfluxDBExportFlag, + MetricsEnableInfluxDBAccountingExportFlag, MetricsInfluxDBEndpointFlag, MetricsInfluxDBDatabaseFlag, MetricsInfluxDBUsernameFlag, @@ -77,12 +82,13 @@ func Setup(ctx *cli.Context) { if gethmetrics.Enabled { log.Info("Enabling swarm metrics collection") var ( - enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name) - endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name) - database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name) - username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name) - password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name) - hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name) + enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name) + enableAccountingExport = ctx.GlobalBool(MetricsEnableInfluxDBAccountingExportFlag.Name) + endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name) ) // Start system runtime metrics collection @@ -94,5 +100,12 @@ func Setup(ctx *cli.Context) { "host": hosttag, }) } + + if enableAccountingExport { + log.Info("Exporting accounting metrics to InfluxDB") + go influxdb.InfluxDBWithTags(gethmetrics.AccountingRegistry, 10*time.Second, endpoint, database, username, password, "accounting.", map[string]string{ + "host": hosttag, + }) + } } }