@ -70,8 +70,9 @@ type Database struct {
seekCompGauge metrics . Gauge // Gauge for tracking the number of table compaction caused by read opt
seekCompGauge metrics . Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics . Gauge // Gauge for tracking amount of non-managed memory currently allocated
manualMemAllocGauge metrics . Gauge // Gauge for tracking amount of non-managed memory currently allocated
quitLock sync . Mutex // Mutex protecting the quit channel access
quitLock sync . RW Mutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
closed bool // keep track of whether we're Closed
log log . Logger // Contextual logger tracking the database path
log log . Logger // Contextual logger tracking the database path
@ -221,23 +222,29 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
func ( d * Database ) Close ( ) error {
func ( d * Database ) Close ( ) error {
d . quitLock . Lock ( )
d . quitLock . Lock ( )
defer d . quitLock . Unlock ( )
defer d . quitLock . Unlock ( )
// Allow double closing, simplifies things
// Allow double closing, simplifies things
if d . quitChan == nil {
if d . closed {
return nil
return nil
}
}
errc := make ( chan error )
d . closed = true
d . quitChan <- errc
if d . quitChan != nil {
if err := <- errc ; err != nil {
errc := make ( chan error )
d . log . Error ( "Metrics collection failed" , "err" , err )
d . quitChan <- errc
if err := <- errc ; err != nil {
d . log . Error ( "Metrics collection failed" , "err" , err )
}
d . quitChan = nil
}
}
d . quitChan = nil
return d . db . Close ( )
return d . db . Close ( )
}
}
// Has retrieves if a key is present in the key-value store.
// Has retrieves if a key is present in the key-value store.
func ( d * Database ) Has ( key [ ] byte ) ( bool , error ) {
func ( d * Database ) Has ( key [ ] byte ) ( bool , error ) {
d . quitLock . RLock ( )
defer d . quitLock . RUnlock ( )
if d . closed {
return false , pebble . ErrClosed
}
_ , closer , err := d . db . Get ( key )
_ , closer , err := d . db . Get ( key )
if err == pebble . ErrNotFound {
if err == pebble . ErrNotFound {
return false , nil
return false , nil
@ -250,6 +257,11 @@ func (d *Database) Has(key []byte) (bool, error) {
// Get retrieves the given key if it's present in the key-value store.
// Get retrieves the given key if it's present in the key-value store.
func ( d * Database ) Get ( key [ ] byte ) ( [ ] byte , error ) {
func ( d * Database ) Get ( key [ ] byte ) ( [ ] byte , error ) {
d . quitLock . RLock ( )
defer d . quitLock . RUnlock ( )
if d . closed {
return nil , pebble . ErrClosed
}
dat , closer , err := d . db . Get ( key )
dat , closer , err := d . db . Get ( key )
if err != nil {
if err != nil {
return nil , err
return nil , err
@ -262,11 +274,21 @@ func (d *Database) Get(key []byte) ([]byte, error) {
// Put inserts the given value into the key-value store.
// Put inserts the given value into the key-value store.
func ( d * Database ) Put ( key [ ] byte , value [ ] byte ) error {
func ( d * Database ) Put ( key [ ] byte , value [ ] byte ) error {
d . quitLock . RLock ( )
defer d . quitLock . RUnlock ( )
if d . closed {
return pebble . ErrClosed
}
return d . db . Set ( key , value , pebble . NoSync )
return d . db . Set ( key , value , pebble . NoSync )
}
}
// Delete removes the key from the key-value store.
// Delete removes the key from the key-value store.
func ( d * Database ) Delete ( key [ ] byte ) error {
func ( d * Database ) Delete ( key [ ] byte ) error {
d . quitLock . RLock ( )
defer d . quitLock . RUnlock ( )
if d . closed {
return pebble . ErrClosed
}
return d . db . Delete ( key , nil )
return d . db . Delete ( key , nil )
}
}
@ -274,7 +296,8 @@ func (d *Database) Delete(key []byte) error {
// database until a final write is called.
// database until a final write is called.
func ( d * Database ) NewBatch ( ) ethdb . Batch {
func ( d * Database ) NewBatch ( ) ethdb . Batch {
return & batch {
return & batch {
b : d . db . NewBatch ( ) ,
b : d . db . NewBatch ( ) ,
db : d ,
}
}
}
}
@ -481,6 +504,7 @@ func (d *Database) meter(refresh time.Duration) {
// when Write is called. A batch cannot be used concurrently.
// when Write is called. A batch cannot be used concurrently.
type batch struct {
type batch struct {
b * pebble . Batch
b * pebble . Batch
db * Database
size int
size int
}
}
@ -505,6 +529,11 @@ func (b *batch) ValueSize() int {
// Write flushes any accumulated data to disk.
// Write flushes any accumulated data to disk.
func ( b * batch ) Write ( ) error {
func ( b * batch ) Write ( ) error {
b . db . quitLock . RLock ( )
defer b . db . quitLock . RUnlock ( )
if b . db . closed {
return pebble . ErrClosed
}
return b . b . Commit ( pebble . NoSync )
return b . b . Commit ( pebble . NoSync )
}
}