trie: remove dependency on ethdb

This removes the core/types -> leveldb dependency.
pull/3518/head
Felix Lange 8 years ago
parent 7731061903
commit d3b751e4d9
  1. 7
      core/state/sync.go
  2. 10
      core/state/sync_test.go
  3. 15
      eth/downloader/queue.go
  4. 24
      trie/sync.go
  5. 12
      trie/sync_test.go
  6. 6
      trie/trie.go

@ -21,7 +21,6 @@ import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
) )
@ -32,7 +31,7 @@ import (
type StateSync trie.TrieSync type StateSync trie.TrieSync
// NewStateSync create a new state trie download scheduler. // NewStateSync create a new state trie download scheduler.
func NewStateSync(root common.Hash, database ethdb.Database) *StateSync { func NewStateSync(root common.Hash, database trie.DatabaseReader) *StateSync {
var syncer *trie.TrieSync var syncer *trie.TrieSync
callback := func(leaf []byte, parent common.Hash) error { callback := func(leaf []byte, parent common.Hash) error {
@ -62,8 +61,8 @@ func (s *StateSync) Missing(max int) []common.Hash {
// Process injects a batch of retrieved trie nodes data, returning if something // Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of // was committed to the database and also the index of an entry if processing of
// it failed. // it failed.
func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) { func (s *StateSync) Process(list []trie.SyncResult, dbw trie.DatabaseWriter) (bool, int, error) {
return (*trie.TrieSync)(s).Process(list) return (*trie.TrieSync)(s).Process(list, dbw)
} }
// Pending returns the number of state entries currently pending for download. // Pending returns the number of state entries currently pending for download.

@ -138,7 +138,7 @@ func testIterativeStateSync(t *testing.T, batch int) {
} }
results[i] = trie.SyncResult{Hash: hash, Data: data} results[i] = trie.SyncResult{Hash: hash, Data: data}
} }
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[:0], sched.Missing(batch)...) queue = append(queue[:0], sched.Missing(batch)...)
@ -168,7 +168,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {
} }
results[i] = trie.SyncResult{Hash: hash, Data: data} results[i] = trie.SyncResult{Hash: hash, Data: data}
} }
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[len(results):], sched.Missing(0)...) queue = append(queue[len(results):], sched.Missing(0)...)
@ -206,7 +206,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
results = append(results, trie.SyncResult{Hash: hash, Data: data}) results = append(results, trie.SyncResult{Hash: hash, Data: data})
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = make(map[common.Hash]struct{}) queue = make(map[common.Hash]struct{})
@ -249,7 +249,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
} }
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, hash := range sched.Missing(0) { for _, hash := range sched.Missing(0) {
@ -283,7 +283,7 @@ func TestIncompleteStateSync(t *testing.T) {
results[i] = trie.SyncResult{Hash: hash, Data: data} results[i] = trie.SyncResult{Hash: hash, Data: data}
} }
// Process each of the state nodes // Process each of the state nodes
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, result := range results { for _, result := range results {

@ -1123,15 +1123,20 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bo
callback(i, progressed, errNoFetchesPending) callback(i, progressed, errNoFetchesPending)
return return
} }
if prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
// Processing a state result failed, bail out batch := q.stateDatabase.NewBatch()
prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}, batch)
if err != nil {
q.stateSchedLock.Unlock()
callback(i, progressed, err)
}
if err = batch.Write(); err != nil {
q.stateSchedLock.Unlock() q.stateSchedLock.Unlock()
callback(i, progressed, err) callback(i, progressed, err)
return
} else if prog {
progressed = true
} }
// Item processing succeeded, release the lock (temporarily) // Item processing succeeded, release the lock (temporarily)
progressed = progressed || prog
q.stateSchedLock.Unlock() q.stateSchedLock.Unlock()
} }
callback(len(results), progressed, nil) callback(len(results), progressed, nil)

