From ea0357bf02b61db94bd0ad8806ba7337a55a4f79 Mon Sep 17 00:00:00 2001 From: obscuren Date: Sun, 28 Sep 2014 14:52:58 +0200 Subject: [PATCH] Block pool is thread safe --- block_pool.go | 101 ++++++++++++++++++++++---------------- ethchain/state_manager.go | 10 ++-- peer.go | 11 ++--- 3 files changed, 69 insertions(+), 53 deletions(-) diff --git a/block_pool.go b/block_pool.go index 26ef51992e..4ac096bdae 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,7 +1,6 @@ package eth import ( - "bytes" "container/list" "math" "math/big" @@ -35,6 +34,9 @@ type BlockPool struct { td *big.Int quit chan bool + fetchingHashes bool + downloadStartedAt time.Time + ChainLength, BlocksProcessed int } @@ -52,6 +54,9 @@ func (self *BlockPool) Len() int { } func (self *BlockPool) HasLatestHash() bool { + self.mut.Lock() + defer self.mut.Unlock() + return self.pool[string(self.eth.BlockChain().CurrentBlock.Hash())] != nil } @@ -59,7 +64,20 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { return self.eth.BlockChain().GetBlock(hash) != nil } +func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { + for _, item := range self.pool { + if item.block != nil { + blocks = append(blocks, item.block) + } + } + + return +} + func (self *BlockPool) AddHash(hash []byte, peer *Peer) { + self.mut.Lock() + defer self.mut.Unlock() + if self.pool[string(hash)] == nil { self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0} @@ -67,7 +85,10 @@ func (self *BlockPool) AddHash(hash []byte, peer *Peer) { } } -func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { +func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { + self.mut.Lock() + defer self.mut.Unlock() + hash := string(b.Hash()) if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { @@ -76,7 +97,7 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { self.hashPool = append(self.hashPool, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} - if !self.eth.BlockChain().HasBlock(b.PrevHash) { + if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes { poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) } @@ -87,36 +108,12 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { self.BlocksProcessed++ } -func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block { - for _, item := range self.pool { - if item.block != nil { - if bytes.Compare(item.block.Hash(), block.PrevHash) == 0 { - return item.block - } - } - } - - return nil -} - -func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks { - var blocks ethchain.Blocks - - for b := block; b != nil; b = self.getParent(b) { - blocks = append(ethchain.Blocks{b}, blocks...) - } - - return blocks -} - -func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { - for _, item := range self.pool { - if item.block != nil { - blocks = append(blocks, item.block) - } - } +func (self *BlockPool) Remove(hash []byte) { + self.mut.Lock() + defer self.mut.Unlock() - return + self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) + delete(self.pool, string(hash)) } func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmount int) { @@ -129,9 +126,7 @@ func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmou f(block) - hash := block.Hash() - self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) - delete(self.pool, string(hash)) + self.Remove(block.Hash()) } } @@ -140,9 +135,12 @@ func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmou } func (self *BlockPool) DistributeHashes() { + self.mut.Lock() + defer self.mut.Unlock() + var ( peerLen = self.eth.peers.Len() - amount = 200 * peerLen + amount = 256 * peerLen dist = make(map[*Peer][][]byte) ) @@ -156,7 +154,7 @@ func (self *BlockPool) DistributeHashes() { lastFetchFailed := time.Since(item.reqAt) > 5*time.Second // Handle failed requests - if lastFetchFailed && item.requested > 0 && item.peer != nil { + if lastFetchFailed && item.requested > 5 && item.peer != nil { if item.requested < 100 { // Select peer the hash was retrieved off peer = item.from @@ -187,19 +185,23 @@ func (self *BlockPool) DistributeHashes() { for peer, hashes := range dist { peer.FetchBlocks(hashes) } + + if len(dist) > 0 { + self.downloadStartedAt = time.Now() + } } func (self *BlockPool) Start() { - go self.update() + go self.downloadThread() + go self.chainThread() } func (self *BlockPool) Stop() { close(self.quit) } -func (self *BlockPool) update() { +func (self *BlockPool) downloadThread() { serviceTimer := time.NewTicker(100 * time.Millisecond) - procTimer := time.NewTicker(500 * time.Millisecond) out: for { select { @@ -208,20 +210,31 @@ out: case <-serviceTimer.C: // Check if we're catching up. If not distribute the hashes to // the peers and download the blockchain - done := true + self.fetchingHashes = false eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { if p.statusKnown && p.FetchingHashes() { - done = false + self.fetchingHashes = true } }) - if done && len(self.hashPool) > 0 { + if !self.fetchingHashes && len(self.hashPool) > 0 { self.DistributeHashes() } if self.ChainLength < len(self.hashPool) { self.ChainLength = len(self.hashPool) } + } + } +} + +func (self *BlockPool) chainThread() { + procTimer := time.NewTicker(500 * time.Millisecond) +out: + for { + select { + case <-self.quit: + break out case <-procTimer.C: // XXX We can optimize this lifting this on to a new goroutine. // We'd need to make sure that the pools are properly protected by a mutex @@ -230,6 +243,8 @@ out: err := self.eth.StateManager().Process(block, false) if err != nil { poollogger.Infoln(err) + poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + poollogger.Debugln(block) } }) diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index 6bcbe063e8..cd2d57af9f 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -143,9 +143,6 @@ done: } } - // Notify all subscribers - self.Ethereum.Reactor().Post("newTx:post", tx) - // Update the state with pending changes state.Update() @@ -160,10 +157,15 @@ done: os.Exit(1) } - return nil, nil, nil, fmt.Errorf("err diff #%d (r) %v ~ %x <=> (c) %v ~ %x (%x)\n", i+1, original.CumulativeGasUsed, original.PostState[0:4], receipt.CumulativeGasUsed, receipt.PostState[0:4], receipt.Tx.Hash()) + err := fmt.Errorf("#%d receipt failed (r) %v ~ %x <=> (c) %v ~ %x (%x...)", i+1, original.CumulativeGasUsed, original.PostState[0:4], receipt.CumulativeGasUsed, receipt.PostState[0:4], receipt.Tx.Hash()[0:4]) + + return nil, nil, nil, err } } + // Notify all subscribers + self.Ethereum.Reactor().Post("newTx:post", tx) + receipts = append(receipts, receipt) handled = append(handled, tx) diff --git a/peer.go b/peer.go index e3c4c9e107..11ec6e0036 100644 --- a/peer.go +++ b/peer.go @@ -503,16 +503,15 @@ func (p *Peer) HandleInbound() { it := msg.Data.NewIterator() for it.Next() { hash := it.Value().Bytes() - - p.lastReceivedHash = hash - p.LastHashReceived = time.Now() - if blockPool.HasCommonHash(hash) { foundCommonHash = true break } + p.lastReceivedHash = hash + p.LastHashReceived = time.Now() + blockPool.AddHash(hash, p) } @@ -530,7 +529,7 @@ func (p *Peer) HandleInbound() { block := ethchain.NewBlockFromRlpValue(it.Value()) //fmt.Printf("%v %x - %x\n", block.Number, block.Hash()[0:4], block.PrevHash[0:4]) - blockPool.SetBlock(block, p) + blockPool.Add(block, p) p.lastBlockReceived = time.Now() } @@ -561,7 +560,7 @@ func (self *Peer) FetchHashes() { } func (self *Peer) FetchingHashes() bool { - return time.Since(self.LastHashReceived) < 5*time.Second + return time.Since(self.LastHashReceived) < 200*time.Millisecond } // General update method