From 735b029db95bf72c3105674c0f2b4f111e5ccdf5 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Apr 2015 14:00:24 +0200 Subject: [PATCH 01/10] core: return the index of the block that failed when inserting a chain --- cmd/utils/cmd.go | 4 ++-- core/chain_makers.go | 2 +- core/chain_manager.go | 8 +++++--- eth/downloader/downloader.go | 9 ++++----- eth/handler.go | 2 +- miner/worker.go | 2 +- tests/block_test_util.go | 2 +- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index 64faf6ad1b..cbb2d42aaa 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -172,7 +172,7 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error { n++ if n == batchSize { - if err := chainmgr.InsertChain(blocks); err != nil { + if _, err := chainmgr.InsertChain(blocks); err != nil { return fmt.Errorf("invalid block %v", err) } n = 0 @@ -181,7 +181,7 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error { } if n > 0 { - if err := chainmgr.InsertChain(blocks[:n]); err != nil { + if _, err := chainmgr.InsertChain(blocks[:n]); err != nil { return fmt.Errorf("invalid block %v", err) } } diff --git a/core/chain_makers.go b/core/chain_makers.go index 4512a5493a..73c2205f44 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -141,6 +141,6 @@ func newCanonical(n int, db common.Database) (*BlockProcessor, error) { return bman, nil } lchain := makeChain(bman, parent, n, db, CanonicalSeed) - err := bman.bc.InsertChain(lchain) + _, err := bman.bc.InsertChain(lchain) return bman, err } diff --git a/core/chain_manager.go b/core/chain_manager.go index 32ad4a2bab..253228bfd8 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -497,7 +497,9 @@ func (self *ChainManager) procFutureBlocks() { self.InsertChain(blocks) } -func (self *ChainManager) InsertChain(chain types.Blocks) error { +// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned +// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go). +func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. var ( queue = make([]interface{}, len(chain)) @@ -540,7 +542,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { glog.V(logger.Error).Infoln(err) glog.V(logger.Debug).Infoln(block) - return err + return i, err } block.Td = new(big.Int).Set(CalculateTD(block, self.GetBlock(block.ParentHash()))) @@ -613,7 +615,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { go self.eventMux.Post(queueEvent) - return nil + return 0, nil } // diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a3917854f2..a7e83a532c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -37,7 +37,7 @@ var ( ) type hashCheckFn func(common.Hash) bool -type chainInsertFn func(types.Blocks) error +type chainInsertFn func(types.Blocks) (int, error) type hashIterFn func() (common.Hash, error) type blockPack struct { @@ -432,12 +432,11 @@ func (d *Downloader) process(peer *peer) error { // TODO check for parent error. When there's a parent error we should stop // processing and start requesting the `block.hash` so that it's parent and // grandparents can be requested and queued. - err = d.insertChain(blocks[:max]) + var i int + i, err = d.insertChain(blocks[:max]) if err != nil && core.IsParentErr(err) { - glog.V(logger.Debug).Infoln("Aborting process due to missing parent.") + glog.V(logger.Debug).Infof("Aborting process due to missing parent (%d)\n", i) - // XXX this needs a lot of attention - blocks = nil break } else if err != nil { // immediatly unregister the false peer but do not disconnect diff --git a/eth/handler.go b/eth/handler.go index 61149049ef..2dd4c74db8 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -376,7 +376,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // if the parent exists we process the block and propagate to our peers // if the parent does not exists we delegate to the downloader. if self.chainman.HasBlock(request.Block.ParentHash()) { - if err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { + if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { // handle error return nil } diff --git a/miner/worker.go b/miner/worker.go index a38b8a5d47..87d17dfd69 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -184,7 +184,7 @@ func (self *worker) wait() { continue } - if err := self.chain.InsertChain(types.Blocks{block}); err == nil { + if _, err := self.chain.InsertChain(types.Blocks{block}); err == nil { for _, uncle := range block.Uncles() { delete(self.possibleUncles, uncle.Hash()) } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index 06f082ca36..093c9be0c7 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -162,7 +162,7 @@ func (t *BlockTest) TryBlocksInsert(chainManager *core.ChainManager) error { } } // RLP decoding worked, try to insert into chain: - err = chainManager.InsertChain(types.Blocks{cb}) + _, err = chainManager.InsertChain(types.Blocks{cb}) if err != nil { if b.BlockHeader == nil { continue // OK - block is supposed to be invalid, continue with next block From dfbf580354711af3b542b8c6f5608852d7b90ce3 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Apr 2015 14:49:37 +0200 Subject: [PATCH 02/10] eth/downloader: ignore orphan blocks in the downloader. When blocks have been sorted and are being processed, orphan blocks should be ignored and thrown out. The protocol handler is responsible for downloading blocks which have missing parents. --- eth/downloader/downloader.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a7e83a532c..81808b4f81 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -418,14 +418,16 @@ func (d *Downloader) process(peer *peer) error { // link). We should at least check whihc queue match. This code could move // to a seperate goroutine where it periodically checks for linked pieces. types.BlockBy(types.Number).Sort(d.queue.blocks) - blocks := d.queue.blocks - if len(blocks) == 0 { + if len(d.queue.blocks) == 0 { return nil } + var ( + blocks = d.queue.blocks + err error + ) glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) - var err error // Loop untill we're out of blocks for len(blocks) != 0 { max := int(math.Min(float64(len(blocks)), 256)) @@ -435,9 +437,11 @@ func (d *Downloader) process(peer *peer) error { var i int i, err = d.insertChain(blocks[:max]) if err != nil && core.IsParentErr(err) { - glog.V(logger.Debug).Infof("Aborting process due to missing parent (%d)\n", i) + // Ignore the missing blocks. Handler should take care of anything that's missing. + glog.V(logger.Debug).Infof("Ignored block with missing parent (%d)\n", i) + blocks = blocks[i:] - break + continue } else if err != nil { // immediatly unregister the false peer but do not disconnect d.UnregisterPeer(d.activePeer) From c9300458344e9024b4d18171f87b7e0edb3b6859 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Apr 2015 15:09:37 +0200 Subject: [PATCH 03/10] core: fixed tetst to reflect (int, error) return by insertChain --- core/chain_manager_test.go | 4 ++-- eth/downloader/downloader_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index a88afd7c86..8d1b7865df 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -44,7 +44,7 @@ func testFork(t *testing.T, bman *BlockProcessor, i, N int, f func(td1, td2 *big // extend the fork parent := bman2.bc.CurrentBlock() chainB := makeChain(bman2, parent, N, db, ForkSeed) - err = bman2.bc.InsertChain(chainB) + _, err = bman2.bc.InsertChain(chainB) if err != nil { t.Fatal("Insert chain error for fork:", err) } @@ -108,7 +108,7 @@ func loadChain(fn string, t *testing.T) (types.Blocks, error) { } func insertChain(done chan bool, chainMan *ChainManager, chain types.Blocks, t *testing.T) { - err := chainMan.InsertChain(chain) + _, err := chainMan.InsertChain(chain) if err != nil { fmt.Println(err) t.FailNow() diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8843ca0c7b..5518163caf 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -62,10 +62,10 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool { return false } -func (dl *downloadTester) insertChain(blocks types.Blocks) error { +func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { dl.insertedBlocks += len(blocks) - return nil + return 0, nil } func (dl *downloadTester) getHashes(hash common.Hash) error { From f8c27d7159c3f93f0ca05ab1df86cca98d86e52e Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Apr 2015 19:55:04 +0200 Subject: [PATCH 04/10] eth/downloader: drop block --- eth/downloader/downloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 81808b4f81..4cd927fd53 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -439,7 +439,7 @@ func (d *Downloader) process(peer *peer) error { if err != nil && core.IsParentErr(err) { // Ignore the missing blocks. Handler should take care of anything that's missing. glog.V(logger.Debug).Infof("Ignored block with missing parent (%d)\n", i) - blocks = blocks[i:] + blocks = blocks[i+1:] continue } else if err != nil { From 9e63798d0362a27b3ef45345d93f4a01c3349516 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Apr 2015 19:55:30 +0200 Subject: [PATCH 05/10] core/types, eth: meassure and display propagation times --- core/types/block.go | 2 ++ eth/handler.go | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/types/block.go b/core/types/block.go index 19cf49c121..c93452fa7c 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -99,6 +99,8 @@ type Block struct { Td *big.Int queued bool // flag for blockpool to skip TD check + ReceivedAt time.Time + receipts Receipts } diff --git a/eth/handler.go b/eth/handler.go index 2dd4c74db8..f7610e9e3c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -346,6 +346,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } + request.Block.ReceivedAt = time.Now() + hash := request.Block.Hash() // Add the block hash as a known hash to the peer. This will later be used to determine // who should receive this. @@ -419,7 +421,7 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) for _, peer := range peers { peer.sendNewBlock(block) } - glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") + glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total propagation time:", time.Since(block.ReceivedAt)) } // BroadcastTx will propagate the block to its connected peers. It will sort From 01e3d694a68b55d44d92754db5e84eb2930988ab Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Apr 2015 22:49:58 +0200 Subject: [PATCH 06/10] p2p: added received at to peer message p2p.Msg.ReceivedAt can be used for determining block propagation from begining to end. --- p2p/message.go | 7 ++++--- p2p/peer.go | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/p2p/message.go b/p2p/message.go index be6405d6f0..5ab5ab73e0 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -22,9 +22,10 @@ import ( // structure, encode the payload into a byte array and create a // separate Msg with a bytes.Reader as Payload for each send. type Msg struct { - Code uint64 - Size uint32 // size of the paylod - Payload io.Reader + Code uint64 + Size uint32 // size of the paylod + Payload io.Reader + ReceivedAt time.Time } // Decode parses the RLP content of a message into diff --git a/p2p/peer.go b/p2p/peer.go index 1262ba64a7..bc0e6eb5f1 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -177,6 +177,7 @@ func (p *Peer) readLoop(errc chan<- error) { errc <- err return } + msg.ReceivedAt = time.Now() if err = p.handle(msg); err != nil { errc <- err return From 04c209980bdecb848ae6e397e808e62aecaece39 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Apr 2015 22:50:58 +0200 Subject: [PATCH 07/10] eth: rely on p2p to determine block propagation --- eth/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/handler.go b/eth/handler.go index f7610e9e3c..fecd71632b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -346,7 +346,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } - request.Block.ReceivedAt = time.Now() + request.Block.ReceivedAt = msg.ReceivedAt hash := request.Block.Hash() // Add the block hash as a known hash to the peer. This will later be used to determine From 2590a7dabbf6781734be0c388b46ecd53ece6155 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 30 Apr 2015 00:08:43 +0200 Subject: [PATCH 08/10] core: added some additional chain tests for shortest chain --- core/chain_manager.go | 2 +- core/chain_manager_test.go | 33 +++++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/core/chain_manager.go b/core/chain_manager.go index 253228bfd8..4fdb2edceb 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -593,7 +593,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { } } else { if glog.V(logger.Detail) { - glog.Infof("inserted forked block #%d (%d TXs %d UNCs) (%x...)\n", block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) + glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) } queue[i] = ChainSideEvent{block, logs} diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index 8d1b7865df..50915459b9 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -9,6 +9,7 @@ import ( "strconv" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -369,11 +370,8 @@ func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block return chain } -func TestReorg(t *testing.T) { - db, _ := ethdb.NewMemDatabase() +func chm(genesis *types.Block, db common.Database) *ChainManager { var eventMux event.TypeMux - - genesis := GenesisBlock(db) bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux} bc.cache = NewBlockCache(100) bc.futureBlocks = NewBlockCache(100) @@ -381,6 +379,14 @@ func TestReorg(t *testing.T) { bc.ResetWithGenesisBlock(genesis) bc.txState = state.ManageState(bc.State()) + return bc +} + +func TestReorgLongest(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + genesis := GenesisBlock(db) + bc := chm(genesis, db) + chain1 := makeChainWithDiff(genesis, []int{1, 2, 4}, 10) chain2 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 11) @@ -394,3 +400,22 @@ func TestReorg(t *testing.T) { } } } + +func TestReorgShortest(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + genesis := GenesisBlock(db) + bc := chm(genesis, db) + + chain1 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 10) + chain2 := makeChainWithDiff(genesis, []int{1, 10}, 11) + + bc.InsertChain(chain1) + bc.InsertChain(chain2) + + prev := bc.CurrentBlock() + for block := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1); block.NumberU64() != 0; prev, block = block, bc.GetBlockByNumber(block.NumberU64()-1) { + if prev.ParentHash() != block.Hash() { + t.Errorf("parent hash mismatch %x - %x", prev.ParentHash(), block.Hash()) + } + } +} From 88292f35db656fe0677a3d7cf9be3a78f507699d Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 30 Apr 2015 00:20:59 +0200 Subject: [PATCH 09/10] core: remove txs from queue in addition to removal of pending --- core/transaction_pool.go | 23 ++++++++++++++++++++++- core/transaction_pool_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 8543aa0174..22a804e1dc 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -306,6 +306,27 @@ func (pool *TxPool) checkQueue() { } } +func (pool *TxPool) removeTx(hash common.Hash) { + // delete from pending pool + delete(pool.txs, hash) + + // delete from queue +out: + for address, txs := range pool.queue { + for i, tx := range txs { + if tx.Hash() == hash { + if len(txs) == 1 { + // if only one tx, remove entire address entry + delete(pool.queue, address) + } else { + pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...) + } + break out + } + } + } +} + func (pool *TxPool) validatePool() { pool.mu.Lock() defer pool.mu.Unlock() @@ -316,7 +337,7 @@ func (pool *TxPool) validatePool() { glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err) } - delete(pool.txs, hash) + pool.removeTx(hash) } } } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index 4d66776f01..49224be5bb 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -111,3 +111,30 @@ func TestTransactionQueue(t *testing.T) { t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } + +func TestRemoveTx(t *testing.T) { + pool, key := setupTxPool() + tx := transaction() + tx.SignECDSA(key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + pool.queueTx(tx) + pool.addTx(tx) + if len(pool.queue) != 1 { + t.Error("expected queue to be 1, got", len(pool.queue)) + } + + if len(pool.txs) != 1 { + t.Error("expected txs to be 1, got", len(pool.txs)) + } + + pool.removeTx(tx.Hash()) + + if len(pool.queue) > 0 { + t.Error("expected queue to be 0, got", len(pool.queue)) + } + + if len(pool.txs) > 0 { + t.Error("expected txs to be 0, got", len(pool.txs)) + } +} From 30b921ef463247da63dece1cb81887f7e66668ff Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 30 Apr 2015 00:40:14 +0200 Subject: [PATCH 10/10] cmd/geth: bump version to 0.9.14 --- cmd/geth/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 365390a07c..ef007051c3 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -47,7 +47,7 @@ import _ "net/http/pprof" const ( ClientIdentifier = "Geth" - Version = "0.9.13" + Version = "0.9.14" ) var (