Implemented WebSocket package

pull/150/head
obscuren 10 years ago
parent ab6ede51d7
commit 3af211dd65
  1. 5
      block_pool.go
  2. 14
      ethchain/state_transition.go
  3. 2
      peer.go
  4. 122
      websocket/client.go
  5. 14
      websocket/message.go
  6. 127
      websocket/server.go

@ -11,7 +11,6 @@ import (
"github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
) )
var poollogger = ethlog.NewLogger("BPOOL") var poollogger = ethlog.NewLogger("BPOOL")
@ -99,8 +98,8 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) {
self.pool[hash] = &block{peer, peer, b, time.Now(), 0} self.pool[hash] = &block{peer, peer, b, time.Now(), 0}
if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes { if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes {
poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) //poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4])
peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)}))
} }
} else if self.pool[hash] != nil { } else if self.pool[hash] != nil {
self.pool[hash].block = b self.pool[hash].block = b

@ -142,14 +142,12 @@ func (self *StateTransition) preCheck() (err error) {
func (self *StateTransition) TransitionState() (err error) { func (self *StateTransition) TransitionState() (err error) {
statelogger.Debugf("(~) %x\n", self.tx.Hash()) statelogger.Debugf("(~) %x\n", self.tx.Hash())
/* defer func() {
defer func() { if r := recover(); r != nil {
if r := recover(); r != nil { statelogger.Infoln(r)
logger.Infoln(r) err = fmt.Errorf("state transition err %v", r)
err = fmt.Errorf("state transition err %v", r) }
} }()
}()
*/
// XXX Transactions after this point are considered valid. // XXX Transactions after this point are considered valid.
if err = self.preCheck(); err != nil { if err = self.preCheck(); err != nil {

@ -320,7 +320,7 @@ out:
case msg := <-p.outputQueue: case msg := <-p.outputQueue:
if !p.statusKnown { if !p.statusKnown {
switch msg.Type { switch msg.Type {
case ethwire.MsgStatusTy, ethwire.MsgGetTxsTy, ethwire.MsgTxTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockTy: case ethwire.MsgGetTxsTy, ethwire.MsgTxTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockTy:
break skip break skip
} }
} }

@ -0,0 +1,122 @@
package websocket
import (
"fmt"
"io"
ws "code.google.com/p/go.net/websocket"
)
const channelBufSize = 100
var maxId int = 0
type MsgFunc func(c *Client, msg *Message)
// Chat client.
type Client struct {
id int
ws *ws.Conn
server *Server
ch chan *Message
doneCh chan bool
onMessage MsgFunc
}
// Create new chat client.
func NewClient(ws *ws.Conn, server *Server) *Client {
if ws == nil {
panic("ws cannot be nil")
}
if server == nil {
panic("server cannot be nil")
}
maxId++
ch := make(chan *Message, channelBufSize)
doneCh := make(chan bool)
return &Client{maxId, ws, server, ch, doneCh, nil}
}
func (c *Client) Id() int {
return c.id
}
func (c *Client) Conn() *ws.Conn {
return c.ws
}
func (c *Client) Write(data interface{}, seed int) {
msg := &Message{Seed: seed, Data: data}
select {
case c.ch <- msg:
default:
c.server.Del(c)
err := fmt.Errorf("client %d is disconnected.", c.id)
c.server.Err(err)
}
}
func (c *Client) Done() {
c.doneCh <- true
}
// Listen Write and Read request via chanel
func (c *Client) Listen() {
go c.listenWrite()
c.listenRead()
}
// Listen write request via chanel
func (c *Client) listenWrite() {
logger.Debugln("Listening write to client")
for {
select {
// send message to the client
case msg := <-c.ch:
logger.Debugln("Send:", msg)
ws.JSON.Send(c.ws, msg)
// receive done request
case <-c.doneCh:
c.server.Del(c)
c.doneCh <- true // for listenRead method
return
}
}
}
// Listen read request via chanel
func (c *Client) listenRead() {
logger.Debugln("Listening read from client")
for {
select {
// receive done request
case <-c.doneCh:
c.server.Del(c)
c.doneCh <- true // for listenWrite method
return
// read data from ws connection
default:
var msg Message
err := ws.JSON.Receive(c.ws, &msg)
if err == io.EOF {
c.doneCh <- true
} else if err != nil {
c.server.Err(err)
} else {
logger.Debugln(&msg)
if c.onMessage != nil {
c.onMessage(c, &msg)
}
}
}
}
}

@ -0,0 +1,14 @@
package websocket
import "github.com/ethereum/eth-go/ethutil"
type Message struct {
Call string `json:"call"`
Args []interface{} `json:"args"`
Seed int `json:"seed"`
Data interface{} `json:"data"`
}
func (self *Message) Arguments() *ethutil.Value {
return ethutil.NewValue(self.Args)
}

@ -0,0 +1,127 @@
package websocket
import (
"net/http"
"github.com/ethereum/eth-go/ethlog"
ws "code.google.com/p/go.net/websocket"
)
var logger = ethlog.NewLogger("WS")
// Chat server.
type Server struct {
httpServ string
pattern string
messages []*Message
clients map[int]*Client
addCh chan *Client
delCh chan *Client
sendAllCh chan string
doneCh chan bool
errCh chan error
msgFunc MsgFunc
}
// Create new chat server.
func NewServer(pattern, httpServ string) *Server {
clients := make(map[int]*Client)
addCh := make(chan *Client)
delCh := make(chan *Client)
sendAllCh := make(chan string)
doneCh := make(chan bool)
errCh := make(chan error)
return &Server{
httpServ,
pattern,
nil,
clients,
addCh,
delCh,
sendAllCh,
doneCh,
errCh,
nil,
}
}
func (s *Server) Add(c *Client) {
s.addCh <- c
}
func (s *Server) Del(c *Client) {
s.delCh <- c
}
func (s *Server) SendAll(msg string) {
s.sendAllCh <- msg
}
func (s *Server) Done() {
s.doneCh <- true
}
func (s *Server) Err(err error) {
s.errCh <- err
}
func (s *Server) servHTTP() {
logger.Debugln("Serving http", s.httpServ)
err := http.ListenAndServe(s.httpServ, nil)
logger.Warnln(err)
}
func (s *Server) MessageFunc(f MsgFunc) {
s.msgFunc = f
}
// Listen and serve.
// It serves client connection and broadcast request.
func (s *Server) Listen() {
logger.Debugln("Listening server...")
// ws handler
onConnected := func(ws *ws.Conn) {
defer func() {
err := ws.Close()
if err != nil {
s.errCh <- err
}
}()
client := NewClient(ws, s)
client.onMessage = s.msgFunc
s.Add(client)
client.Listen()
}
// Disable Origin check. Request don't need to come necessarily from origin.
http.HandleFunc(s.pattern, func(w http.ResponseWriter, req *http.Request) {
s := ws.Server{Handler: ws.Handler(onConnected)}
s.ServeHTTP(w, req)
})
logger.Debugln("Created handler")
go s.servHTTP()
for {
select {
// Add new a client
case c := <-s.addCh:
s.clients[c.id] = c
// del a client
case c := <-s.delCh:
delete(s.clients, c.id)
case err := <-s.errCh:
logger.Debugln("Error:", err.Error())
case <-s.doneCh:
return
}
}
}
Loading…
Cancel
Save