|
|
|
@ -36,6 +36,13 @@ type AccountingMetrics struct { |
|
|
|
|
//for a graceful cleanup
|
|
|
|
|
func (am *AccountingMetrics) Close() { |
|
|
|
|
close(am.reporter.quit) |
|
|
|
|
// wait for reporter loop to finish saving metrics
|
|
|
|
|
// before reporter database is closed
|
|
|
|
|
select { |
|
|
|
|
case <-time.After(10 * time.Second): |
|
|
|
|
log.Error("accounting metrics reporter timeout") |
|
|
|
|
case <-am.reporter.done: |
|
|
|
|
} |
|
|
|
|
am.reporter.db.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -46,6 +53,7 @@ type reporter struct { |
|
|
|
|
interval time.Duration //duration at which the reporter will persist metrics
|
|
|
|
|
db *leveldb.DB //the actual DB
|
|
|
|
|
quit chan struct{} //quit the reporter loop
|
|
|
|
|
done chan struct{} //signal that reporter loop is done
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//NewMetricsDB creates a new LevelDB instance used to persist metrics defined
|
|
|
|
@ -92,6 +100,7 @@ func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *Acc |
|
|
|
|
interval: d, |
|
|
|
|
db: db, |
|
|
|
|
quit: make(chan struct{}), |
|
|
|
|
done: make(chan struct{}), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//run the go routine
|
|
|
|
@ -106,6 +115,9 @@ func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *Acc |
|
|
|
|
|
|
|
|
|
//run is the goroutine which periodically sends the metrics to the configured LevelDB
|
|
|
|
|
func (r *reporter) run() { |
|
|
|
|
// signal that the reporter loop is done
|
|
|
|
|
defer close(r.done) |
|
|
|
|
|
|
|
|
|
intervalTicker := time.NewTicker(r.interval) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
@ -121,6 +133,9 @@ func (r *reporter) run() { |
|
|
|
|
} |
|
|
|
|
case <-r.quit: |
|
|
|
|
//graceful shutdown
|
|
|
|
|
if err := r.save(); err != nil { |
|
|
|
|
log.Error("unable to send metrics to LevelDB", "err", err) |
|
|
|
|
} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|