p2p/discovery: fix issues raised in the nodeDb PR

release/1.0.1
Péter Szilágyi 10 years ago
parent 8646365b42
commit 0201c04b95
  1. 63
      p2p/discover/database.go
  2. 5
      p2p/discover/table.go

@ -70,7 +70,10 @@ func newPersistentNodeDB(path string) (*nodeDB, error) {
switch err { switch err {
case leveldb.ErrNotFound: case leveldb.ErrNotFound:
// Version not found (i.e. empty cache), insert it // Version not found (i.e. empty cache), insert it
err = db.Put(nodeDBVersionKey, currentVer, nil) if err := db.Put(nodeDBVersionKey, currentVer, nil); err != nil {
db.Close()
return nil, err
}
case nil: case nil:
// Version present, flush if different // Version present, flush if different
@ -82,22 +85,17 @@ func newPersistentNodeDB(path string) (*nodeDB, error) {
return newPersistentNodeDB(path) return newPersistentNodeDB(path)
} }
} }
// Clean up in case of an error
if err != nil {
db.Close()
return nil, err
}
return &nodeDB{lvl: db}, nil return &nodeDB{lvl: db}, nil
} }
// key generates the leveldb key-blob from a node id and its particular field of // makeKey generates the leveldb key-blob from a node id and its particular
// interest. // field of interest.
func (db *nodeDB) key(id NodeID, field string) []byte { func makeKey(id NodeID, field string) []byte {
return append(nodeDBItemPrefix, append(id[:], field...)...) return append(nodeDBItemPrefix, append(id[:], field...)...)
} }
// splitKey tries to split a database key into a node id and a field part. // splitKey tries to split a database key into a node id and a field part.
func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) { func splitKey(key []byte) (id NodeID, field string) {
// If the key is not of a node, return it plainly // If the key is not of a node, return it plainly
if !bytes.HasPrefix(key, nodeDBItemPrefix) { if !bytes.HasPrefix(key, nodeDBItemPrefix) {
return NodeID{}, string(key) return NodeID{}, string(key)
@ -110,27 +108,26 @@ func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) {
return id, field return id, field
} }
// fetchTime retrieves a time instance (encoded as a unix timestamp) associated // fetchInt64 retrieves an integer instance associated with a particular
// with a particular database key. // database key.
func (db *nodeDB) fetchTime(key []byte) time.Time { func (db *nodeDB) fetchInt64(key []byte) int64 {
blob, err := db.lvl.Get(key, nil) blob, err := db.lvl.Get(key, nil)
if err != nil { if err != nil {
return time.Time{} return 0
} }
var unix int64 val, read := binary.Varint(blob)
if err := rlp.DecodeBytes(blob, &unix); err != nil { if read <= 0 {
return time.Time{} return 0
} }
return time.Unix(unix, 0) return val
} }
// storeTime update a specific database entry to the current time instance as a // storeInt64 update a specific database entry to the current time instance as a
// unix timestamp. // unix timestamp.
func (db *nodeDB) storeTime(key []byte, instance time.Time) error { func (db *nodeDB) storeInt64(key []byte, n int64) error {
blob, err := rlp.EncodeToBytes(instance.Unix()) blob := make([]byte, binary.MaxVarintLen64)
if err != nil { blob = blob[:binary.PutVarint(blob, n)]
return err
}
return db.lvl.Put(key, blob, nil) return db.lvl.Put(key, blob, nil)
} }
@ -138,17 +135,17 @@ func (db *nodeDB) storeTime(key []byte, instance time.Time) error {
// purpose is to prevent contacting potential seed nodes multiple times in the // purpose is to prevent contacting potential seed nodes multiple times in the
// same boot cycle. // same boot cycle.
func (db *nodeDB) startup() time.Time { func (db *nodeDB) startup() time.Time {
return db.fetchTime(nodeDBStartupKey) return time.Unix(db.fetchInt64(nodeDBStartupKey), 0)
} }
// updateStartup updates the bootstrap initiation time to the one specified. // updateStartup updates the bootstrap initiation time to the one specified.
func (db *nodeDB) updateStartup(instance time.Time) error { func (db *nodeDB) updateStartup(instance time.Time) error {
return db.storeTime(nodeDBStartupKey, instance) return db.storeInt64(nodeDBStartupKey, instance.Unix())
} }
// node retrieves a node with a given id from the database. // node retrieves a node with a given id from the database.
func (db *nodeDB) node(id NodeID) *Node { func (db *nodeDB) node(id NodeID) *Node {
blob, err := db.lvl.Get(db.key(id, nodeDBDiscoverRoot), nil) blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil)
if err != nil { if err != nil {
return nil return nil
} }
@ -165,28 +162,28 @@ func (db *nodeDB) updateNode(node *Node) error {
if err != nil { if err != nil {
return err return err
} }
return db.lvl.Put(db.key(node.ID, nodeDBDiscoverRoot), blob, nil) return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)
} }
// lastPing retrieves the time of the last ping packet send to a remote node, // lastPing retrieves the time of the last ping packet send to a remote node,
// requesting binding. // requesting binding.
func (db *nodeDB) lastPing(id NodeID) time.Time { func (db *nodeDB) lastPing(id NodeID) time.Time {
return db.fetchTime(db.key(id, nodeDBDiscoverPing)) return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPing)), 0)
} }
// updateLastPing updates the last time we tried contacting a remote node. // updateLastPing updates the last time we tried contacting a remote node.
func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error { func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error {
return db.storeTime(db.key(id, nodeDBDiscoverPing), instance) return db.storeInt64(makeKey(id, nodeDBDiscoverPing), instance.Unix())
} }
// lastBond retrieves the time of the last successful bonding with a remote node. // lastBond retrieves the time of the last successful bonding with a remote node.
func (db *nodeDB) lastBond(id NodeID) time.Time { func (db *nodeDB) lastBond(id NodeID) time.Time {
return db.fetchTime(db.key(id, nodeDBDiscoverBond)) return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverBond)), 0)
} }
// updateLastBond updates the last time we successfully bound to a remote node. // updateLastBond updates the last time we successfully bound to a remote node.
func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error { func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error {
return db.storeTime(db.key(id, nodeDBDiscoverBond), instance) return db.storeInt64(makeKey(id, nodeDBDiscoverBond), instance.Unix())
} }
// querySeeds retrieves a batch of nodes to be used as potential seed servers // querySeeds retrieves a batch of nodes to be used as potential seed servers
@ -208,7 +205,7 @@ func (db *nodeDB) querySeeds(n int) []*Node {
nodes := make([]*Node, 0, n) nodes := make([]*Node, 0, n)
for len(nodes) < n && it.Next() { for len(nodes) < n && it.Next() {
// Iterate until a discovery node is found // Iterate until a discovery node is found
id, field := db.splitKey(it.Key()) id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot { if field != nodeDBDiscoverRoot {
continue continue
} }

@ -62,13 +62,12 @@ type bucket struct {
} }
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table { func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
// If no seed cache was given, use an in-memory one // If no node database was given, use an in-memory one
db, err := newNodeDB(nodeDBPath) db, err := newNodeDB(nodeDBPath)
if err != nil { if err != nil {
glog.V(logger.Warn).Infoln("Failed to open node database:", err) glog.V(logger.Warn).Infoln("Failed to open node database:", err)
db, _ = newNodeDB("") db, _ = newNodeDB("")
} }
// Create the bootstrap table
tab := &Table{ tab := &Table{
net: t, net: t,
db: db, db: db,
@ -90,7 +89,7 @@ func (tab *Table) Self() *Node {
return tab.self return tab.self
} }
// Close terminates the network listener and flushes the seed cache. // Close terminates the network listener and flushes the node database.
func (tab *Table) Close() { func (tab *Table) Close() {
tab.net.close() tab.net.close()
tab.db.close() tab.db.close()

Loading…
Cancel
Save