mirror of https://github.com/ethereum/go-ethereum
commit
2b32f47d2c
@ -0,0 +1,12 @@ |
||||
# See http://help.github.com/ignore-files/ for more about ignoring files. |
||||
# |
||||
# If you find yourself ignoring temporary files generated by your text editor |
||||
# or operating system, you probably want to add a global ignore instead: |
||||
# git config --global core.excludesfile ~/.gitignore_global |
||||
|
||||
/tmp |
||||
*/**/*un~ |
||||
*un~ |
||||
.DS_Store |
||||
*/**/.DS_Store |
||||
|
@ -0,0 +1,213 @@ |
||||
package eth |
||||
|
||||
import ( |
||||
"container/list" |
||||
"github.com/ethereum/ethchain-go" |
||||
"github.com/ethereum/ethdb-go" |
||||
"github.com/ethereum/ethutil-go" |
||||
"github.com/ethereum/ethwire-go" |
||||
"log" |
||||
"net" |
||||
"sync/atomic" |
||||
"time" |
||||
) |
||||
|
||||
func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { |
||||
// Loop thru the peers and close them (if we had them)
|
||||
for e := peers.Front(); e != nil; e = e.Next() { |
||||
if peer, ok := e.Value.(*Peer); ok { |
||||
callback(peer, e) |
||||
} |
||||
} |
||||
} |
||||
|
||||
const ( |
||||
processReapingTimeout = 60 // TODO increase
|
||||
) |
||||
|
||||
type Ethereum struct { |
||||
// Channel for shutting down the ethereum
|
||||
shutdownChan chan bool |
||||
// DB interface
|
||||
//db *ethdb.LDBDatabase
|
||||
db *ethdb.MemDatabase |
||||
// Block manager for processing new blocks and managing the block chain
|
||||
BlockManager *ethchain.BlockManager |
||||
// The transaction pool. Transaction can be pushed on this pool
|
||||
// for later including in the blocks
|
||||
TxPool *ethchain.TxPool |
||||
// Peers (NYI)
|
||||
peers *list.List |
||||
// Nonce
|
||||
Nonce uint64 |
||||
} |
||||
|
||||
func New() (*Ethereum, error) { |
||||
//db, err := ethdb.NewLDBDatabase()
|
||||
db, err := ethdb.NewMemDatabase() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
ethutil.Config.Db = db |
||||
|
||||
nonce, _ := ethutil.RandomUint64() |
||||
ethereum := &Ethereum{ |
||||
shutdownChan: make(chan bool), |
||||
db: db, |
||||
peers: list.New(), |
||||
Nonce: nonce, |
||||
} |
||||
ethereum.TxPool = ethchain.NewTxPool() |
||||
ethereum.TxPool.Speaker = ethereum |
||||
ethereum.BlockManager = ethchain.NewBlockManager() |
||||
|
||||
ethereum.TxPool.BlockManager = ethereum.BlockManager |
||||
ethereum.BlockManager.TransactionPool = ethereum.TxPool |
||||
|
||||
return ethereum, nil |
||||
} |
||||
|
||||
func (s *Ethereum) AddPeer(conn net.Conn) { |
||||
peer := NewPeer(conn, s, true) |
||||
|
||||
if peer != nil { |
||||
s.peers.PushBack(peer) |
||||
peer.Start() |
||||
|
||||
log.Println("Peer connected ::", conn.RemoteAddr()) |
||||
} |
||||
} |
||||
|
||||
func (s *Ethereum) ProcessPeerList(addrs []string) { |
||||
for _, addr := range addrs { |
||||
// TODO Probably requires some sanity checks
|
||||
s.ConnectToPeer(addr) |
||||
} |
||||
} |
||||
|
||||
func (s *Ethereum) ConnectToPeer(addr string) error { |
||||
peer := NewOutboundPeer(addr, s) |
||||
|
||||
s.peers.PushBack(peer) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (s *Ethereum) OutboundPeers() []*Peer { |
||||
// Create a new peer slice with at least the length of the total peers
|
||||
outboundPeers := make([]*Peer, s.peers.Len()) |
||||
length := 0 |
||||
eachPeer(s.peers, func(p *Peer, e *list.Element) { |
||||
if !p.inbound { |
||||
outboundPeers[length] = p |
||||
length++ |
||||
} |
||||
}) |
||||
|
||||
return outboundPeers[:length] |
||||
} |
||||
|
||||
func (s *Ethereum) InboundPeers() []*Peer { |
||||
// Create a new peer slice with at least the length of the total peers
|
||||
inboundPeers := make([]*Peer, s.peers.Len()) |
||||
length := 0 |
||||
eachPeer(s.peers, func(p *Peer, e *list.Element) { |
||||
if p.inbound { |
||||
inboundPeers[length] = p |
||||
length++ |
||||
} |
||||
}) |
||||
|
||||
return inboundPeers[:length] |
||||
} |
||||
|
||||
func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []byte) { |
||||
eachPeer(s.peers, func(p *Peer, e *list.Element) { |
||||
p.QueueMessage(ethwire.NewMessage(msgType, data)) |
||||
}) |
||||
} |
||||
|
||||
func (s *Ethereum) ReapDeadPeers() { |
||||
for { |
||||
eachPeer(s.peers, func(p *Peer, e *list.Element) { |
||||
if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { |
||||
log.Println("Dead peer found .. reaping") |
||||
|
||||
s.peers.Remove(e) |
||||
} |
||||
}) |
||||
|
||||
time.Sleep(processReapingTimeout * time.Second) |
||||
} |
||||
} |
||||
|
||||
// Start the ethereum
|
||||
func (s *Ethereum) Start() { |
||||
// For now this function just blocks the main thread
|
||||
ln, err := net.Listen("tcp", ":12345") |
||||
if err != nil { |
||||
// This is mainly for testing to create a "network"
|
||||
if ethutil.Config.Debug { |
||||
log.Println("Connection listening disabled. Acting as client") |
||||
|
||||
err = s.ConnectToPeer("localhost:12345") |
||||
if err != nil { |
||||
log.Println("Error starting ethereum", err) |
||||
|
||||
s.Stop() |
||||
} |
||||
} else { |
||||
log.Fatal(err) |
||||
} |
||||
} else { |
||||
// Starting accepting connections
|
||||
go func() { |
||||
for { |
||||
conn, err := ln.Accept() |
||||
if err != nil { |
||||
log.Println(err) |
||||
|
||||
continue |
||||
} |
||||
|
||||
go s.AddPeer(conn) |
||||
} |
||||
}() |
||||
} |
||||
|
||||
// Start the reaping processes
|
||||
go s.ReapDeadPeers() |
||||
|
||||
// Start the tx pool
|
||||
s.TxPool.Start() |
||||
|
||||
// TMP
|
||||
/* |
||||
go func() { |
||||
for { |
||||
s.Broadcast("block", s.blockManager.bc.GenesisBlock().RlpEncode()) |
||||
|
||||
time.Sleep(1000 * time.Millisecond) |
||||
} |
||||
}() |
||||
*/ |
||||
} |
||||
|
||||
func (s *Ethereum) Stop() { |
||||
// Close the database
|
||||
defer s.db.Close() |
||||
|
||||
eachPeer(s.peers, func(p *Peer, e *list.Element) { |
||||
p.Stop() |
||||
}) |
||||
|
||||
s.shutdownChan <- true |
||||
|
||||
s.TxPool.Stop() |
||||
} |
||||
|
||||
// This function will wait for a shutdown and resumes main thread execution
|
||||
func (s *Ethereum) WaitForShutdown() { |
||||
<-s.shutdownChan |
||||
} |
@ -0,0 +1,303 @@ |
||||
package eth |
||||
|
||||
import ( |
||||
"github.com/ethereum/ethutil-go" |
||||
"github.com/ethereum/ethwire-go" |
||||
"log" |
||||
"net" |
||||
"strconv" |
||||
"sync/atomic" |
||||
"time" |
||||
) |
||||
|
||||
const ( |
||||
// The size of the output buffer for writing messages
|
||||
outputBufferSize = 50 |
||||
) |
||||
|
||||
type Peer struct { |
||||
// Ethereum interface
|
||||
ethereum *Ethereum |
||||
// Net connection
|
||||
conn net.Conn |
||||
// Output queue which is used to communicate and handle messages
|
||||
outputQueue chan *ethwire.Msg |
||||
// Quit channel
|
||||
quit chan bool |
||||
// Determines whether it's an inbound or outbound peer
|
||||
inbound bool |
||||
// Flag for checking the peer's connectivity state
|
||||
connected int32 |
||||
disconnect int32 |
||||
// Last known message send
|
||||
lastSend time.Time |
||||
// Indicated whether a verack has been send or not
|
||||
// This flag is used by writeMessage to check if messages are allowed
|
||||
// to be send or not. If no version is known all messages are ignored.
|
||||
versionKnown bool |
||||
|
||||
// Last received pong message
|
||||
lastPong int64 |
||||
// Indicates whether a MsgGetPeersTy was requested of the peer
|
||||
// this to prevent receiving false peers.
|
||||
requestedPeerList bool |
||||
} |
||||
|
||||
func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { |
||||
return &Peer{ |
||||
outputQueue: make(chan *ethwire.Msg, outputBufferSize), |
||||
quit: make(chan bool), |
||||
ethereum: ethereum, |
||||
conn: conn, |
||||
inbound: inbound, |
||||
disconnect: 0, |
||||
connected: 1, |
||||
} |
||||
} |
||||
|
||||
func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer { |
||||
p := &Peer{ |
||||
outputQueue: make(chan *ethwire.Msg, outputBufferSize), |
||||
quit: make(chan bool), |
||||
ethereum: ethereum, |
||||
inbound: false, |
||||
connected: 0, |
||||
disconnect: 0, |
||||
} |
||||
|
||||
// Set up the connection in another goroutine so we don't block the main thread
|
||||
go func() { |
||||
conn, err := net.Dial("tcp", addr) |
||||
if err != nil { |
||||
p.Stop() |
||||
} |
||||
p.conn = conn |
||||
|
||||
// Atomically set the connection state
|
||||
atomic.StoreInt32(&p.connected, 1) |
||||
atomic.StoreInt32(&p.disconnect, 0) |
||||
|
||||
log.Println("Connected to peer ::", conn.RemoteAddr()) |
||||
|
||||
p.Start() |
||||
}() |
||||
|
||||
return p |
||||
} |
||||
|
||||
// Outputs any RLP encoded data to the peer
|
||||
func (p *Peer) QueueMessage(msg *ethwire.Msg) { |
||||
p.outputQueue <- msg |
||||
} |
||||
|
||||
func (p *Peer) writeMessage(msg *ethwire.Msg) { |
||||
// Ignore the write if we're not connected
|
||||
if atomic.LoadInt32(&p.connected) != 1 { |
||||
return |
||||
} |
||||
|
||||
if !p.versionKnown { |
||||
switch msg.Type { |
||||
case ethwire.MsgHandshakeTy: // Ok
|
||||
default: // Anything but ack is allowed
|
||||
return |
||||
} |
||||
} |
||||
|
||||
err := ethwire.WriteMessage(p.conn, msg) |
||||
if err != nil { |
||||
log.Println("Can't send message:", err) |
||||
// Stop the client if there was an error writing to it
|
||||
p.Stop() |
||||
return |
||||
} |
||||
} |
||||
|
||||
// Outbound message handler. Outbound messages are handled here
|
||||
func (p *Peer) HandleOutbound() { |
||||
// The ping timer. Makes sure that every 2 minutes a ping is send to the peer
|
||||
tickleTimer := time.NewTicker(2 * time.Minute) |
||||
out: |
||||
for { |
||||
select { |
||||
// Main message queue. All outbound messages are processed through here
|
||||
case msg := <-p.outputQueue: |
||||
p.writeMessage(msg) |
||||
|
||||
p.lastSend = time.Now() |
||||
|
||||
case <-tickleTimer.C: |
||||
p.writeMessage(ðwire.Msg{Type: ethwire.MsgPingTy}) |
||||
|
||||
// Break out of the for loop if a quit message is posted
|
||||
case <-p.quit: |
||||
break out |
||||
} |
||||
} |
||||
|
||||
clean: |
||||
// This loop is for draining the output queue and anybody waiting for us
|
||||
for { |
||||
select { |
||||
case <-p.outputQueue: |
||||
// TODO
|
||||
default: |
||||
break clean |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Inbound handler. Inbound messages are received here and passed to the appropriate methods
|
||||
func (p *Peer) HandleInbound() { |
||||
|
||||
out: |
||||
for atomic.LoadInt32(&p.disconnect) == 0 { |
||||
// Wait for a message from the peer
|
||||
msg, err := ethwire.ReadMessage(p.conn) |
||||
if err != nil { |
||||
log.Println(err) |
||||
|
||||
break out |
||||
} |
||||
|
||||
if ethutil.Config.Debug { |
||||
log.Printf("Received %s\n", msg.Type.String()) |
||||
} |
||||
|
||||
switch msg.Type { |
||||
case ethwire.MsgHandshakeTy: |
||||
// Version message
|
||||
p.handleHandshake(msg) |
||||
case ethwire.MsgBlockTy: |
||||
err := p.ethereum.BlockManager.ProcessBlock(ethutil.NewBlock(msg.Data)) |
||||
if err != nil { |
||||
log.Println(err) |
||||
} |
||||
case ethwire.MsgTxTy: |
||||
p.ethereum.TxPool.QueueTransaction(ethutil.NewTransactionFromData(msg.Data)) |
||||
case ethwire.MsgInvTy: |
||||
case ethwire.MsgGetPeersTy: |
||||
p.requestedPeerList = true |
||||
// Peer asked for list of connected peers
|
||||
p.pushPeers() |
||||
case ethwire.MsgPeersTy: |
||||
// Received a list of peers (probably because MsgGetPeersTy was send)
|
||||
// Only act on message if we actually requested for a peers list
|
||||
if p.requestedPeerList { |
||||
data := ethutil.Conv(msg.Data) |
||||
// Create new list of possible peers for the ethereum to process
|
||||
peers := make([]string, data.Length()) |
||||
// Parse each possible peer
|
||||
for i := 0; i < data.Length(); i++ { |
||||
peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint())) |
||||
} |
||||
|
||||
// Connect to the list of peers
|
||||
p.ethereum.ProcessPeerList(peers) |
||||
// Mark unrequested again
|
||||
p.requestedPeerList = false |
||||
} |
||||
case ethwire.MsgPingTy: |
||||
// Respond back with pong
|
||||
p.QueueMessage(ðwire.Msg{Type: ethwire.MsgPongTy}) |
||||
case ethwire.MsgPongTy: |
||||
p.lastPong = time.Now().Unix() |
||||
} |
||||
} |
||||
|
||||
p.Stop() |
||||
} |
||||
|
||||
func (p *Peer) Start() { |
||||
if !p.inbound { |
||||
err := p.pushHandshake() |
||||
if err != nil { |
||||
log.Printf("Peer can't send outbound version ack", err) |
||||
|
||||
p.Stop() |
||||
} |
||||
} |
||||
|
||||
// Run the outbound handler in a new goroutine
|
||||
go p.HandleOutbound() |
||||
// Run the inbound handler in a new goroutine
|
||||
go p.HandleInbound() |
||||
} |
||||
|
||||
func (p *Peer) Stop() { |
||||
if atomic.AddInt32(&p.disconnect, 1) != 1 { |
||||
return |
||||
} |
||||
|
||||
close(p.quit) |
||||
if atomic.LoadInt32(&p.connected) != 0 { |
||||
p.conn.Close() |
||||
} |
||||
|
||||
log.Println("Peer shutdown") |
||||
} |
||||
|
||||
func (p *Peer) pushHandshake() error { |
||||
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, ethutil.Encode([]interface{}{ |
||||
1, 0, p.ethereum.Nonce, |
||||
})) |
||||
|
||||
p.QueueMessage(msg) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Pushes the list of outbound peers to the client when requested
|
||||
func (p *Peer) pushPeers() { |
||||
outPeers := make([]interface{}, len(p.ethereum.OutboundPeers())) |
||||
// Serialise each peer
|
||||
for i, peer := range p.ethereum.OutboundPeers() { |
||||
outPeers[i] = peer.RlpEncode() |
||||
} |
||||
|
||||
// Send message to the peer with the known list of connected clients
|
||||
msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers)) |
||||
|
||||
p.QueueMessage(msg) |
||||
} |
||||
|
||||
func (p *Peer) handleHandshake(msg *ethwire.Msg) { |
||||
c := ethutil.Conv(msg.Data) |
||||
// [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID]
|
||||
if c.Get(2).AsUint() == p.ethereum.Nonce { |
||||
//if msg.Nonce == p.ethereum.Nonce {
|
||||
log.Println("Peer connected to self, disconnecting") |
||||
|
||||
p.Stop() |
||||
|
||||
return |
||||
} |
||||
|
||||
p.versionKnown = true |
||||
|
||||
// If this is an inbound connection send an ack back
|
||||
if p.inbound { |
||||
err := p.pushHandshake() |
||||
if err != nil { |
||||
log.Println("Peer can't send ack back") |
||||
|
||||
p.Stop() |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (p *Peer) RlpEncode() []byte { |
||||
host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String()) |
||||
if err != nil { |
||||
return nil |
||||
} |
||||
|
||||
i, err := strconv.Atoi(prt) |
||||
if err != nil { |
||||
return nil |
||||
} |
||||
|
||||
port := ethutil.NumberToBytes(uint16(i), 16) |
||||
|
||||
return ethutil.Encode([]interface{}{host, port}) |
||||
} |
Loading…
Reference in new issue