// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package network
import (
"encoding/binary"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
const counterKeyPrefix = 0x01
/ *
syncDb is a queueing service for outgoing deliveries .
One instance per priority queue for each peer
a syncDb instance maintains an in - memory buffer ( of capacity bufferSize )
once its in - memory buffer is full it switches to persisting in db
and dbRead iterator iterates through the items keeping their order
once the db read catches up ( there is no more items in the db ) then
it switches back to in - memory buffer .
when syncdb is stopped all items in the buffer are saved to the db
* /
type syncDb struct {
start [ ] byte // this syncdb starting index in requestdb
key storage . Key // remote peers address key
counterKey [ ] byte // db key to persist counter
priority uint // priotity High|Medium|Low
buffer chan interface { } // incoming request channel
db * storage . LDBDatabase // underlying db (TODO should be interface)
done chan bool // chan to signal goroutines finished quitting
quit chan bool // chan to signal quitting to goroutines
total , dbTotal int // counts for one session
batch chan chan int // channel for batch requests
dbBatchSize uint // number of items before batch is saved
}
// constructor needs a shared request db (leveldb)
// priority is used in the index key
// uses a buffer and a leveldb for persistent storage
// bufferSize, dbBatchSize are config parameters
func newSyncDb ( db * storage . LDBDatabase , key storage . Key , priority uint , bufferSize , dbBatchSize uint , deliver func ( interface { } , chan bool ) bool ) * syncDb {
start := make ( [ ] byte , 42 )
start [ 1 ] = byte ( priorities - priority )
copy ( start [ 2 : 34 ] , key )
counterKey := make ( [ ] byte , 34 )
counterKey [ 0 ] = counterKeyPrefix
copy ( counterKey [ 1 : ] , start [ 1 : 34 ] )
syncdb := & syncDb {
start : start ,
key : key ,
counterKey : counterKey ,
priority : priority ,
buffer : make ( chan interface { } , bufferSize ) ,
db : db ,
done : make ( chan bool ) ,
quit : make ( chan bool ) ,
batch : make ( chan chan int ) ,
dbBatchSize : dbBatchSize ,
}
log . Trace ( fmt . Sprintf ( "syncDb[peer: %v, priority: %v] - initialised" , key . Log ( ) , priority ) )
// starts the main forever loop reading from buffer
go syncdb . bufferRead ( deliver )
return syncdb
}
/ *
bufferRead is a forever iterator loop that takes care of delivering
outgoing store requests reads from incoming buffer
its argument is the deliver function taking the item as first argument
and a quit channel as second .
Closing of this channel is supposed to abort all waiting for delivery
( typically network write )
The iteration switches between 2 modes ,
* buffer mode reads the in - memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db , parallelly another
routine is started that reads from the db and delivers items
If there is buffer contention in buffer mode ( slow network , high upload volume )
syncdb switches to db mode and starts dbRead
Once db backlog is delivered , it reverts back to in - memory buffer
It is automatically started when syncdb is initialised .
It saves the buffer to db upon receiving quit signal . syncDb # stop ( )
* /
func ( self * syncDb ) bufferRead ( deliver func ( interface { } , chan bool ) bool ) {
var buffer , db chan interface { } // channels representing the two read modes
var more bool
var req interface { }
var entry * syncDbEntry
var inBatch , inDb int
batch := new ( leveldb . Batch )
var dbSize chan int
quit := self . quit
counterValue := make ( [ ] byte , 8 )
// counter is used for keeping the items in order, persisted to db
// start counter where db was at, 0 if not found
data , err := self . db . Get ( self . counterKey )
var counter uint64
if err == nil {
counter = binary . BigEndian . Uint64 ( data )
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] - counter read from db at %v" , self . key . Log ( ) , self . priority , counter ) )
} else {
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] - counter starts at %v" , self . key . Log ( ) , self . priority , counter ) )
}
LOOP :
for {
// waiting for item next in the buffer, or quit signal or batch request
select {
// buffer only closes when writing to db
case req = <- buffer :
// deliver request : this is blocking on network write so
// it is passed the quit channel as argument, so that it returns
// if syncdb is stopped. In this case we need to save the item to the db
more = deliver ( req , self . quit )
if ! more {
log . Debug ( fmt . Sprintf ( "syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v" , self . key . Log ( ) , self . priority , self . dbTotal , self . total ) )
// received quit signal, save request currently waiting delivery
// by switching to db mode and closing the buffer
buffer = nil
db = self . buffer
close ( db )
quit = nil // needs to block the quit case in select
break // break from select, this item will be written to the db
}
self . total ++
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] deliver (db/total): %v/%v" , self . key . Log ( ) , self . priority , self . dbTotal , self . total ) )
// by the time deliver returns, there were new writes to the buffer
// if buffer contention is detected, switch to db mode which drains
// the buffer so no process will block on pushing store requests
if len ( buffer ) == cap ( buffer ) {
log . Debug ( fmt . Sprintf ( "syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v" , self . key . Log ( ) , self . priority , cap ( buffer ) , self . dbTotal , self . total ) )
buffer = nil
db = self . buffer
}
continue LOOP
// incoming entry to put into db
case req , more = <- db :
if ! more {
// only if quit is called, saved all the buffer
binary . BigEndian . PutUint64 ( counterValue , counter )
batch . Put ( self . counterKey , counterValue ) // persist counter in batch
self . writeSyncBatch ( batch ) // save batch
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] quitting: save current batch to db" , self . key . Log ( ) , self . priority ) )
break LOOP
}
self . dbTotal ++
self . total ++
// otherwise break after select
case dbSize = <- self . batch :
// explicit request for batch
if inBatch == 0 && quit != nil {
// there was no writes since the last batch so db depleted
// switch to buffer mode
log . Debug ( fmt . Sprintf ( "syncDb[%v/%v] empty db: switching to buffer" , self . key . Log ( ) , self . priority ) )
db = nil
buffer = self . buffer
dbSize <- 0 // indicates to 'caller' that batch has been written
inDb = 0
continue LOOP
}
binary . BigEndian . PutUint64 ( counterValue , counter )
batch . Put ( self . counterKey , counterValue )
log . Debug ( fmt . Sprintf ( "syncDb[%v/%v] write batch %v/%v - %x - %x" , self . key . Log ( ) , self . priority , inBatch , counter , self . counterKey , counterValue ) )
batch = self . writeSyncBatch ( batch )
dbSize <- inBatch // indicates to 'caller' that batch has been written
inBatch = 0
continue LOOP
// closing syncDb#quit channel is used to signal to all goroutines to quit
case <- quit :
// need to save backlog, so switch to db mode
db = self . buffer
buffer = nil
quit = nil
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] quitting: save buffer to db" , self . key . Log ( ) , self . priority ) )
close ( db )
continue LOOP
}
// only get here if we put req into db
entry , err = self . newSyncDbEntry ( req , counter )
if err != nil {
log . Warn ( fmt . Sprintf ( "syncDb[%v/%v] saving request %v (#%v/%v) failed: %v" , self . key . Log ( ) , self . priority , req , inBatch , inDb , err ) )
continue LOOP
}
batch . Put ( entry . key , entry . val )
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)" , self . key . Log ( ) , self . priority , req , entry , inBatch , inDb , counter ) )
// if just switched to db mode and not quitting, then launch dbRead
// in a parallel go routine to send deliveries from db
if inDb == 0 && quit != nil {
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] start dbRead" , self . key . Log ( ) , self . priority ) )
go self . dbRead ( true , counter , deliver )
}
inDb ++
inBatch ++
counter ++
// need to save the batch if it gets too large (== dbBatchSize)
if inBatch % int ( self . dbBatchSize ) == 0 {
batch = self . writeSyncBatch ( batch )
}
}
log . Info ( fmt . Sprintf ( "syncDb[%v:%v]: saved %v keys (saved counter at %v)" , self . key . Log ( ) , self . priority , inBatch , counter ) )
close ( self . done )
}
// writes the batch to the db and returns a new batch object
func ( self * syncDb ) writeSyncBatch ( batch * leveldb . Batch ) * leveldb . Batch {
err := self . db . Write ( batch )
if err != nil {
log . Warn ( fmt . Sprintf ( "syncDb[%v/%v] saving batch to db failed: %v" , self . key . Log ( ) , self . priority , err ) )
return batch
}
return new ( leveldb . Batch )
}
// abstract type for db entries (TODO could be a feature of Receipts)
type syncDbEntry struct {
key , val [ ] byte
}
func ( self syncDbEntry ) String ( ) string {
return fmt . Sprintf ( "key: %x, value: %x" , self . key , self . val )
}
/ *
dbRead is iterating over store requests to be sent over to the peer
this is mainly to prevent crashes due to network output buffer contention ( ? ? ? )
as well as to make syncronisation resilient to disconnects
the messages are supposed to be sent in the p2p priority queue .
the request DB is shared between peers , but domains for each syncdb
are disjoint . dbkeys ( 42 bytes ) are structured :
* 0 : 0x00 ( 0x01 reserved for counter key )
* 1 : priorities - priority ( so that high priority can be replayed first )
* 2 - 33 : peers address
* 34 - 41 : syncdb counter to preserve order ( this field is missing for the counter key )
values ( 40 bytes ) are :
* 0 - 31 : key
* 32 - 39 : request id
dbRead needs a boolean to indicate if on first round all the historical
record is synced . Second argument to indicate current db counter
The third is the function to apply
* /
func ( self * syncDb ) dbRead ( useBatches bool , counter uint64 , fun func ( interface { } , chan bool ) bool ) {
key := make ( [ ] byte , 42 )
copy ( key , self . start )
binary . BigEndian . PutUint64 ( key [ 34 : ] , counter )
var batches , n , cnt , total int
var more bool
var entry * syncDbEntry
var it iterator . Iterator
var del * leveldb . Batch
batchSizes := make ( chan int )
for {
// if useBatches is false, cnt is not set
if useBatches {
// this could be called before all cnt items sent out
// so that loop is not blocking while delivering
// only relevant if cnt is large
select {
case self . batch <- batchSizes :
case <- self . quit :
return
}
// wait for the write to finish and get the item count in the next batch
cnt = <- batchSizes
batches ++
if cnt == 0 {
// empty
return
}
}
it = self . db . NewIterator ( )
it . Seek ( key )
if ! it . Valid ( ) {
copy ( key , self . start )
useBatches = true
continue
}
del = new ( leveldb . Batch )
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v]: new iterator: %x (batch %v, count %v)" , self . key . Log ( ) , self . priority , key , batches , cnt ) )
for n = 0 ; ! useBatches || n < cnt ; it . Next ( ) {
copy ( key , it . Key ( ) )
if len ( key ) == 0 || key [ 0 ] != 0 {
copy ( key , self . start )
useBatches = true
break
}
val := make ( [ ] byte , 40 )
copy ( val , it . Value ( ) )
entry = & syncDbEntry { key , val }
// log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
more = fun ( entry , self . quit )
if ! more {
// quit received when waiting to deliver entry, the entry will not be deleted
log . Trace ( fmt . Sprintf ( "syncDb[%v/%v] batch %v quit after %v/%v items" , self . key . Log ( ) , self . priority , batches , n , cnt ) )
break
}
// since subsequent batches of the same db session are indexed incrementally
// deleting earlier batches can be delayed and parallelised
// this could be batch delete when db is idle (but added complexity esp when quitting)
del . Delete ( key )
n ++
total ++
}
log . Debug ( fmt . Sprintf ( "syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v" , self . key . Log ( ) , self . priority , batches , total , self . dbTotal , self . total ) )
self . db . Write ( del ) // this could be async called only when db is idle
it . Release ( )
}
}
//
func ( self * syncDb ) stop ( ) {
close ( self . quit )
<- self . done
}
// calculate a dbkey for the request, for the db to work
// see syncdb for db key structure
// polimorphic: accepted types, see syncer#addRequest
func ( self * syncDb ) newSyncDbEntry ( req interface { } , counter uint64 ) ( entry * syncDbEntry , err error ) {
var key storage . Key
var chunk * storage . Chunk
var id uint64
var ok bool
var sreq * storeRequestMsgData
if key , ok = req . ( storage . Key ) ; ok {
id = generateId ( )
} else if chunk , ok = req . ( * storage . Chunk ) ; ok {
key = chunk . Key
id = generateId ( )
} else if sreq , ok = req . ( * storeRequestMsgData ) ; ok {
key = sreq . Key
id = sreq . Id
} else if entry , ok = req . ( * syncDbEntry ) ; ! ok {
return nil , fmt . Errorf ( "type not allowed: %v (%T)" , req , req )
}
// order by peer > priority > seqid
// value is request id if exists
if entry == nil {
dbkey := make ( [ ] byte , 42 )
dbval := make ( [ ] byte , 40 )
// encode key
copy ( dbkey [ : ] , self . start [ : 34 ] ) // db peer
binary . BigEndian . PutUint64 ( dbkey [ 34 : ] , counter )
// encode value
copy ( dbval , key [ : ] )
binary . BigEndian . PutUint64 ( dbval [ 32 : ] , id )
entry = & syncDbEntry { dbkey , dbval }
}
return
}