diff --git a/p2p/discover/database.go b/p2p/discover/database.go index c6f70972e..d03bf0ab5 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -9,8 +9,11 @@ import ( "os" "time" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rlp" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/storage" ) @@ -19,13 +22,13 @@ var nodeDBNilNodeID = NodeID{} // nodeDB stores all nodes we know about. type nodeDB struct { - lvl *leveldb.DB + lvl *leveldb.DB // Interface to the database itself + seeder iterator.Iterator // Iterator for fetching possible seed nodes } // Schema layout for the node database var ( nodeDBVersionKey = []byte("version") // Version of the database to flush if changes - nodeDBStartupKey = []byte("startup") // Time when the node discovery started (seed selection) nodeDBItemPrefix = []byte("n:") // Identifier to prefix node entries with nodeDBDiscoverRoot = ":discover" @@ -137,26 +140,16 @@ func (db *nodeDB) storeInt64(key []byte, n int64) error { return db.lvl.Put(key, blob, nil) } -// startup retrieves the time instance when the bootstrapping last begun. Its -// purpose is to prevent contacting potential seed nodes multiple times in the -// same boot cycle. -func (db *nodeDB) startup() time.Time { - return time.Unix(db.fetchInt64(nodeDBStartupKey), 0) -} - -// updateStartup updates the bootstrap initiation time to the one specified. -func (db *nodeDB) updateStartup(instance time.Time) error { - return db.storeInt64(nodeDBStartupKey, instance.Unix()) -} - // node retrieves a node with a given id from the database. func (db *nodeDB) node(id NodeID) *Node { blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil) if err != nil { + glog.V(logger.Warn).Infof("failed to retrieve node: %v", err) return nil } node := new(Node) if err := rlp.DecodeBytes(blob, node); err != nil { + glog.V(logger.Warn).Infof("failed to decode node RLP: %v", err) return nil } return node @@ -203,34 +196,35 @@ func (db *nodeDB) updateLastPong(id NodeID, instance time.Time) error { // If the database runs out of potential seeds, we restart the startup counter // and start iterating over the peers again. func (db *nodeDB) querySeeds(n int) []*Node { - startup := db.startup() - - it := db.lvl.NewIterator(nil, nil) - defer it.Release() - + // Create a new seed iterator if none exists + if db.seeder == nil { + db.seeder = db.lvl.NewIterator(nil, nil) + } + // Iterate over the nodes and find suitable seeds nodes := make([]*Node, 0, n) - for len(nodes) < n && it.Next() { + for len(nodes) < n && db.seeder.Next() { // Iterate until a discovery node is found - id, field := splitKey(it.Key()) + id, field := splitKey(db.seeder.Key()) if field != nodeDBDiscoverRoot { continue } - // Retrieve the last ping time, and if older than startup, query - lastPing := db.lastPing(id) - if lastPing.Before(startup) { - if node := db.node(id); node != nil { - nodes = append(nodes, node) - } + // Load it as a potential seed + if node := db.node(id); node != nil { + nodes = append(nodes, node) } } - // Reset the startup time if no seeds were found + // Release the iterator if we reached the end if len(nodes) == 0 { - db.updateStartup(time.Now()) + db.seeder.Release() + db.seeder = nil } return nodes } // close flushes and closes the database files. func (db *nodeDB) close() { + if db.seeder != nil { + db.seeder.Release() + } db.lvl.Close() } diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index 580b13d41..b067d458d 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -82,24 +82,13 @@ func TestNodeDBInt64(t *testing.T) { func TestNodeDBFetchStore(t *testing.T) { node := &Node{ - ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), - IP: net.IP([]byte{192, 168, 0, 1}), - DiscPort: 31313, - TCPPort: 30303, + ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: net.IP([]byte{192, 168, 0, 1}), + TCPPort: 30303, } inst := time.Now() db, _ := newNodeDB("") - // Check fetch/store operations on the startup object - if stored := db.startup(); stored.Unix() != 0 { - t.Errorf("startup: non-existing object: %v", stored) - } - if err := db.updateStartup(inst); err != nil { - t.Errorf("startup: failed to update: %v", err) - } - if stored := db.startup(); stored.Unix() != inst.Unix() { - t.Errorf("startup: value mismatch: have %v, want %v", stored, inst) - } // Check fetch/store operations on a node ping object if stored := db.lastPing(node.ID); stored.Unix() != 0 { t.Errorf("ping: non-existing object: %v", stored) @@ -129,8 +118,98 @@ func TestNodeDBFetchStore(t *testing.T) { } if stored := db.node(node.ID); stored == nil { t.Errorf("node: not found") - } else if !bytes.Equal(stored.ID[:], node.ID[:]) || !bytes.Equal(stored.IP, node.IP) || - stored.DiscPort != node.DiscPort || stored.TCPPort != node.TCPPort { + } else if !bytes.Equal(stored.ID[:], node.ID[:]) || !stored.IP.Equal(node.IP) || stored.TCPPort != node.TCPPort { t.Errorf("node: data mismatch: have %v, want %v", stored, node) } } + +var nodeDBSeedQueryNodes = []struct { + node Node + pong time.Time +}{ + { + node: Node{ + ID: MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 1}, + }, + pong: time.Now().Add(-2 * time.Second), + }, + { + node: Node{ + ID: MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 2}, + }, + pong: time.Now().Add(-3 * time.Second), + }, + { + node: Node{ + ID: MustHexID("0x03d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 3}, + }, + pong: time.Now().Add(-1 * time.Second), + }, +} + +func TestNodeDBSeedQuery(t *testing.T) { + db, _ := newNodeDB("") + + // Insert a batch of nodes for querying + for i, seed := range nodeDBSeedQueryNodes { + if err := db.updateNode(&seed.node); err != nil { + t.Fatalf("node %d: failed to insert: %v", i, err) + } + } + // Retrieve the entire batch and check for duplicates + seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes)) + if len(seeds) != len(nodeDBSeedQueryNodes) { + t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)) + } + have := make(map[NodeID]struct{}) + for _, seed := range seeds { + have[seed.ID] = struct{}{} + } + want := make(map[NodeID]struct{}) + for _, seed := range nodeDBSeedQueryNodes { + want[seed.node.ID] = struct{}{} + } + for id, _ := range have { + if _, ok := want[id]; !ok { + t.Errorf("extra seed: %v", id) + } + } + for id, _ := range want { + if _, ok := have[id]; !ok { + t.Errorf("missing seed: %v", id) + } + } + // Make sure the next batch is empty (seed EOF) + seeds = db.querySeeds(2 * len(nodeDBSeedQueryNodes)) + if len(seeds) != 0 { + t.Errorf("seed count mismatch: have %v, want %v", len(seeds), 0) + } +} + +func TestNodeDBSeedQueryContinuation(t *testing.T) { + db, _ := newNodeDB("") + + // Insert a batch of nodes for querying + for i, seed := range nodeDBSeedQueryNodes { + if err := db.updateNode(&seed.node); err != nil { + t.Fatalf("node %d: failed to insert: %v", i, err) + } + } + // Iteratively retrieve the batch, checking for an empty batch on reset + for i := 0; i < len(nodeDBSeedQueryNodes); i++ { + if seeds := db.querySeeds(1); len(seeds) != 1 { + t.Errorf("1st iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) + } + } + if seeds := db.querySeeds(1); len(seeds) != 0 { + t.Errorf("reset: seed count mismatch: have %v, want %v", len(seeds), 0) + } + for i := 0; i < len(nodeDBSeedQueryNodes); i++ { + if seeds := db.querySeeds(1); len(seeds) != 1 { + t.Errorf("2nd iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) + } + } +}