@ -1,15 +1,11 @@
package p2p
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"math/big"
"net"
"sync"
"sync/atomic"
"time"
@ -138,140 +134,6 @@ func (rw *lockedRW) WriteMsg(msg Msg) error {
return rw . wrapped . WriteMsg ( msg )
}
// frameRW is a MsgReadWriter that reads and writes devp2p message frames.
// As required by the interface, ReadMsg and WriteMsg can be called from
// multiple goroutines.
type frameRW struct {
net . Conn // make Conn methods available. be careful.
bufconn * bufio . ReadWriter
// this channel is used to 'lend' bufconn to a caller of ReadMsg
// until the message payload has been consumed. the channel
// receives a value when EOF is reached on the payload, unblocking
// a pending call to ReadMsg.
rsync chan struct { }
// this mutex guards writes to bufconn.
writeMu sync . Mutex
}
func newFrameRW ( conn net . Conn , timeout time . Duration ) * frameRW {
rsync := make ( chan struct { } , 1 )
rsync <- struct { } { }
return & frameRW {
Conn : conn ,
bufconn : bufio . NewReadWriter ( bufio . NewReader ( conn ) , bufio . NewWriter ( conn ) ) ,
rsync : rsync ,
}
}
var magicToken = [ ] byte { 34 , 64 , 8 , 145 }
func ( rw * frameRW ) WriteMsg ( msg Msg ) error {
rw . writeMu . Lock ( )
defer rw . writeMu . Unlock ( )
rw . SetWriteDeadline ( time . Now ( ) . Add ( msgWriteTimeout ) )
if err := writeMsg ( rw . bufconn , msg ) ; err != nil {
return err
}
return rw . bufconn . Flush ( )
}
func writeMsg ( w io . Writer , msg Msg ) error {
// TODO: handle case when Size + len(code) + len(listhdr) overflows uint32
code := ethutil . Encode ( uint32 ( msg . Code ) )
listhdr := makeListHeader ( msg . Size + uint32 ( len ( code ) ) )
payloadLen := uint32 ( len ( listhdr ) ) + uint32 ( len ( code ) ) + msg . Size
start := make ( [ ] byte , 8 )
copy ( start , magicToken )
binary . BigEndian . PutUint32 ( start [ 4 : ] , payloadLen )
for _ , b := range [ ] [ ] byte { start , listhdr , code } {
if _ , err := w . Write ( b ) ; err != nil {
return err
}
}
_ , err := io . CopyN ( w , msg . Payload , int64 ( msg . Size ) )
return err
}
func makeListHeader ( length uint32 ) [ ] byte {
if length < 56 {
return [ ] byte { byte ( length + 0xc0 ) }
}
enc := big . NewInt ( int64 ( length ) ) . Bytes ( )
lenb := byte ( len ( enc ) ) + 0xf7
return append ( [ ] byte { lenb } , enc ... )
}
func ( rw * frameRW ) ReadMsg ( ) ( msg Msg , err error ) {
<- rw . rsync // wait until bufconn is ours
rw . SetReadDeadline ( time . Now ( ) . Add ( frameReadTimeout ) )
// read magic and payload size
start := make ( [ ] byte , 8 )
if _ , err = io . ReadFull ( rw . bufconn , start ) ; err != nil {
return msg , err
}
if ! bytes . HasPrefix ( start , magicToken ) {
return msg , fmt . Errorf ( "bad magic token %x" , start [ : 4 ] )
}
size := binary . BigEndian . Uint32 ( start [ 4 : ] )
// decode start of RLP message to get the message code
posr := & postrack { rw . bufconn , 0 }
s := rlp . NewStream ( posr )
if _ , err := s . List ( ) ; err != nil {
return msg , err
}
msg . Code , err = s . Uint ( )
if err != nil {
return msg , err
}
msg . Size = size - posr . p
rw . SetReadDeadline ( time . Now ( ) . Add ( payloadReadTimeout ) )
if msg . Size <= wholePayloadSize {
// msg is small, read all of it and move on to the next message.
pbuf := make ( [ ] byte , msg . Size )
if _ , err := io . ReadFull ( rw . bufconn , pbuf ) ; err != nil {
return msg , err
}
rw . rsync <- struct { } { } // bufconn is available again
msg . Payload = bytes . NewReader ( pbuf )
} else {
// lend bufconn to the caller until it has
// consumed the payload. eofSignal will send a value
// on rw.rsync when EOF is reached.
pr := & eofSignal { rw . bufconn , msg . Size , rw . rsync }
msg . Payload = pr
}
return msg , nil
}
// postrack wraps an rlp.ByteReader with a position counter.
type postrack struct {
r rlp . ByteReader
p uint32
}
func ( r * postrack ) Read ( buf [ ] byte ) ( int , error ) {
n , err := r . r . Read ( buf )
r . p += uint32 ( n )
return n , err
}
func ( r * postrack ) ReadByte ( ) ( byte , error ) {
b , err := r . r . ReadByte ( )
if err == nil {
r . p ++
}
return b , err
}
// eofSignal wraps a reader with eof signaling. the eof channel is
// closed when the wrapped reader returns an error or when count bytes
// have been read.