pull/150/head
obscuren 11 years ago
parent b70fe3a9be
commit cb8a7d979d
  1. 73
      ethereum.go
  2. 35
      peer.go

@ -9,6 +9,7 @@ import (
"log" "log"
"net" "net"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@ -45,9 +46,14 @@ type Ethereum struct {
Addr net.Addr Addr net.Addr
nat NAT nat NAT
peerMut sync.Mutex
// Capabilities for outgoing peers
serverCaps Caps
} }
func New() (*Ethereum, error) { func New(caps Caps) (*Ethereum, error) {
//db, err := ethdb.NewLDBDatabase() //db, err := ethdb.NewLDBDatabase()
db, err := ethdb.NewMemDatabase() db, err := ethdb.NewMemDatabase()
if err != nil { if err != nil {
@ -56,12 +62,11 @@ func New() (*Ethereum, error) {
ethutil.Config.Db = db ethutil.Config.Db = db
/*
nat, err := Discover() nat, err := Discover()
if err != nil { if err != nil {
log.Printf("Can'them discover upnp: %v", err) log.Printf("Can't discover upnp: %v", err)
} }
*/ log.Println(nat)
nonce, _ := ethutil.RandomUint64() nonce, _ := ethutil.RandomUint64()
ethereum := &Ethereum{ ethereum := &Ethereum{
@ -69,7 +74,8 @@ func New() (*Ethereum, error) {
db: db, db: db,
peers: list.New(), peers: list.New(),
Nonce: nonce, Nonce: nonce,
//nat: nat, serverCaps: caps,
nat: nat,
} }
ethereum.TxPool = ethchain.NewTxPool() ethereum.TxPool = ethchain.NewTxPool()
ethereum.TxPool.Speaker = ethereum ethereum.TxPool.Speaker = ethereum
@ -85,14 +91,9 @@ func (s *Ethereum) AddPeer(conn net.Conn) {
peer := NewPeer(conn, s, true) peer := NewPeer(conn, s, true)
if peer != nil { if peer != nil {
if s.peers.Len() > 25 {
log.Println("SEED")
peer.Start(true)
} else {
s.peers.PushBack(peer) s.peers.PushBack(peer)
peer.Start(false) peer.Start(false)
} }
}
} }
func (s *Ethereum) ProcessPeerList(addrs []string) { func (s *Ethereum) ProcessPeerList(addrs []string) {
@ -122,7 +123,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
return nil return nil
} }
peer := NewOutboundPeer(addr, s) peer := NewOutboundPeer(addr, s, s.serverCaps)
s.peers.PushBack(peer) s.peers.PushBack(peer)
@ -158,12 +159,18 @@ func (s *Ethereum) InboundPeers() []*Peer {
} }
func (s *Ethereum) InOutPeers() []*Peer { func (s *Ethereum) InOutPeers() []*Peer {
// Reap the dead peers first
s.reapPeers()
// Create a new peer slice with at least the length of the total peers // Create a new peer slice with at least the length of the total peers
inboundPeers := make([]*Peer, s.peers.Len()) inboundPeers := make([]*Peer, s.peers.Len())
length := 0 length := 0
eachPeer(s.peers, func(p *Peer, e *list.Element) { eachPeer(s.peers, func(p *Peer, e *list.Element) {
// Only return peers with an actual ip
if len(p.host) > 0 {
inboundPeers[length] = p inboundPeers[length] = p
length++ length++
}
}) })
return inboundPeers[:length] return inboundPeers[:length]
@ -171,6 +178,10 @@ func (s *Ethereum) InOutPeers() []*Peer {
func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) { func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) {
msg := ethwire.NewMessage(msgType, data) msg := ethwire.NewMessage(msgType, data)
s.BroadcastMsg(msg)
}
func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) {
eachPeer(s.peers, func(p *Peer, e *list.Element) { eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.QueueMessage(msg) p.QueueMessage(msg)
}) })
@ -180,15 +191,25 @@ func (s *Ethereum) Peers() *list.List {
return s.peers return s.peers
} }
func (s *Ethereum) ReapDeadPeers() { func (s *Ethereum) reapPeers() {
for { s.peerMut.Lock()
defer s.peerMut.Unlock()
eachPeer(s.peers, func(p *Peer, e *list.Element) { eachPeer(s.peers, func(p *Peer, e *list.Element) {
if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
s.peers.Remove(e) s.peers.Remove(e)
} }
}) })
}
func (s *Ethereum) ReapDeadPeerHandler() {
reapTimer := time.NewTicker(processReapingTimeout * time.Second)
time.Sleep(processReapingTimeout * time.Second) for {
select {
case <-reapTimer.C:
s.reapPeers()
}
} }
} }
@ -241,11 +262,23 @@ func (s *Ethereum) Start() {
} else { } else {
s.Addr = ln.Addr() s.Addr = ln.Addr()
// Starting accepting connections // Starting accepting connections
go func() {
log.Println("Ready and accepting connections") log.Println("Ready and accepting connections")
// Start the peer handler
go s.peerHandler(ln)
}
go s.upnpUpdateThread()
// Start the reaping processes
go s.ReapDeadPeerHandler()
// Start the tx pool
s.TxPool.Start()
}
func (s *Ethereum) peerHandler(listener net.Listener) {
for { for {
conn, err := ln.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
log.Println(err) log.Println(err)
@ -254,14 +287,6 @@ func (s *Ethereum) Start() {
go s.AddPeer(conn) go s.AddPeer(conn)
} }
}()
}
// Start the reaping processes
go s.ReapDeadPeers()
// Start the tx pool
s.TxPool.Start()
} }
func (s *Ethereum) Stop() { func (s *Ethereum) Stop() {

@ -24,6 +24,8 @@ const (
CapDiscoveryTy = 0x01 CapDiscoveryTy = 0x01
CapTxTy = 0x02 CapTxTy = 0x02
CapChainTy = 0x04 CapChainTy = 0x04
CapDefault = CapChainTy | CapTxTy | CapDiscoveryTy
) )
var capsToString = map[Caps]string{ var capsToString = map[Caps]string{
@ -95,7 +97,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
} }
} }
func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer { func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
p := &Peer{ p := &Peer{
outputQueue: make(chan *ethwire.Msg, outputBufferSize), outputQueue: make(chan *ethwire.Msg, outputBufferSize),
quit: make(chan bool), quit: make(chan bool),
@ -103,6 +105,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer {
inbound: false, inbound: false,
connected: 0, connected: 0,
disconnect: 0, disconnect: 0,
caps: caps,
} }
// Set up the connection in another goroutine so we don't block the main thread // Set up the connection in another goroutine so we don't block the main thread
@ -165,7 +168,8 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) {
// Outbound message handler. Outbound messages are handled here // Outbound message handler. Outbound messages are handled here
func (p *Peer) HandleOutbound() { func (p *Peer) HandleOutbound() {
// The ping timer. Makes sure that every 2 minutes a ping is send to the peer // The ping timer. Makes sure that every 2 minutes a ping is send to the peer
tickleTimer := time.NewTicker(2 * time.Minute) pingTimer := time.NewTicker(2 * time.Minute)
serviceTimer := time.NewTicker(5 * time.Second)
out: out:
for { for {
select { select {
@ -175,11 +179,20 @@ out:
p.lastSend = time.Now() p.lastSend = time.Now()
case <-tickleTimer.C: // Ping timer sends a ping to the peer each 2 minutes
case <-pingTimer.C:
p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, "")) p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, ""))
// Break out of the for loop if a quit message is posted // Service timer takes care of peer broadcasting, transaction
// posting or block posting
case <-serviceTimer.C:
if p.caps&CapDiscoveryTy > 0 {
msg := p.peersMessage()
p.ethereum.BroadcastMsg(msg)
}
case <-p.quit: case <-p.quit:
// Break out of the for loop if a quit message is posted
break out break out
} }
} }
@ -387,7 +400,7 @@ func (p *Peer) Stop() {
func (p *Peer) pushHandshake() error { func (p *Peer) pushHandshake() error {
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", CapChainTy | CapTxTy | CapDiscoveryTy, p.port, uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", p.caps, p.port,
}) })
p.QueueMessage(msg) p.QueueMessage(msg)
@ -395,18 +408,20 @@ func (p *Peer) pushHandshake() error {
return nil return nil
} }
// Pushes the list of outbound peers to the client when requested func (p *Peer) peersMessage() *ethwire.Msg {
func (p *Peer) pushPeers() {
outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
// Serialise each peer // Serialise each peer
for i, peer := range p.ethereum.InOutPeers() { for i, peer := range p.ethereum.InOutPeers() {
outPeers[i] = peer.RlpData() outPeers[i] = peer.RlpData()
} }
// Send message to the peer with the known list of connected clients // Return the message to the peer with the known list of connected clients
msg := ethwire.NewMessage(ethwire.MsgPeersTy, outPeers) return ethwire.NewMessage(ethwire.MsgPeersTy, outPeers)
}
p.QueueMessage(msg) // Pushes the list of outbound peers to the client when requested
func (p *Peer) pushPeers() {
p.QueueMessage(p.peersMessage())
} }
func (p *Peer) handleHandshake(msg *ethwire.Msg) { func (p *Peer) handleHandshake(msg *ethwire.Msg) {

Loading…
Cancel
Save