|
|
|
package influxdb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
uurl "net/url"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
|
|
client "github.com/influxdata/influxdb1-client/v2"
|
|
|
|
)
|
|
|
|
|
|
|
|
type reporter struct {
|
|
|
|
reg metrics.Registry
|
|
|
|
interval time.Duration
|
|
|
|
|
|
|
|
url uurl.URL
|
|
|
|
database string
|
|
|
|
username string
|
|
|
|
password string
|
|
|
|
namespace string
|
|
|
|
tags map[string]string
|
|
|
|
|
|
|
|
client client.Client
|
|
|
|
|
|
|
|
cache map[string]int64
|
|
|
|
}
|
|
|
|
|
|
|
|
// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
|
|
|
|
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
|
|
|
|
InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
|
|
|
|
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
|
|
|
|
u, err := uurl.Parse(url)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
rep := &reporter{
|
|
|
|
reg: r,
|
|
|
|
interval: d,
|
|
|
|
url: *u,
|
|
|
|
database: database,
|
|
|
|
username: username,
|
|
|
|
password: password,
|
|
|
|
namespace: namespace,
|
|
|
|
tags: tags,
|
|
|
|
cache: make(map[string]int64),
|
|
|
|
}
|
|
|
|
if err := rep.makeClient(); err != nil {
|
|
|
|
log.Warn("Unable to make InfluxDB client", "err", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
rep.run()
|
|
|
|
}
|
|
|
|
|
|
|
|
// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
|
|
|
|
func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
|
|
|
|
u, err := uurl.Parse(url)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
rep := &reporter{
|
|
|
|
reg: r,
|
|
|
|
url: *u,
|
|
|
|
database: database,
|
|
|
|
username: username,
|
|
|
|
password: password,
|
|
|
|
namespace: namespace,
|
|
|
|
tags: tags,
|
|
|
|
cache: make(map[string]int64),
|
|
|
|
}
|
|
|
|
if err := rep.makeClient(); err != nil {
|
|
|
|
return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := rep.send(0); err != nil {
|
|
|
|
return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *reporter) makeClient() (err error) {
|
|
|
|
r.client, err = client.NewHTTPClient(client.HTTPConfig{
|
|
|
|
Addr: r.url.String(),
|
|
|
|
Username: r.username,
|
|
|
|
Password: r.password,
|
|
|
|
Timeout: 10 * time.Second,
|
|
|
|
})
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *reporter) run() {
|
|
|
|
intervalTicker := time.NewTicker(r.interval)
|
|
|
|
pingTicker := time.NewTicker(time.Second * 5)
|
|
|
|
|
|
|
|
defer intervalTicker.Stop()
|
|
|
|
defer pingTicker.Stop()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-intervalTicker.C:
|
|
|
|
if err := r.send(0); err != nil {
|
|
|
|
log.Warn("Unable to send to InfluxDB", "err", err)
|
|
|
|
}
|
|
|
|
case <-pingTicker.C:
|
|
|
|
_, _, err := r.client.Ping(0)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
|
|
|
|
|
|
|
|
if err = r.makeClient(); err != nil {
|
|
|
|
log.Warn("Unable to make InfluxDB client", "err", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// send sends the measurements. If provided tstamp is >0, it is used. Otherwise,
|
|
|
|
// a 'fresh' timestamp is used.
|
|
|
|
func (r *reporter) send(tstamp int64) error {
|
|
|
|
bps, err := client.NewBatchPoints(
|
|
|
|
client.BatchPointsConfig{
|
|
|
|
Database: r.database,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r.reg.Each(func(name string, i interface{}) {
|
|
|
|
var now time.Time
|
|
|
|
if tstamp <= 0 {
|
|
|
|
now = time.Now()
|
|
|
|
} else {
|
|
|
|
now = time.Unix(tstamp, 0)
|
|
|
|
}
|
|
|
|
measurement, fields := readMeter(r.namespace, name, i)
|
|
|
|
if fields == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if p, err := client.NewPoint(measurement, r.tags, fields, now); err == nil {
|
|
|
|
bps.AddPoint(p)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
return r.client.Write(bps)
|
|
|
|
}
|