Fixed pending states

release/1.0.1
obscuren 10 years ago
parent 9c55576c7b
commit 7b6a8cc9ae
  1. 12
      miner/miner.go
  2. 72
      miner/worker.go
  3. 28
      xeth/xeth.go

@ -6,6 +6,8 @@ import (
"github.com/ethereum/ethash" "github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
) )
@ -26,6 +28,7 @@ func New(eth core.Backend, pow pow.PoW, minerThreads int) *Miner {
for i := 0; i < minerThreads; i++ { for i := 0; i < minerThreads; i++ {
miner.worker.register(NewCpuMiner(i, pow)) miner.worker.register(NewCpuMiner(i, pow))
} }
return miner return miner
} }
@ -40,6 +43,7 @@ func (self *Miner) Start(coinbase common.Address) {
self.pow.(*ethash.Ethash).UpdateDAG() self.pow.(*ethash.Ethash).UpdateDAG()
self.worker.start() self.worker.start()
self.worker.commitNewWork() self.worker.commitNewWork()
} }
@ -61,3 +65,11 @@ func (self *Miner) HashRate() int64 {
func (self *Miner) SetExtra(extra []byte) { func (self *Miner) SetExtra(extra []byte) {
self.worker.extra = extra self.worker.extra = extra
} }
func (self *Miner) PendingState() *state.StateDB {
return self.worker.pendingState()
}
func (self *Miner) PendingBlock() *types.Block {
return self.worker.pendingBlock()
}

@ -76,16 +76,20 @@ type worker struct {
coinbase common.Address coinbase common.Address
extra []byte extra []byte
current *environment currentMu sync.Mutex
current *environment
uncleMu sync.Mutex uncleMu sync.Mutex
possibleUncles map[common.Hash]*types.Block possibleUncles map[common.Hash]*types.Block
mining bool txQueueMu sync.Mutex
txQueue map[common.Hash]*types.Transaction
mining int64
} }
func newWorker(coinbase common.Address, eth core.Backend) *worker { func newWorker(coinbase common.Address, eth core.Backend) *worker {
return &worker{ worker := &worker{
eth: eth, eth: eth,
mux: eth.EventMux(), mux: eth.EventMux(),
recv: make(chan *types.Block), recv: make(chan *types.Block),
@ -93,28 +97,45 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker {
proc: eth.BlockProcessor(), proc: eth.BlockProcessor(),
possibleUncles: make(map[common.Hash]*types.Block), possibleUncles: make(map[common.Hash]*types.Block),
coinbase: coinbase, coinbase: coinbase,
txQueue: make(map[common.Hash]*types.Transaction),
} }
go worker.update()
go worker.wait()
worker.quit = make(chan struct{})
worker.commitNewWork()
return worker
} }
func (self *worker) start() { func (self *worker) pendingState() *state.StateDB {
self.mining = true self.currentMu.Lock()
defer self.currentMu.Unlock()
return self.current.state
}
self.quit = make(chan struct{}) func (self *worker) pendingBlock() *types.Block {
self.currentMu.Lock()
defer self.currentMu.Unlock()
return self.current.block
}
func (self *worker) start() {
// spin up agents // spin up agents
for _, agent := range self.agents { for _, agent := range self.agents {
agent.Start() agent.Start()
} }
go self.update() atomic.StoreInt64(&self.mining, 1)
go self.wait()
} }
func (self *worker) stop() { func (self *worker) stop() {
self.mining = false atomic.StoreInt64(&self.mining, 0)
atomic.StoreInt64(&self.atWork, 0)
close(self.quit) atomic.StoreInt64(&self.atWork, 0)
} }
func (self *worker) register(agent Agent) { func (self *worker) register(agent Agent) {
@ -123,7 +144,7 @@ func (self *worker) register(agent Agent) {
} }
func (self *worker) update() { func (self *worker) update() {
events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}) events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
timer := time.NewTicker(2 * time.Second) timer := time.NewTicker(2 * time.Second)
@ -138,6 +159,10 @@ out:
self.uncleMu.Lock() self.uncleMu.Lock()
self.possibleUncles[ev.Block.Hash()] = ev.Block self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock() self.uncleMu.Unlock()
case core.TxPreEvent:
if atomic.LoadInt64(&self.mining) == 0 {
self.commitNewWork()
}
} }
case <-self.quit: case <-self.quit:
@ -152,7 +177,7 @@ out:
} }
// XXX In case all mined a possible uncle // XXX In case all mined a possible uncle
if atomic.LoadInt64(&self.atWork) == 0 { if atomic.LoadInt64(&self.atWork) == 0 && atomic.LoadInt64(&self.mining) == 1 {
self.commitNewWork() self.commitNewWork()
} }
} }
@ -192,7 +217,7 @@ func (self *worker) wait() {
} }
func (self *worker) push() { func (self *worker) push() {
if self.mining { if atomic.LoadInt64(&self.mining) == 1 {
self.current.block.Header().GasUsed = self.current.totalUsedGas self.current.block.Header().GasUsed = self.current.totalUsedGas
self.current.block.SetRoot(self.current.state.Root()) self.current.block.SetRoot(self.current.state.Root())
@ -205,12 +230,7 @@ func (self *worker) push() {
} }
} }
func (self *worker) commitNewWork() { func (self *worker) makeCurrent() {
self.mu.Lock()
defer self.mu.Unlock()
self.uncleMu.Lock()
defer self.uncleMu.Unlock()
block := self.chain.NewBlock(self.coinbase) block := self.chain.NewBlock(self.coinbase)
if block.Time() == self.chain.CurrentBlock().Time() { if block.Time() == self.chain.CurrentBlock().Time() {
block.Header().Time++ block.Header().Time++
@ -224,6 +244,17 @@ func (self *worker) commitNewWork() {
parent := self.chain.GetBlock(self.current.block.ParentHash()) parent := self.chain.GetBlock(self.current.block.ParentHash())
self.current.coinbase.SetGasPool(core.CalcGasLimit(parent, self.current.block)) self.current.coinbase.SetGasPool(core.CalcGasLimit(parent, self.current.block))
}
func (self *worker) commitNewWork() {
self.mu.Lock()
defer self.mu.Unlock()
self.uncleMu.Lock()
defer self.uncleMu.Unlock()
self.currentMu.Lock()
defer self.currentMu.Unlock()
self.makeCurrent()
transactions := self.eth.TxPool().GetTransactions() transactions := self.eth.TxPool().GetTransactions()
sort.Sort(types.TxByNonce{transactions}) sort.Sort(types.TxByNonce{transactions})
@ -287,6 +318,7 @@ gasLimit:
core.AccumulateRewards(self.current.state, self.current.block) core.AccumulateRewards(self.current.state, self.current.block)
self.current.state.Update() self.current.state.Update()
self.push() self.push()
} }

@ -136,13 +136,16 @@ func cTopics(t [][]string) [][]common.Hash {
func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent } func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent }
func (self *XEth) AtStateNum(num int64) *XEth { func (self *XEth) AtStateNum(num int64) *XEth {
block := self.getBlockByHeight(num)
var st *state.StateDB var st *state.StateDB
if block != nil { switch num {
st = state.New(block.Root(), self.backend.StateDb()) case -2:
} else { st = self.backend.Miner().PendingState().Copy()
st = self.backend.ChainManager().State() default:
if block := self.getBlockByHeight(num); block != nil {
st = state.New(block.Root(), self.backend.StateDb())
} else {
st = state.New(self.backend.ChainManager().GetBlockByNumber(0).Root(), self.backend.StateDb())
}
} }
return self.withState(st) return self.withState(st)
@ -164,9 +167,16 @@ func (self *XEth) Whisper() *Whisper { return self.whisper }
func (self *XEth) getBlockByHeight(height int64) *types.Block { func (self *XEth) getBlockByHeight(height int64) *types.Block {
var num uint64 var num uint64
if height < 0 { switch height {
num = self.CurrentBlock().NumberU64() + uint64(-1*height) case -2:
} else { return self.backend.Miner().PendingBlock()
case -1:
return self.CurrentBlock()
default:
if height < 0 {
return nil
}
num = uint64(height) num = uint64(height)
} }

Loading…
Cancel
Save