From 7aefe123e98240ad4df440a8d1be4446744c8ca2 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 11:58:41 +0200 Subject: [PATCH 1/7] core/types: add Transaction.Size --- core/types/transaction.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/types/transaction.go b/core/types/transaction.go index 3d6d31ae7..a03a6b847 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -67,6 +67,13 @@ func (tx *Transaction) Hash() common.Hash { }) } +// Size returns the encoded RLP size of tx. +func (self *Transaction) Size() common.StorageSize { + c := writeCounter(0) + rlp.Encode(&c, self) + return common.StorageSize(c) +} + func (self *Transaction) Data() []byte { return self.Payload } From 41b2008a669a8454ae19f783eb2dcd967e8752cf Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:00:41 +0200 Subject: [PATCH 2/7] eth: limit number of sent blocks based on message size If blocks get larger, sending 256 at once can make messages large enough to exceed the low-level write timeout. --- eth/handler.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index f2027c3c6..a67d956fb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,6 +18,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +// This is the target maximum size of returned blocks for the +// getBlocks message. The reply message may exceed it +// if a single block is larger than the limit. +const maxBlockRespSize = 2 * 1024 * 1024 + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -246,7 +251,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if _, err := msgStream.List(); err != nil { return err } - var i int + var ( + i int + totalsize common.StorageSize + ) for { i++ var hash common.Hash @@ -260,8 +268,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { block := self.chainman.GetBlock(hash) if block != nil { blocks = append(blocks, block) + totalsize += block.Size() } - if i == downloader.MaxBlockFetch { + if i == downloader.MaxBlockFetch || totalsize > maxBlockRespSize { break } } From 6c73a5980640581903d8f56b3912b22641d5195c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:03:14 +0200 Subject: [PATCH 3/7] eth: limit number of sent transactions based on message size Nodes that are out of sync will queue many transactions, which causes the initial transactions message to grow very large. Larger transactions messages can make communication impossible if the message is too big to send. Big transactions messages also exhaust egress bandwidth, which degrades other peer connections. The new approach to combat these issues is to send transactions in smaller batches. This commit introduces a new goroutine that handles delivery of all initial transaction transfers. Size-limited packs of transactions are sent to one peer at a time, conserving precious egress bandwidth. --- eth/handler.go | 24 ++++++++----- eth/sync.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 9 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index a67d956fb..f002727f3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -53,9 +53,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription + // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer newHashCh chan []*blockAnnounce newBlockCh chan chan []*types.Block + txsyncCh chan *txsync quitSync chan struct{} // wait group is used for graceful shutdowns during downloading @@ -76,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), newBlockCh: make(chan chan []*types.Block), + txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), @@ -118,13 +120,14 @@ func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) go pm.txBroadcastLoop() - // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + // start sync handlers go pm.syncer() go pm.fetcher() + go pm.txsyncLoop() } func (pm *ProtocolManager) Stop() { @@ -135,7 +138,7 @@ func (pm *ProtocolManager) Stop() { pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits the sync handler + close(pm.quitSync) // quits syncer, fetcher, txsyncLoop // Wait for any process action pm.wg.Wait() @@ -150,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter } func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake, short circuit if fails + // Execute the Ethereum handshake. if err := p.handleStatus(); err != nil { return err } - // Register the peer locally and in the downloader too + + // Register the peer locally. glog.V(logger.Detail).Infoln("Adding peer", p.id) if err := pm.peers.Register(p); err != nil { glog.V(logger.Error).Infoln("Addition failed:", err) @@ -162,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error { } defer pm.removePeer(p.id) + // Register the peer in the downloader. If the downloader + // considers it banned, we disconnect. if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } - // propagate existing transactions. new transactions appearing + + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. - if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { - return err - } + pm.syncTransactions(p) + // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { diff --git a/eth/sync.go b/eth/sync.go index 8e4e3cfbe..a25d4d4fd 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -2,6 +2,7 @@ package eth import ( "math" + "math/rand" "sync/atomic" "time" @@ -10,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( @@ -20,6 +22,10 @@ const ( notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 + + // This is the target size for the packs of transactions sent by txsyncLoop. + // A pack can get larger than this if a single transactions exceeds this size. + txsyncPackSize = 100 * 1024 ) // blockAnnounce is the hash notification of the availability of a new block in @@ -30,6 +36,94 @@ type blockAnnounce struct { time time.Time } +type txsync struct { + p *peer + txs []*types.Transaction +} + +// syncTransactions starts sending all currently pending transactions to the given peer. +func (pm *ProtocolManager) syncTransactions(p *peer) { + txs := pm.txpool.GetTransactions() + if len(txs) == 0 { + return + } + select { + case pm.txsyncCh <- &txsync{p, txs}: + case <-pm.quitSync: + } +} + +// txsyncLoop takes care of the initial transaction sync for each new +// connection. When a new peer appears, we relay all currently pending +// transactions. In order to minimise egress bandwidth usage, we send +// the transactions in small packs to one peer at a time. +func (pm *ProtocolManager) txsyncLoop() { + var ( + pending = make(map[discover.NodeID]*txsync) + sending = false // whether a send is active + pack = new(txsync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *txsync) { + // Fill pack with transactions up to the target size. + size := common.StorageSize(0) + pack.p = s.p + pack.txs = pack.txs[:0] + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.txs = append(pack.txs, s.txs[i]) + size += s.txs[i].Size() + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] + if len(s.txs) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size) + sending = true + go func() { done <- pack.p.sendTransactions(pack.txs) }() + } + + // pick chooses the next pending sync. + pick := func() *txsync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.txsyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + // fetcher is responsible for collecting hash notifications, and periodically // checking all unknown ones and individually fetching them. func (pm *ProtocolManager) fetcher() { From 2c24a73e253a0b0c205406c7dc9fc4b8a7d97e86 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:10:19 +0200 Subject: [PATCH 4/7] eth: add protocol tests The protocol tests were commented out when eth/downloader was introduced. --- eth/protocol_test.go | 526 ++++++++++++++++--------------------------- 1 file changed, 190 insertions(+), 336 deletions(-) diff --git a/eth/protocol_test.go b/eth/protocol_test.go index d44f66b89..bbea9fc11 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -1,388 +1,242 @@ package eth -/* -TODO All of these tests need to be re-written - -var logsys = ethlogger.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlogger.LogLevel(ethlogger.DebugDetailLevel)) - -var ini = false - -func logInit() { - if !ini { - ethlogger.AddLogSystem(logsys) - ini = true - } -} - -type testTxPool struct { - getTransactions func() []*types.Transaction - addTransactions func(txs []*types.Transaction) -} - -type testChainManager struct { - getBlockHashes func(hash common.Hash, amount uint64) (hashes []common.Hash) - getBlock func(hash common.Hash) *types.Block - status func() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) -} - -type testBlockPool struct { - addBlockHashes func(next func() (common.Hash, bool), peerId string) - addBlock func(block *types.Block, peerId string) (err error) - addPeer func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) - removePeer func(peerId string) -} - -func (self *testTxPool) AddTransactions(txs []*types.Transaction) { - if self.addTransactions != nil { - self.addTransactions(txs) - } -} - -func (self *testTxPool) GetTransactions() types.Transactions { return nil } - -func (self *testChainManager) GetBlockHashesFromHash(hash common.Hash, amount uint64) (hashes []common.Hash) { - if self.getBlockHashes != nil { - hashes = self.getBlockHashes(hash, amount) - } - return -} - -func (self *testChainManager) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) { - if self.status != nil { - td, currentBlock, genesisBlock = self.status() - } else { - td = common.Big1 - currentBlock = common.Hash{1} - genesisBlock = common.Hash{2} - } - return -} - -func (self *testChainManager) GetBlock(hash common.Hash) (block *types.Block) { - if self.getBlock != nil { - block = self.getBlock(hash) - } - return -} - -func (self *testBlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) { - if self.addBlockHashes != nil { - self.addBlockHashes(next, peerId) - } -} - -func (self *testBlockPool) AddBlock(block *types.Block, peerId string) { - if self.addBlock != nil { - self.addBlock(block, peerId) - } -} - -func (self *testBlockPool) AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) { - if self.addPeer != nil { - best, suspended = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, peerError) - } - return -} - -func (self *testBlockPool) RemovePeer(peerId string) { - if self.removePeer != nil { - self.removePeer(peerId) - } -} - -func testPeer() *p2p.Peer { - var id discover.NodeID - pk := crypto.GenerateNewKeyPair().PublicKey - copy(id[:], pk) - return p2p.NewPeer(id, "test peer", []p2p.Cap{}) -} - -type ethProtocolTester struct { - p2p.MsgReadWriter // writing to the tester feeds the protocol - - quit chan error - pipe *p2p.MsgPipeRW // the protocol read/writes on this end - txPool *testTxPool // txPool - chainManager *testChainManager // chainManager - blockPool *testBlockPool // blockPool - t *testing.T -} - -func newEth(t *testing.T) *ethProtocolTester { - p1, p2 := p2p.MsgPipe() - return ðProtocolTester{ - MsgReadWriter: p1, - quit: make(chan error, 1), - pipe: p2, - txPool: &testTxPool{}, - chainManager: &testChainManager{}, - blockPool: &testBlockPool{}, - t: t, - } -} - -func (self *ethProtocolTester) reset() { - self.pipe.Close() - - p1, p2 := p2p.MsgPipe() - self.MsgReadWriter = p1 - self.pipe = p2 - self.quit = make(chan error, 1) -} - -func (self *ethProtocolTester) checkError(expCode int, delay time.Duration) (err error) { - var timer = time.After(delay) - select { - case err = <-self.quit: - case <-timer: - self.t.Errorf("no error after %v, expected %v", delay, expCode) - return - } - perr, ok := err.(*errs.Error) - if ok && perr != nil { - if code := perr.Code; code != expCode { - self.t.Errorf("expected protocol error (code %v), got %v (%v)", expCode, code, err) - } - } else { - self.t.Errorf("expected protocol error (code %v), got %v", expCode, err) - } - return -} - -func (self *ethProtocolTester) run() { - err := runEthProtocol(ProtocolVersion, NetworkId, self.txPool, self.chainManager, self.blockPool, testPeer(), self.pipe) - self.quit <- err -} - -func (self *ethProtocolTester) handshake(t *testing.T, mock bool) { - td, currentBlock, genesis := self.chainManager.Status() - // first outgoing msg should be StatusMsg. - err := p2p.ExpectMsg(self, StatusMsg, &statusMsgData{ - ProtocolVersion: ProtocolVersion, - NetworkId: NetworkId, - TD: td, - CurrentBlock: currentBlock, - GenesisBlock: genesis, - }) - if err != nil { - t.Fatalf("incorrect outgoing status: %v", err) - } - if mock { - go p2p.Send(self, StatusMsg, &statusMsgData{ProtocolVersion, NetworkId, td, currentBlock, genesis}) - } -} +import ( + "crypto/rand" + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +func init() { + // glog.SetToStderr(true) + // glog.SetV(6) +} + +var testAccount = crypto.NewKey(rand.Reader) func TestStatusMsgErrors(t *testing.T) { - logInit() - eth := newEth(t) - go eth.run() - td, currentBlock, genesis := eth.chainManager.Status() + pm := newProtocolManagerForTesting(nil) + td, currentBlock, genesis := pm.chainman.Status() + defer pm.Stop() tests := []struct { - code uint64 - data interface{} - wantErrorCode int + code uint64 + data interface{} + wantError error }{ { code: TxMsg, data: []interface{}{}, - wantErrorCode: ErrNoStatusMsg, + wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"), }, { code: StatusMsg, data: statusMsgData{10, NetworkId, td, currentBlock, genesis}, - wantErrorCode: ErrProtocolVersionMismatch, + wantError: errResp(ErrProtocolVersionMismatch, "10 (!= 0)"), }, { code: StatusMsg, data: statusMsgData{ProtocolVersion, 999, td, currentBlock, genesis}, - wantErrorCode: ErrNetworkIdMismatch, + wantError: errResp(ErrNetworkIdMismatch, "999 (!= 0)"), }, { code: StatusMsg, data: statusMsgData{ProtocolVersion, NetworkId, td, currentBlock, common.Hash{3}}, - wantErrorCode: ErrGenesisBlockMismatch, + wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000000000000000000000000000000000000000000000000000 (!= %x)", genesis), }, } - for _, test := range tests { - eth.handshake(t, false) - // the send call might hang until reset because + + for i, test := range tests { + p, errc := newTestPeer(pm) + // The send call might hang until reset because // the protocol might not read the payload. - go p2p.Send(eth, test.code, test.data) - eth.checkError(test.wantErrorCode, 1*time.Second) + go p2p.Send(p, test.code, test.data) - eth.reset() - go eth.run() + select { + case err := <-errc: + if err == nil { + t.Errorf("test %d: protocol returned nil error, want %q", test.wantError) + } else if err.Error() != test.wantError.Error() { + t.Errorf("test %d: wrong error: got %q, want %q", i, err, test.wantError) + } + case <-time.After(2 * time.Second): + t.Errorf("protocol did not shut down withing 2 seconds") + } + p.close() } } -func TestNewBlockMsg(t *testing.T) { - // logInit() - eth := newEth(t) - - var disconnected bool - eth.blockPool.removePeer = func(peerId string) { - disconnected = true - } - - go eth.run() - - eth.handshake(t, true) - err := p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) - } - - var tds = make(chan *big.Int) - eth.blockPool.addPeer = func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) { - tds <- td - return - } - - var delay = 1 * time.Second - // eth.reset() - block := types.NewBlock(common.Hash{1}, common.Address{1}, common.Hash{1}, common.Big1, 1, []byte("extra")) - - go p2p.Send(eth, NewBlockMsg, &newBlockMsgData{Block: block}) - timer := time.After(delay) +// This test checks that received transactions are added to the local pool. +func TestRecvTransactions(t *testing.T) { + txAdded := make(chan []*types.Transaction) + pm := newProtocolManagerForTesting(txAdded) + p, _ := newTestPeer(pm) + defer pm.Stop() + defer p.close() + p.handshake(t) - select { - case td := <-tds: - if td.Cmp(common.Big0) != 0 { - t.Errorf("incorrect td %v, expected %v", td, common.Big0) - } - case <-timer: - t.Errorf("no td recorded after %v", delay) - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return + tx := newtx(testAccount, 0, 0) + if err := p2p.Send(p, TxMsg, []interface{}{tx}); err != nil { + t.Fatalf("send error: %v", err) } - - go p2p.Send(eth, NewBlockMsg, &newBlockMsgData{block, common.Big2}) - timer = time.After(delay) - select { - case td := <-tds: - if td.Cmp(common.Big2) != 0 { - t.Errorf("incorrect td %v, expected %v", td, common.Big2) + case added := <-txAdded: + if len(added) != 1 { + t.Errorf("wrong number of added transactions: got %d, want 1", len(added)) + } else if added[0].Hash() != tx.Hash() { + t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash()) } - case <-timer: - t.Errorf("no td recorded after %v", delay) - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return + case <-time.After(2 * time.Second): + t.Errorf("no TxPreEvent received within 2 seconds") } - - go p2p.Send(eth, NewBlockMsg, []interface{}{}) - // Block.DecodeRLP: validation failed: header is nil - eth.checkError(ErrDecode, delay) - } -func TestBlockMsg(t *testing.T) { - // logInit() - eth := newEth(t) - blocks := make(chan *types.Block) - eth.blockPool.addBlock = func(block *types.Block, peerId string) (err error) { - blocks <- block - return - } +// This test checks that pending transactions are sent. +func TestSendTransactions(t *testing.T) { + pm := newProtocolManagerForTesting(nil) + defer pm.Stop() - var disconnected bool - eth.blockPool.removePeer = func(peerId string) { - disconnected = true + // Fill the pool with big transactions. + const txsize = txsyncPackSize / 10 + alltxs := make([]*types.Transaction, 100) + for nonce := range alltxs { + alltxs[nonce] = newtx(testAccount, uint64(nonce), txsize) } + pm.txpool.AddTransactions(alltxs) - go eth.run() - - eth.handshake(t, true) - err := p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) - } - - var delay = 3 * time.Second - // eth.reset() - newblock := func(i int64) *types.Block { - return types.NewBlock(common.Hash{byte(i)}, common.Address{byte(i)}, common.Hash{byte(i)}, big.NewInt(i), uint64(i), []byte{byte(i)}) - } - b := newblock(0) - b.Header().Difficulty = nil // check if nil as *big.Int decodes as 0 - go p2p.Send(eth, BlocksMsg, types.Blocks{b, newblock(1), newblock(2)}) - timer := time.After(delay) - for i := int64(0); i < 3; i++ { - select { - case block := <-blocks: - if (block.ParentHash() != common.Hash{byte(i)}) { - t.Errorf("incorrect block %v, expected %v", block.ParentHash(), common.Hash{byte(i)}) + // Connect several peers. They should all receive the pending transactions. + var wg sync.WaitGroup + checktxs := func(p *testPeer) { + defer wg.Done() + defer p.close() + seen := make(map[common.Hash]bool) + for _, tx := range alltxs { + seen[tx.Hash()] = false + } + for n := 0; n < len(alltxs) && !t.Failed(); { + var txs []*types.Transaction + msg, err := p.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != TxMsg { + t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code) + } + if err := msg.Decode(&txs); err != nil { + t.Errorf("%v: %v", p.Peer, err) } - if block.Difficulty().Cmp(big.NewInt(i)) != 0 { - t.Errorf("incorrect block %v, expected %v", block.Difficulty(), big.NewInt(i)) + for _, tx := range txs { + hash := tx.Hash() + seentx, want := seen[hash] + if seentx { + t.Errorf("%v: got tx more than once: %x", p.Peer, hash) + } + if !want { + t.Errorf("%v: got unexpected tx: %x", p.Peer, hash) + } + seen[hash] = true + n++ } - case <-timer: - t.Errorf("no td recorded after %v", delay) - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return } } - - go p2p.Send(eth, BlocksMsg, []interface{}{[]interface{}{}}) - eth.checkError(ErrDecode, delay) - if !disconnected { - t.Errorf("peer not disconnected after error") - } - - // test empty transaction - eth.reset() - go eth.run() - eth.handshake(t, true) - err = p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) + for i := 0; i < 3; i++ { + p, _ := newTestPeer(pm) + p.handshake(t) + wg.Add(1) + go checktxs(p) } - b = newblock(0) - b.AddTransaction(nil) - go p2p.Send(eth, BlocksMsg, types.Blocks{b}) - eth.checkError(ErrDecode, delay) + wg.Wait() +} +// testPeer wraps all peer-related data for tests. +type testPeer struct { + p2p.MsgReadWriter // writing to the test peer feeds the protocol + pipe *p2p.MsgPipeRW // the protocol read/writes on this end + pm *ProtocolManager + *peer } -func TestTransactionsMsg(t *testing.T) { - logInit() - eth := newEth(t) - txs := make(chan *types.Transaction) +func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *ProtocolManager { + var ( + em = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, core.FakePow{}, em) + txpool = &fakeTxPool{added: txAdded} + dl = downloader.New(em, chain.HasBlock, chain.GetBlock) + pm = NewProtocolManager(ProtocolVersion, 0, em, txpool, chain, dl) + ) + pm.Start() + return pm +} - eth.txPool.addTransactions = func(t []*types.Transaction) { - for _, tx := range t { - txs <- tx - } +func newTestPeer(pm *ProtocolManager) (*testPeer, <-chan error) { + var id discover.NodeID + rand.Read(id[:]) + rw1, rw2 := p2p.MsgPipe() + peer := pm.newPeer(pm.protVer, pm.netId, p2p.NewPeer(id, "test peer", nil), rw2) + errc := make(chan error, 1) + go func() { + pm.newPeerCh <- peer + errc <- pm.handle(peer) + }() + return &testPeer{rw1, rw2, pm, peer}, errc +} + +func (p *testPeer) handshake(t *testing.T) { + td, currentBlock, genesis := p.pm.chainman.Status() + msg := &statusMsgData{ + ProtocolVersion: uint32(p.pm.protVer), + NetworkId: uint32(p.pm.netId), + TD: td, + CurrentBlock: currentBlock, + GenesisBlock: genesis, } - go eth.run() - - eth.handshake(t, true) - err := p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) + if err := p2p.ExpectMsg(p, StatusMsg, msg); err != nil { + t.Fatalf("status recv: %v", err) } + if err := p2p.Send(p, StatusMsg, msg); err != nil { + t.Fatalf("status send: %v", err) + } +} - var delay = 3 * time.Second - tx := &types.Transaction{} +func (p *testPeer) close() { + p.pipe.Close() +} - go p2p.Send(eth, TxMsg, []interface{}{tx, tx}) - timer := time.After(delay) - for i := int64(0); i < 2; i++ { - select { - case <-txs: - case <-timer: - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return - } +type fakeTxPool struct { + // all transactions are collected. + mu sync.Mutex + all []*types.Transaction + // if added is non-nil, it receives added transactions. + added chan<- []*types.Transaction +} + +func (pool *fakeTxPool) AddTransactions(txs []*types.Transaction) { + pool.mu.Lock() + defer pool.mu.Unlock() + pool.all = append(pool.all, txs...) + if pool.added != nil { + pool.added <- txs } +} - go p2p.Send(eth, TxMsg, []interface{}{[]interface{}{}}) - eth.checkError(ErrDecode, delay) +func (pool *fakeTxPool) GetTransactions() types.Transactions { + pool.mu.Lock() + defer pool.mu.Unlock() + txs := make([]*types.Transaction, len(pool.all)) + copy(txs, pool.all) + return types.Transactions(txs) +} +func newtx(from *crypto.Key, nonce uint64, datasize int) *types.Transaction { + data := make([]byte, datasize) + tx := types.NewTransactionMessage(common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), data) + tx.SetNonce(nonce) + return tx } -*/ From 3239aca69bd940626f3e27ab829fc7fa1c7cdc5e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:10:40 +0200 Subject: [PATCH 5/7] p2p: bump global write timeout to 20s The previous value of 5 seconds causes timeouts for legitimate messages if large messages are sent. --- p2p/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/server.go b/p2p/server.go index 589041810..59b97a0aa 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -30,7 +30,7 @@ const ( frameReadTimeout = 30 * time.Second // Maximum amount of time allowed for writing a complete message. - frameWriteTimeout = 5 * time.Second + frameWriteTimeout = 20 * time.Second ) var errServerStopped = errors.New("server stopped") From 8dc3048f6556e3fb2f719f383834332656b4c8fe Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 13:13:21 +0200 Subject: [PATCH 6/7] eth/downloader: fix hash fetch timeout handling Fixes #1206 --- eth/downloader/downloader.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 92cb1a650..29b627771 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -263,23 +263,29 @@ func (d *Downloader) Cancel() bool { // XXX Make synchronous func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { - glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) - - start := time.Now() - - // Add the hash to the queue first, and start hash retrieval - d.queue.Insert([]common.Hash{h}) - p.getHashes(h) - var ( + start = time.Now() active = p // active peer will help determine the current active peer head = common.Hash{} // common and last hash - timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer + timeout = time.NewTimer(0) // timer to dump a non-responsive active peer attempted = make(map[string]bool) // attempted peers will help with retries crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks ) defer crossTicker.Stop() + defer timeout.Stop() + + glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) + <-timeout.C // timeout channel should be initially empty. + + getHashes := func(from common.Hash) { + active.getHashes(from) + timeout.Reset(hashTTL) + } + + // Add the hash to the queue, and start hash retrieval. + d.queue.Insert([]common.Hash{h}) + getHashes(h) attempted[p.id] = true for finished := false; !finished; { @@ -293,7 +299,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) break } - timeout.Reset(hashTTL) + timeout.Stop() // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { @@ -345,7 +351,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { active.getBlocks([]common.Hash{origin}) // Also fetch a fresh - active.getHashes(head) + getHashes(head) continue } // We're done, prepare the download cache and proceed pulling the blocks @@ -399,7 +405,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. active = p - p.getHashes(head) + getHashes(head) glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id) } } From 73c355591fe0279334675c555b6d614aa25b6781 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 17:03:07 +0200 Subject: [PATCH 7/7] core, eth: document that result of GetTransactions is modifiable --- core/transaction_pool.go | 1 + eth/protocol.go | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 77744f8f7..918e7b957 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -247,6 +247,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { } // GetTransactions returns all currently processable transactions. +// The returned slice may be modified by the caller. func (self *TxPool) GetTransactions() (txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() diff --git a/eth/protocol.go b/eth/protocol.go index 9ccf2cb60..57805d9bd 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -57,10 +57,12 @@ var errorToString = map[int]string{ ErrSuspendedPeer: "Suspended peer", } -// backend is the interface the ethereum protocol backend should implement -// used as an argument to EthProtocol type txPool interface { + // AddTransactions should add the given transactions to the pool. AddTransactions([]*types.Transaction) + + // GetTransactions should return pending transactions. + // The slice should be modifiable by the caller. GetTransactions() types.Transactions }