Peer reaping and fake network

pull/3/merge
obscuren 11 years ago
parent 52fb3b412c
commit 7ade1778fb
  1. 8
      ethereum.go
  2. 4
      peer.go
  3. 46
      server.go

@ -72,14 +72,6 @@ func main() {
server.Start() server.Start()
err = server.ConnectToPeer("localhost:12345")
if err != nil {
log.Println("Error starting server", err)
server.Stop()
return
}
// Wait for shutdown // Wait for shutdown
server.WaitForShutdown() server.WaitForShutdown()

@ -28,7 +28,11 @@ type Peer struct {
// Flag for checking the peer's connectivity state // Flag for checking the peer's connectivity state
connected int32 connected int32
disconnect int32 disconnect int32
// Last known message send
lastSend time.Time lastSend time.Time
// Indicated whether a verack has been send or not
// This flag is used by writeMessage to check if messages are allowed
// to be send or not. If no version is known all messages are ignored.
versionKnown bool versionKnown bool
} }

@ -8,13 +8,14 @@ import (
"log" "log"
"net" "net"
"time" "time"
"sync/atomic"
) )
func eachPeer(peers *list.List, callback func(*Peer)) { func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
// Loop thru the peers and close them (if we had them) // Loop thru the peers and close them (if we had them)
for e := peers.Front(); e != nil; e = e.Next() { for e := peers.Front(); e != nil; e = e.Next() {
if peer, ok := e.Value.(*Peer); ok { if peer, ok := e.Value.(*Peer); ok {
callback(peer) callback(peer, e)
} }
} }
} }
@ -75,19 +76,54 @@ func (s *Server) ConnectToPeer(addr string) error {
} }
func (s *Server) Broadcast(msgType string, data []byte) { func (s *Server) Broadcast(msgType string, data []byte) {
eachPeer(s.peers, func(p *Peer) { eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.QueueMessage(ethwire.NewMessage(msgType, 0, data)) p.QueueMessage(ethwire.NewMessage(msgType, 0, data))
}) })
} }
const (
processReapingTimeout = 10 // TODO increase
)
func (s *Server) ReapDeadPeers() {
for {
eachPeer(s.peers, func(p *Peer, e *list.Element) {
if atomic.LoadInt32(&p.disconnect) == 1 {
log.Println("Dead peer found .. reaping")
s.peers.Remove(e)
}
})
time.Sleep(processReapingTimeout * time.Second)
}
}
// Start the server // Start the server
func (s *Server) Start() { func (s *Server) Start() {
// For now this function just blocks the main thread // For now this function just blocks the main thread
ln, err := net.Listen("tcp", ":12345") ln, err := net.Listen("tcp", ":12345")
if err != nil { if err != nil {
log.Fatal(err) // This is mainly for testing to create a "network"
if Debug {
log.Println("Connection listening disabled. Acting as client")
err = s.ConnectToPeer("localhost:12345")
if err != nil {
log.Println("Error starting server", err)
s.Stop()
}
return
} else {
log.Fatal(err)
}
} }
// Start the reaping processes
go s.ReapDeadPeers()
go func() { go func() {
for { for {
conn, err := ln.Accept() conn, err := ln.Accept()
@ -117,7 +153,7 @@ func (s *Server) Stop() {
// Close the database // Close the database
defer s.db.Close() defer s.db.Close()
eachPeer(s.peers, func(p *Peer) { eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.Stop() p.Stop()
}) })

Loading…
Cancel
Save