@ -22,8 +22,6 @@ package leveldb
import (
import (
"fmt"
"fmt"
"strconv"
"strings"
"sync"
"sync"
"time"
"time"
@ -77,6 +75,8 @@ type Database struct {
seekCompGauge metrics . Gauge // Gauge for tracking the number of table compaction caused by read opt
seekCompGauge metrics . Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics . Gauge // Gauge to track the amount of memory that has been manually allocated (not a part of runtime/GC)
manualMemAllocGauge metrics . Gauge // Gauge to track the amount of memory that has been manually allocated (not a part of runtime/GC)
levelsGauge [ 7 ] metrics . Gauge // Gauge for tracking the number of tables in levels
quitLock sync . Mutex // Mutex protecting the quit channel access
quitLock sync . Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
@ -146,6 +146,11 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option
ldb . seekCompGauge = metrics . NewRegisteredGauge ( namespace + "compact/seek" , nil )
ldb . seekCompGauge = metrics . NewRegisteredGauge ( namespace + "compact/seek" , nil )
ldb . manualMemAllocGauge = metrics . NewRegisteredGauge ( namespace + "memory/manualalloc" , nil )
ldb . manualMemAllocGauge = metrics . NewRegisteredGauge ( namespace + "memory/manualalloc" , nil )
// leveldb has only up to 7 levels
for i := range ldb . levelsGauge {
ldb . levelsGauge [ i ] = metrics . NewRegisteredGauge ( namespace + fmt . Sprintf ( "tables/level%v" , i ) , nil )
}
// Start up the metrics gathering and return
// Start up the metrics gathering and return
go ldb . meter ( metricsGatheringInterval )
go ldb . meter ( metricsGatheringInterval )
return ldb , nil
return ldb , nil
@ -266,122 +271,63 @@ func (db *Database) Path() string {
// meter periodically retrieves internal leveldb counters and reports them to
// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
// the metrics subsystem.
//
// This is how a LevelDB stats table looks like (currently):
//
// Compactions
// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
// -------+------------+---------------+---------------+---------------+---------------
// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
//
// This is how the write delay look like (currently):
// DelayN:5 Delay:406.604657ms Paused: false
//
// This is how the iostats look like (currently):
// Read(MB):3895.04860 Write(MB):3654.64712
func ( db * Database ) meter ( refresh time . Duration ) {
func ( db * Database ) meter ( refresh time . Duration ) {
// Create the counters to store current and previous compaction values
// Create the counters to store current and previous compaction values
compactions := make ( [ ] [ ] floa t64, 2 )
compactions := make ( [ ] [ ] int64 , 2 )
for i := 0 ; i < 2 ; i ++ {
for i := 0 ; i < 2 ; i ++ {
compactions [ i ] = make ( [ ] floa t64, 4 )
compactions [ i ] = make ( [ ] int64 , 4 )
}
}
// Create storage for iostats.
// Create storages for states and warning log tracer.
var iostats [ 2 ] float64
// Create storage and warning log tracer for write delay.
var (
delaystats [ 2 ] int64
lastWritePaused time . Time
)
var (
var (
errc chan error
errc chan error
merr error
merr error
)
stats leveldb . DBStats
iostats [ 2 ] int64
delaystats [ 2 ] int64
lastWritePaused time . Time
)
timer := time . NewTimer ( refresh )
timer := time . NewTimer ( refresh )
defer timer . Stop ( )
defer timer . Stop ( )
// Iterate ad infinitum and collect the stats
// Iterate ad infinitum and collect the stats
for i := 1 ; errc == nil && merr == nil ; i ++ {
for i := 1 ; errc == nil && merr == nil ; i ++ {
// Retrieve the database stats
// Retrieve the database stats
stats , err := db . db . GetProperty ( "leveldb.stats" )
// Stats method resets buffers inside therefore it's okay to just pass the struct.
err := db . db . Stats ( & stats )
if err != nil {
if err != nil {
db . log . Error ( "Failed to read database stats" , "err" , err )
db . log . Error ( "Failed to read database stats" , "err" , err )
merr = err
merr = err
continue
continue
}
}
// Find the compaction table, skip the header
lines := strings . Split ( stats , "\n" )
for len ( lines ) > 0 && strings . TrimSpace ( lines [ 0 ] ) != "Compactions" {
lines = lines [ 1 : ]
}
if len ( lines ) <= 3 {
db . log . Error ( "Compaction leveldbTable not found" )
merr = errors . New ( "compaction leveldbTable not found" )
continue
}
lines = lines [ 3 : ]
// Iterate over all the leveldbTable rows, and accumulate the entries
// Iterate over all the leveldbTable rows, and accumulate the entries
for j := 0 ; j < len ( compactions [ i % 2 ] ) ; j ++ {
for j := 0 ; j < len ( compactions [ i % 2 ] ) ; j ++ {
compactions [ i % 2 ] [ j ] = 0
compactions [ i % 2 ] [ j ] = 0
}
}
for _ , line := range lines {
compactions [ i % 2 ] [ 0 ] = stats . LevelSizes . Sum ( )
parts := strings . Split ( line , "|" )
for _ , t := range stats . LevelDurations {
if len ( parts ) != 6 {
compactions [ i % 2 ] [ 1 ] += t . Nanoseconds ( )
break
}
for idx , counter := range parts [ 2 : ] {
value , err := strconv . ParseFloat ( strings . TrimSpace ( counter ) , 64 )
if err != nil {
db . log . Error ( "Compaction entry parsing failed" , "err" , err )
merr = err
continue
}
compactions [ i % 2 ] [ idx ] += value
}
}
}
compactions [ i % 2 ] [ 2 ] = stats . LevelRead . Sum ( )
compactions [ i % 2 ] [ 3 ] = stats . LevelWrite . Sum ( )
// Update all the requested meters
// Update all the requested meters
if db . diskSizeGauge != nil {
if db . diskSizeGauge != nil {
db . diskSizeGauge . Update ( int64 ( compactions [ i % 2 ] [ 0 ] * 1024 * 1024 ) )
db . diskSizeGauge . Update ( compactions [ i % 2 ] [ 0 ] )
}
}
if db . compTimeMeter != nil {
if db . compTimeMeter != nil {
db . compTimeMeter . Mark ( int64 ( ( compactions [ i % 2 ] [ 1 ] - compactions [ ( i - 1 ) % 2 ] [ 1 ] ) * 1000 * 1000 * 1000 ) )
db . compTimeMeter . Mark ( compactions [ i % 2 ] [ 1 ] - compactions [ ( i - 1 ) % 2 ] [ 1 ] )
}
}
if db . compReadMeter != nil {
if db . compReadMeter != nil {
db . compReadMeter . Mark ( int64 ( ( compactions [ i % 2 ] [ 2 ] - compactions [ ( i - 1 ) % 2 ] [ 2 ] ) * 1024 * 1024 ) )
db . compReadMeter . Mark ( compactions [ i % 2 ] [ 2 ] - compactions [ ( i - 1 ) % 2 ] [ 2 ] )
}
}
if db . compWriteMeter != nil {
if db . compWriteMeter != nil {
db . compWriteMeter . Mark ( int64 ( ( compactions [ i % 2 ] [ 3 ] - compactions [ ( i - 1 ) % 2 ] [ 3 ] ) * 1024 * 1024 ) )
db . compWriteMeter . Mark ( compactions [ i % 2 ] [ 3 ] - compactions [ ( i - 1 ) % 2 ] [ 3 ] )
}
// Retrieve the write delay statistic
writedelay , err := db . db . GetProperty ( "leveldb.writedelay" )
if err != nil {
db . log . Error ( "Failed to read database write delay statistic" , "err" , err )
merr = err
continue
}
}
var (
var (
delayN int64
delayN = int64 ( stats . WriteDelayCount )
delayDuration string
duration = stats . WriteDelayDuration
duration time . Duration
paused = stats . WritePaused
paused bool
)
)
if n , err := fmt . Sscanf ( writedelay , "DelayN:%d Delay:%s Paused:%t" , & delayN , & delayDuration , & paused ) ; n != 3 || err != nil {
db . log . Error ( "Write delay statistic not found" )
merr = err
continue
}
duration , err = time . ParseDuration ( delayDuration )
if err != nil {
db . log . Error ( "Failed to parse delay duration" , "err" , err )
merr = err
continue
}
if db . writeDelayNMeter != nil {
if db . writeDelayNMeter != nil {
db . writeDelayNMeter . Mark ( delayN - delaystats [ 0 ] )
db . writeDelayNMeter . Mark ( delayN - delaystats [ 0 ] )
}
}
@ -397,60 +343,27 @@ func (db *Database) meter(refresh time.Duration) {
}
}
delaystats [ 0 ] , delaystats [ 1 ] = delayN , duration . Nanoseconds ( )
delaystats [ 0 ] , delaystats [ 1 ] = delayN , duration . Nanoseconds ( )
// Retrieve the database iostats.
var (
ioStats , err := db . db . GetProperty ( "leveldb.iostats" )
nRead = int64 ( stats . IORead )
if err != nil {
nWrite = int64 ( stats . IOWrite )
db . log . Error ( "Failed to read database iostats" , "err" , err )
)
merr = err
continue
}
var nRead , nWrite float64
parts := strings . Split ( ioStats , " " )
if len ( parts ) < 2 {
db . log . Error ( "Bad syntax of ioStats" , "ioStats" , ioStats )
merr = fmt . Errorf ( "bad syntax of ioStats %s" , ioStats )
continue
}
if n , err := fmt . Sscanf ( parts [ 0 ] , "Read(MB):%f" , & nRead ) ; n != 1 || err != nil {
db . log . Error ( "Bad syntax of read entry" , "entry" , parts [ 0 ] )
merr = err
continue
}
if n , err := fmt . Sscanf ( parts [ 1 ] , "Write(MB):%f" , & nWrite ) ; n != 1 || err != nil {
db . log . Error ( "Bad syntax of write entry" , "entry" , parts [ 1 ] )
merr = err
continue
}
if db . diskReadMeter != nil {
if db . diskReadMeter != nil {
db . diskReadMeter . Mark ( int64 ( ( nRead - iostats [ 0 ] ) * 1024 * 1024 ) )
db . diskReadMeter . Mark ( nRead - iostats [ 0 ] )
}
}
if db . diskWriteMeter != nil {
if db . diskWriteMeter != nil {
db . diskWriteMeter . Mark ( int64 ( ( nWrite - iostats [ 1 ] ) * 1024 * 1024 ) )
db . diskWriteMeter . Mark ( nWrite - iostats [ 1 ] )
}
}
iostats [ 0 ] , iostats [ 1 ] = nRead , nWrite
iostats [ 0 ] , iostats [ 1 ] = nRead , nWrite
compCount , err := db . db . GetProperty ( "leveldb.compcount" )
db . memCompGauge . Update ( int64 ( stats . MemComp ) )
if err != nil {
db . level0CompGauge . Update ( int64 ( stats . Level0Comp ) )
db . log . Error ( "Failed to read database iostats" , "err" , err )
db . nonlevel0CompGauge . Update ( int64 ( stats . NonLevel0Comp ) )
merr = err
db . seekCompGauge . Update ( int64 ( stats . SeekComp ) )
continue
}
var (
// update tables amount
memComp uint32
for i , tables := range stats . LevelTablesCounts {
level0Comp uint32
db . levelsGauge [ i ] . Update ( int64 ( tables ) )
nonLevel0Comp uint32
seekComp uint32
)
if n , err := fmt . Sscanf ( compCount , "MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d" , & memComp , & level0Comp , & nonLevel0Comp , & seekComp ) ; n != 4 || err != nil {
db . log . Error ( "Compaction count statistic not found" )
merr = err
continue
}
}
db . memCompGauge . Update ( int64 ( memComp ) )
db . level0CompGauge . Update ( int64 ( level0Comp ) )
db . nonlevel0CompGauge . Update ( int64 ( nonLevel0Comp ) )
db . seekCompGauge . Update ( int64 ( seekComp ) )
// Sleep a bit, then repeat the stats collection
// Sleep a bit, then repeat the stats collection
select {
select {