@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb"
)
)
@ -37,7 +38,7 @@ var ErrAlreadyProcessed = errors.New("already processed")
type request struct {
type request struct {
hash common . Hash // Hash of the node data content to retrieve
hash common . Hash // Hash of the node data content to retrieve
data [ ] byte // Data content of the node, cached until all subtrees complete
data [ ] byte // Data content of the node, cached until all subtrees complete
raw bool // Whether this is a raw entry (code) or a trie node
code bool // Whether this is a code entry
parents [ ] * request // Parent state nodes referencing this entry (notify all upon completion)
parents [ ] * request // Parent state nodes referencing this entry (notify all upon completion)
depth int // Depth level within the trie the node is located to prioritise DFS
depth int // Depth level within the trie the node is located to prioritise DFS
@ -46,8 +47,7 @@ type request struct {
callback LeafCallback // Callback to invoke if a leaf node it reached on this branch
callback LeafCallback // Callback to invoke if a leaf node it reached on this branch
}
}
// SyncResult is a simple list to return missing nodes along with their request
// SyncResult is a response with requested data along with it's hash.
// hashes.
type SyncResult struct {
type SyncResult struct {
Hash common . Hash // Hash of the originally unknown trie node
Hash common . Hash // Hash of the originally unknown trie node
Data [ ] byte // Data content of the retrieved node
Data [ ] byte // Data content of the retrieved node
@ -56,25 +56,40 @@ type SyncResult struct {
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
// persisted data items.
type syncMemBatch struct {
type syncMemBatch struct {
batch map [ common . Hash ] [ ] byte // In-memory membatch of recently completed items
nodes map [ common . Hash ] [ ] byte // In-memory membatch of recently completed nodes
codes map [ common . Hash ] [ ] byte // In-memory membatch of recently completed codes
}
}
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch ( ) * syncMemBatch {
func newSyncMemBatch ( ) * syncMemBatch {
return & syncMemBatch {
return & syncMemBatch {
batch : make ( map [ common . Hash ] [ ] byte ) ,
nodes : make ( map [ common . Hash ] [ ] byte ) ,
codes : make ( map [ common . Hash ] [ ] byte ) ,
}
}
}
}
// hasNode reports the trie node with specific hash is already cached.
func ( batch * syncMemBatch ) hasNode ( hash common . Hash ) bool {
_ , ok := batch . nodes [ hash ]
return ok
}
// hasCode reports the contract code with specific hash is already cached.
func ( batch * syncMemBatch ) hasCode ( hash common . Hash ) bool {
_ , ok := batch . codes [ hash ]
return ok
}
// Sync is the main state trie synchronisation scheduler, which provides yet
// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
// and reconstructs the trie step by step until all is done.
type Sync struct {
type Sync struct {
database ethdb . KeyValueReader // Persistent database to check for existing entries
database ethdb . KeyValueReader // Persistent database to check for existing entries
membatch * syncMemBatch // Memory buffer to avoid frequent database writes
membatch * syncMemBatch // Memory buffer to avoid frequent database writes
requests map [ common . Hash ] * request // Pending requests pertaining to a key hash
nodeReqs map [ common . Hash ] * request // Pending requests pertaining to a trie node hash
codeReqs map [ common . Hash ] * request // Pending requests pertaining to a code hash
queue * prque . Prque // Priority queue with the pending requests
queue * prque . Prque // Priority queue with the pending requests
bloom * SyncBloom // Bloom filter for fast node existence checks
bloom * SyncBloom // Bloom filter for fast stat e existence checks
}
}
// NewSync creates a new trie data download scheduler.
// NewSync creates a new trie data download scheduler.
@ -82,7 +97,8 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
ts := & Sync {
ts := & Sync {
database : database ,
database : database ,
membatch : newSyncMemBatch ( ) ,
membatch : newSyncMemBatch ( ) ,
requests : make ( map [ common . Hash ] * request ) ,
nodeReqs : make ( map [ common . Hash ] * request ) ,
codeReqs : make ( map [ common . Hash ] * request ) ,
queue : prque . New ( nil ) ,
queue : prque . New ( nil ) ,
bloom : bloom ,
bloom : bloom ,
}
}
@ -96,13 +112,15 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
if root == emptyRoot {
if root == emptyRoot {
return
return
}
}
if _ , ok := s . membatch . batch [ root ] ; ok {
if s . membatch . hasNode ( root ) {
return
return
}
}
if s . bloom == nil || s . bloom . Contains ( root [ : ] ) {
if s . bloom == nil || s . bloom . Contains ( root [ : ] ) {
// Bloom filter says this might be a duplicate, double check
// Bloom filter says this might be a duplicate, double check.
blob , _ := s . database . Get ( root [ : ] )
// If database says yes, then at least the trie node is present
if local , err := decodeNode ( root [ : ] , blob ) ; local != nil && err == nil {
// and we hold the assumption that it's NOT legacy contract code.
blob := rawdb . ReadTrieNode ( s . database , root )
if len ( blob ) > 0 {
return
return
}
}
// False positive, bump fault meter
// False positive, bump fault meter
@ -116,7 +134,7 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
}
}
// If this sub-trie has a designated parent, link them together
// If this sub-trie has a designated parent, link them together
if parent != ( common . Hash { } ) {
if parent != ( common . Hash { } ) {
ancestor := s . request s[ parent ]
ancestor := s . nodeReq s[ parent ]
if ancestor == nil {
if ancestor == nil {
panic ( fmt . Sprintf ( "sub-trie ancestor not found: %x" , parent ) )
panic ( fmt . Sprintf ( "sub-trie ancestor not found: %x" , parent ) )
}
}
@ -126,21 +144,25 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
s . schedule ( req )
s . schedule ( req )
}
}
// AddRawEntry schedules the direct retrieval of a state entry that should not be
// AddCodeEntry schedules the direct retrieval of a contract code that should not
// interpreted as a trie node, but rather accepted and stored into the database
// be interpreted as a trie node, but rather accepted and stored into the database
// as is. This method's goal is to support misc state metadata retrievals (e.g.
// as is.
// contract code).
func ( s * Sync ) AddCodeEntry ( hash common . Hash , depth int , parent common . Hash ) {
func ( s * Sync ) AddRawEntry ( hash common . Hash , depth int , parent common . Hash ) {
// Short circuit if the entry is empty or already known
// Short circuit if the entry is empty or already known
if hash == emptyState {
if hash == emptyState {
return
return
}
}
if _ , ok := s . membatch . batch [ hash ] ; ok {
if s . membatch . hasCode ( hash ) {
return
return
}
}
if s . bloom == nil || s . bloom . Contains ( hash [ : ] ) {
if s . bloom == nil || s . bloom . Contains ( hash [ : ] ) {
// Bloom filter says this might be a duplicate, double check
// Bloom filter says this might be a duplicate, double check.
if ok , _ := s . database . Has ( hash [ : ] ) ; ok {
// If database says yes, the blob is present for sure.
// Note we only check the existence with new code scheme, fast
// sync is expected to run with a fresh new node. Even there
// exists the code with legacy format, fetch and store with
// new scheme anyway.
if blob := rawdb . ReadCodeWithPrefix ( s . database , hash ) ; len ( blob ) > 0 {
return
return
}
}
// False positive, bump fault meter
// False positive, bump fault meter
@ -149,12 +171,12 @@ func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) {
// Assemble the new sub-trie sync request
// Assemble the new sub-trie sync request
req := & request {
req := & request {
hash : hash ,
hash : hash ,
raw : true ,
code : true ,
depth : depth ,
depth : depth ,
}
}
// If this sub-trie has a designated parent, link them together
// If this sub-trie has a designated parent, link them together
if parent != ( common . Hash { } ) {
if parent != ( common . Hash { } ) {
ancestor := s . request s[ parent ]
ancestor := s . nodeReq s[ parent ] // the parent of codereq can ONLY be nodereq
if ancestor == nil {
if ancestor == nil {
panic ( fmt . Sprintf ( "raw-entry ancestor not found: %x" , parent ) )
panic ( fmt . Sprintf ( "raw-entry ancestor not found: %x" , parent ) )
}
}
@ -173,61 +195,64 @@ func (s *Sync) Missing(max int) []common.Hash {
return requests
return requests
}
}
// Process injects a batch of retrieved trie nodes data, returning if something
// Process injects the received data for requested item. Note it can
// was committed to the database and also the index of an entry if its processing
// happpen that the single response commits two pending requests(e.g.
// failed.
// there are two requests one for code and one for node but the hash
func ( s * Sync ) Process ( results [ ] SyncResult ) ( bool , int , error ) {
// is same). In this case the second response for the same hash will
committed := false
// be treated as "non-requested" item or "already-processed" item but
// there is no downside.
for i , item := range results {
func ( s * Sync ) Process ( result SyncResult ) error {
// If the item was not requested, bail out
// If the item was not requested either for code or node, bail out
request := s . requests [ item . Hash ]
if s . nodeReqs [ result . Hash ] == nil && s . codeReqs [ result . Hash ] == nil {
if request == nil {
return ErrNotRequested
return committed , i , ErrNotRequested
}
}
// There is an pending code request for this data, commit directly
if request . data != nil {
var filled bool
return committed , i , ErrAlreadyProcessed
if req := s . codeReqs [ result . Hash ] ; req != nil && req . data == nil {
}
filled = true
// If the item is a raw entry request, commit directly
req . data = result . Data
if request . raw {
s . commit ( req )
request . data = item . Data
}
s . commit ( request )
// There is an pending node request for this data, fill it.
committed = true
if req := s . nodeReqs [ result . Hash ] ; req != nil && req . data == nil {
continue
filled = true
}
// Decode the node data content and update the request
// Decode the node data content and update the request
node , err := decodeNode ( item . Hash [ : ] , item . Data )
node , err := decodeNode ( result . Hash [ : ] , result . Data )
if err != nil {
if err != nil {
return committed , i , err
return err
}
}
request . data = item . Data
req . data = result . Data
// Create and schedule a request for all the children nodes
// Create and schedule a request for all the children nodes
requests , err := s . children ( request , node )
requests , err := s . children ( req , node )
if err != nil {
if err != nil {
return committed , i , err
return err
}
if len ( requests ) == 0 && request . deps == 0 {
s . commit ( request )
committed = true
continue
}
}
request . deps += len ( requests )
if len ( requests ) == 0 && req . deps == 0 {
for _ , child := range requests {
s . commit ( req )
s . schedule ( child )
} else {
req . deps += len ( requests )
for _ , child := range requests {
s . schedule ( child )
}
}
}
}
}
return committed , 0 , nil
if ! filled {
return ErrAlreadyProcessed
}
return nil
}
}
// Commit flushes the data stored in the internal membatch out to persistent
// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning any occurred error.
// storage, returning any occurred error.
func ( s * Sync ) Commit ( dbw ethdb . Batch ) error {
func ( s * Sync ) Commit ( dbw ethdb . Batch ) error {
// Dump the membatch into a database dbw
// Dump the membatch into a database dbw
for key , value := range s . membatch . batch {
for key , value := range s . membatch . nodes {
if err := dbw . Put ( key [ : ] , value ) ; err != nil {
rawdb . WriteTrieNode ( dbw , key , value )
return err
s . bloom . Add ( key [ : ] )
}
}
for key , value := range s . membatch . codes {
rawdb . WriteCode ( dbw , key , value )
s . bloom . Add ( key [ : ] )
s . bloom . Add ( key [ : ] )
}
}
// Drop the membatch data and return
// Drop the membatch data and return
@ -237,21 +262,30 @@ func (s *Sync) Commit(dbw ethdb.Batch) error {
// Pending returns the number of state entries currently pending for download.
// Pending returns the number of state entries currently pending for download.
func ( s * Sync ) Pending ( ) int {
func ( s * Sync ) Pending ( ) int {
return len ( s . request s)
return len ( s . nodeReqs ) + len ( s . codeReq s)
}
}
// schedule inserts a new state retrieval request into the fetch queue. If there
// schedule inserts a new state retrieval request into the fetch queue. If there
// is already a pending request for this node, the new request will be discarded
// is already a pending request for this node, the new request will be discarded
// and only a parent reference added to the old one.
// and only a parent reference added to the old one.
func ( s * Sync ) schedule ( req * request ) {
func ( s * Sync ) schedule ( req * request ) {
var reqset = s . nodeReqs
if req . code {
reqset = s . codeReqs
}
// If we're already requesting this node, add a new reference and stop
// If we're already requesting this node, add a new reference and stop
if old , ok := s . requests [ req . hash ] ; ok {
if old , ok := reqse t [ req . hash ] ; ok {
old . parents = append ( old . parents , req . parents ... )
old . parents = append ( old . parents , req . parents ... )
return
return
}
}
// Schedule the request for future retrieval
reqset [ req . hash ] = req
// Schedule the request for future retrieval. This queue is shared
// by both node requests and code requests. It can happen that there
// is a trie node and code has same hash. In this case two elements
// with same hash and same or different depth will be pushed. But it's
// ok the worst case is the second response will be treated as duplicated.
s . queue . Push ( req . hash , int64 ( req . depth ) )
s . queue . Push ( req . hash , int64 ( req . depth ) )
s . requests [ req . hash ] = req
}
}
// children retrieves all the missing children of a state trie entry for future
// children retrieves all the missing children of a state trie entry for future
@ -297,12 +331,14 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
if node , ok := ( child . node ) . ( hashNode ) ; ok {
if node , ok := ( child . node ) . ( hashNode ) ; ok {
// Try to resolve the node from the local database
// Try to resolve the node from the local database
hash := common . BytesToHash ( node )
hash := common . BytesToHash ( node )
if _ , ok := s . membatch . batch [ hash ] ; ok {
if s . membatch . hasNode ( hash ) {
continue
continue
}
}
if s . bloom == nil || s . bloom . Contains ( node ) {
if s . bloom == nil || s . bloom . Contains ( node ) {
// Bloom filter says this might be a duplicate, double check
// Bloom filter says this might be a duplicate, double check.
if ok , _ := s . database . Has ( node ) ; ok {
// If database says yes, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
if blob := rawdb . ReadTrieNode ( s . database , common . BytesToHash ( node ) ) ; len ( blob ) > 0 {
continue
continue
}
}
// False positive, bump fault meter
// False positive, bump fault meter
@ -325,10 +361,13 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
// committed themselves.
// committed themselves.
func ( s * Sync ) commit ( req * request ) ( err error ) {
func ( s * Sync ) commit ( req * request ) ( err error ) {
// Write the node content to the membatch
// Write the node content to the membatch
s . membatch . batch [ req . hash ] = req . data
if req . code {
s . membatch . codes [ req . hash ] = req . data
delete ( s . requests , req . hash )
delete ( s . codeReqs , req . hash )
} else {
s . membatch . nodes [ req . hash ] = req . data
delete ( s . nodeReqs , req . hash )
}
// Check all parents for completion
// Check all parents for completion
for _ , parent := range req . parents {
for _ , parent := range req . parents {
parent . deps --
parent . deps --