|
|
|
@ -37,15 +37,11 @@ type LDBDatabase struct { |
|
|
|
|
fn string // filename for reporting
|
|
|
|
|
db *leveldb.DB // LevelDB instance
|
|
|
|
|
|
|
|
|
|
getTimer metrics.Timer // Timer for measuring the database get request counts and latencies
|
|
|
|
|
putTimer metrics.Timer // Timer for measuring the database put request counts and latencies
|
|
|
|
|
delTimer metrics.Timer // Timer for measuring the database delete request counts and latencies
|
|
|
|
|
missMeter metrics.Meter // Meter for measuring the missed database get requests
|
|
|
|
|
readMeter metrics.Meter // Meter for measuring the database get request data usage
|
|
|
|
|
writeMeter metrics.Meter // Meter for measuring the database put request data usage
|
|
|
|
|
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
|
|
|
|
|
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
|
|
|
|
|
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
|
|
|
|
|
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
|
|
|
|
|
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
|
|
|
|
|
|
|
|
|
|
quitLock sync.Mutex // Mutex protecting the quit channel access
|
|
|
|
|
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
|
|
|
|
@ -94,16 +90,9 @@ func (db *LDBDatabase) Path() string { |
|
|
|
|
|
|
|
|
|
// Put puts the given key / value to the queue
|
|
|
|
|
func (db *LDBDatabase) Put(key []byte, value []byte) error { |
|
|
|
|
// Measure the database put latency, if requested
|
|
|
|
|
if db.putTimer != nil { |
|
|
|
|
defer db.putTimer.UpdateSince(time.Now()) |
|
|
|
|
} |
|
|
|
|
// Generate the data to write to disk, update the meter and write
|
|
|
|
|
//value = rle.Compress(value)
|
|
|
|
|
|
|
|
|
|
if db.writeMeter != nil { |
|
|
|
|
db.writeMeter.Mark(int64(len(value))) |
|
|
|
|
} |
|
|
|
|
return db.db.Put(key, value, nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -113,32 +102,17 @@ func (db *LDBDatabase) Has(key []byte) (bool, error) { |
|
|
|
|
|
|
|
|
|
// Get returns the given key if it's present.
|
|
|
|
|
func (db *LDBDatabase) Get(key []byte) ([]byte, error) { |
|
|
|
|
// Measure the database get latency, if requested
|
|
|
|
|
if db.getTimer != nil { |
|
|
|
|
defer db.getTimer.UpdateSince(time.Now()) |
|
|
|
|
} |
|
|
|
|
// Retrieve the key and increment the miss counter if not found
|
|
|
|
|
dat, err := db.db.Get(key, nil) |
|
|
|
|
if err != nil { |
|
|
|
|
if db.missMeter != nil { |
|
|
|
|
db.missMeter.Mark(1) |
|
|
|
|
} |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
// Otherwise update the actually retrieved amount of data
|
|
|
|
|
if db.readMeter != nil { |
|
|
|
|
db.readMeter.Mark(int64(len(dat))) |
|
|
|
|
} |
|
|
|
|
return dat, nil |
|
|
|
|
//return rle.Decompress(dat)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Delete deletes the key from the queue and database
|
|
|
|
|
func (db *LDBDatabase) Delete(key []byte) error { |
|
|
|
|
// Measure the database delete latency, if requested
|
|
|
|
|
if db.delTimer != nil { |
|
|
|
|
defer db.delTimer.UpdateSince(time.Now()) |
|
|
|
|
} |
|
|
|
|
// Execute the actual operation
|
|
|
|
|
return db.db.Delete(key, nil) |
|
|
|
|
} |
|
|
|
@ -178,15 +152,11 @@ func (db *LDBDatabase) Meter(prefix string) { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
// Initialize all the metrics collector at the requested prefix
|
|
|
|
|
db.getTimer = metrics.NewRegisteredTimer(prefix+"user/gets", nil) |
|
|
|
|
db.putTimer = metrics.NewRegisteredTimer(prefix+"user/puts", nil) |
|
|
|
|
db.delTimer = metrics.NewRegisteredTimer(prefix+"user/dels", nil) |
|
|
|
|
db.missMeter = metrics.NewRegisteredMeter(prefix+"user/misses", nil) |
|
|
|
|
db.readMeter = metrics.NewRegisteredMeter(prefix+"user/reads", nil) |
|
|
|
|
db.writeMeter = metrics.NewRegisteredMeter(prefix+"user/writes", nil) |
|
|
|
|
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) |
|
|
|
|
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) |
|
|
|
|
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil) |
|
|
|
|
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil) |
|
|
|
|
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) |
|
|
|
|
|
|
|
|
|
// Create a quit channel for the periodic collector and run it
|
|
|
|
|
db.quitLock.Lock() |
|
|
|
@ -207,12 +177,17 @@ func (db *LDBDatabase) Meter(prefix string) { |
|
|
|
|
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
|
|
|
|
|
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
|
|
|
|
|
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
|
|
|
|
|
//
|
|
|
|
|
// This is how the iostats look like (currently):
|
|
|
|
|
// Read(MB):3895.04860 Write(MB):3654.64712
|
|
|
|
|
func (db *LDBDatabase) meter(refresh time.Duration) { |
|
|
|
|
// Create the counters to store current and previous values
|
|
|
|
|
counters := make([][]float64, 2) |
|
|
|
|
// Create the counters to store current and previous compaction values
|
|
|
|
|
compactions := make([][]float64, 2) |
|
|
|
|
for i := 0; i < 2; i++ { |
|
|
|
|
counters[i] = make([]float64, 3) |
|
|
|
|
compactions[i] = make([]float64, 3) |
|
|
|
|
} |
|
|
|
|
// Create storage for iostats.
|
|
|
|
|
var iostats [2]float64 |
|
|
|
|
// Iterate ad infinitum and collect the stats
|
|
|
|
|
for i := 1; ; i++ { |
|
|
|
|
// Retrieve the database stats
|
|
|
|
@ -233,8 +208,8 @@ func (db *LDBDatabase) meter(refresh time.Duration) { |
|
|
|
|
lines = lines[3:] |
|
|
|
|
|
|
|
|
|
// Iterate over all the table rows, and accumulate the entries
|
|
|
|
|
for j := 0; j < len(counters[i%2]); j++ { |
|
|
|
|
counters[i%2][j] = 0 |
|
|
|
|
for j := 0; j < len(compactions[i%2]); j++ { |
|
|
|
|
compactions[i%2][j] = 0 |
|
|
|
|
} |
|
|
|
|
for _, line := range lines { |
|
|
|
|
parts := strings.Split(line, "|") |
|
|
|
@ -247,19 +222,60 @@ func (db *LDBDatabase) meter(refresh time.Duration) { |
|
|
|
|
db.log.Error("Compaction entry parsing failed", "err", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
counters[i%2][idx] += value |
|
|
|
|
compactions[i%2][idx] += value |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Update all the requested meters
|
|
|
|
|
if db.compTimeMeter != nil { |
|
|
|
|
db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000)) |
|
|
|
|
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) |
|
|
|
|
} |
|
|
|
|
if db.compReadMeter != nil { |
|
|
|
|
db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024)) |
|
|
|
|
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) |
|
|
|
|
} |
|
|
|
|
if db.compWriteMeter != nil { |
|
|
|
|
db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024)) |
|
|
|
|
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Retrieve the database iostats.
|
|
|
|
|
ioStats, err := db.db.GetProperty("leveldb.iostats") |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Failed to read database iostats", "err", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
parts := strings.Split(ioStats, " ") |
|
|
|
|
if len(parts) < 2 { |
|
|
|
|
db.log.Error("Bad syntax of ioStats", "ioStats", ioStats) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
r := strings.Split(parts[0], ":") |
|
|
|
|
if len(r) < 2 { |
|
|
|
|
db.log.Error("Bad syntax of read entry", "entry", parts[0]) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
read, err := strconv.ParseFloat(r[1], 64) |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Read entry parsing failed", "err", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
w := strings.Split(parts[1], ":") |
|
|
|
|
if len(w) < 2 { |
|
|
|
|
db.log.Error("Bad syntax of write entry", "entry", parts[1]) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
write, err := strconv.ParseFloat(w[1], 64) |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Write entry parsing failed", "err", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if db.diskReadMeter != nil { |
|
|
|
|
db.diskReadMeter.Mark(int64((read - iostats[0]) * 1024 * 1024)) |
|
|
|
|
} |
|
|
|
|
if db.diskWriteMeter != nil { |
|
|
|
|
db.diskWriteMeter.Mark(int64((write - iostats[1]) * 1024 * 1024)) |
|
|
|
|
} |
|
|
|
|
iostats[0] = read |
|
|
|
|
iostats[1] = write |
|
|
|
|
|
|
|
|
|
// Sleep a bit, then repeat the stats collection
|
|
|
|
|
select { |
|
|
|
|
case errc := <-db.quitChan: |
|
|
|
|