@ -21,7 +21,6 @@ import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"gopkg.in/karalabe/cookiejar.v2/collections/prque" "gopkg.in/karalabe/cookiejar.v2/collections/prque"
) )
@ -58,13 +57,13 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error
// unknown trie hashes to retrieve, accepts node data associated with said hashes // unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done. // and reconstructs the trie step by step until all is done.
type TrieSync struct { type TrieSync struct {
database ethdb.Database // State database for storing all the assembled node data database DatabaseReader
requests map[common.Hash]*request // Pending requests pertaining to a key hash requests map[common.Hash]*request // Pending requests pertaining to a key hash
queue *prque.Prque // Priority queue with the pending requests queue *prque.Prque // Priority queue with the pending requests
} }
// NewTrieSync creates a new trie data download scheduler. // NewTrieSync creates a new trie data download scheduler.
func NewTrieSync(root common.Hash, database ethdb.Database, callback TrieSyncLeafCallback) *TrieSync { func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync {
ts := &TrieSync{ ts := &TrieSync{
database: database, database: database,
requests: make(map[common.Hash]*request), requests: make(map[common.Hash]*request),
@ -145,7 +144,7 @@ func (s *TrieSync) Missing(max int) []common.Hash {
// Process injects a batch of retrieved trie nodes data, returning if something // Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of // was committed to the database and also the index of an entry if processing of
// it failed. // it failed.
func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) {
committed := false committed := false
for i, item := range results { for i, item := range results {
@ -157,7 +156,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
// If the item is a raw entry request, commit directly // If the item is a raw entry request, commit directly
if request.raw { if request.raw {
request.data = item.Data request.data = item.Data
s.commit(request, nil) s.commit(request, dbw)
committed = true committed = true
continue continue
} }
@ -174,7 +173,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
return committed, i, err return committed, i, err
} }
if len(requests) == 0 && request.deps == 0 { if len(requests) == 0 && request.deps == 0 {
s.commit(request, nil) s.commit(request, dbw)
committed = true committed = true
continue continue
} }
@ -266,16 +265,9 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) {
// commit finalizes a retrieval request and stores it into the database. If any // commit finalizes a retrieval request and stores it into the database. If any
// of the referencing parent requests complete due to this commit, they are also // of the referencing parent requests complete due to this commit, they are also
// committed themselves. // committed themselves.
func (s *TrieSync) commit(req *request, batch ethdb.Batch) (err error) { func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) {
// Create a new batch if none was specified
if batch == nil {
batch = s.database.NewBatch()
defer func() {
err = batch.Write()
}()
}
// Write the node content to disk // Write the node content to disk
if err := batch.Put(req.hash[:], req.data); err != nil { if err := dbw.Put(req.hash[:], req.data); err != nil {
return err return err
} }
delete(s.requests, req.hash) delete(s.requests, req.hash)
@ -284,7 +276,7 @@ func (s *TrieSync) commit(req *request, batch ethdb.Batch) (err error) {
for _, parent := range req.parents { for _, parent := range req.parents {
parent.deps-- parent.deps--
if parent.deps == 0 { if parent.deps == 0 {
if err := s.commit(parent, batch); err != nil { if err := s.commit(parent, dbw); err != nil {
return err return err
} }
} }

@ -122,7 +122,7 @@ func testIterativeTrieSync(t *testing.T, batch int) {
} }
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[:0], sched.Missing(batch)...) queue = append(queue[:0], sched.Missing(batch)...)
@ -152,7 +152,7 @@ func TestIterativeDelayedTrieSync(t *testing.T) {
} }
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[len(results):], sched.Missing(10000)...) queue = append(queue[len(results):], sched.Missing(10000)...)
@ -190,7 +190,7 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) {
results = append(results, SyncResult{hash, data}) results = append(results, SyncResult{hash, data})
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = make(map[common.Hash]struct{}) queue = make(map[common.Hash]struct{})
@ -231,7 +231,7 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) {
} }
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, result := range results { for _, result := range results {
@ -272,7 +272,7 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[:0], sched.Missing(0)...) queue = append(queue[:0], sched.Missing(0)...)
@ -304,7 +304,7 @@ func TestIncompleteTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
// Process each of the trie nodes // Process each of the trie nodes
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results, dstDb); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, result := range results { for _, result := range results {

@ -60,8 +60,12 @@ func init() {
// Database must be implemented by backing stores for the trie. // Database must be implemented by backing stores for the trie.
type Database interface { type Database interface {
DatabaseReader
DatabaseWriter DatabaseWriter
// Get returns the value for key from the database. }
// DatabaseReader wraps the Get method of a backing store for the trie.
type DatabaseReader interface {
Get(key []byte) (value []byte, err error) Get(key []byte) (value []byte, err error)
} }

Loading…
Cancel
Save