@ -7,6 +7,7 @@ import (
"sync"
"sync"
"sync/atomic"
"sync/atomic"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state"
@ -27,6 +28,12 @@ type environment struct {
block * types . Block
block * types . Block
family * set . Set
family * set . Set
uncles * set . Set
uncles * set . Set
remove * set . Set
tcount int
ignoredTransactors * set . Set
lowGasTransactors * set . Set
ownedAccounts * set . Set
lowGasTxs types . Transactions
}
}
func env ( block * types . Block , eth core . Backend ) * environment {
func env ( block * types . Block , eth core . Backend ) * environment {
@ -72,6 +79,7 @@ type worker struct {
proc * core . BlockProcessor
proc * core . BlockProcessor
coinbase common . Address
coinbase common . Address
gasPrice * big . Int
extra [ ] byte
extra [ ] byte
currentMu sync . Mutex
currentMu sync . Mutex
@ -93,6 +101,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker {
eth : eth ,
eth : eth ,
mux : eth . EventMux ( ) ,
mux : eth . EventMux ( ) ,
recv : make ( chan * types . Block ) ,
recv : make ( chan * types . Block ) ,
gasPrice : new ( big . Int ) ,
chain : eth . ChainManager ( ) ,
chain : eth . ChainManager ( ) ,
proc : eth . BlockProcessor ( ) ,
proc : eth . BlockProcessor ( ) ,
possibleUncles : make ( map [ common . Hash ] * types . Block ) ,
possibleUncles : make ( map [ common . Hash ] * types . Block ) ,
@ -123,15 +132,22 @@ func (self *worker) pendingBlock() *types.Block {
}
}
func ( self * worker ) start ( ) {
func ( self * worker ) start ( ) {
self . mu . Lock ( )
defer self . mu . Unlock ( )
atomic . StoreInt32 ( & self . mining , 1 )
// spin up agents
// spin up agents
for _ , agent := range self . agents {
for _ , agent := range self . agents {
agent . Start ( )
agent . Start ( )
}
}
atomic . StoreInt32 ( & self . mining , 1 )
}
}
func ( self * worker ) stop ( ) {
func ( self * worker ) stop ( ) {
self . mu . Lock ( )
defer self . mu . Unlock ( )
if atomic . LoadInt32 ( & self . mining ) == 1 {
if atomic . LoadInt32 ( & self . mining ) == 1 {
// stop all agents
// stop all agents
for _ , agent := range self . agents {
for _ , agent := range self . agents {
@ -144,6 +160,9 @@ func (self *worker) stop() {
}
}
func ( self * worker ) register ( agent Agent ) {
func ( self * worker ) register ( agent Agent ) {
self . mu . Lock ( )
defer self . mu . Unlock ( )
self . agents = append ( self . agents , agent )
self . agents = append ( self . agents , agent )
agent . SetReturnCh ( self . recv )
agent . SetReturnCh ( self . recv )
}
}
@ -163,8 +182,11 @@ out:
self . possibleUncles [ ev . Block . Hash ( ) ] = ev . Block
self . possibleUncles [ ev . Block . Hash ( ) ] = ev . Block
self . uncleMu . Unlock ( )
self . uncleMu . Unlock ( )
case core . TxPreEvent :
case core . TxPreEvent :
// Apply transaction to the pending state if we're not mining
if atomic . LoadInt32 ( & self . mining ) == 0 {
if atomic . LoadInt32 ( & self . mining ) == 0 {
self . commitNewWork ( )
self . mu . Lock ( )
self . commitTransactions ( types . Transactions { ev . Tx } )
self . mu . Unlock ( )
}
}
}
}
case <- self . quit :
case <- self . quit :
@ -230,13 +252,33 @@ func (self *worker) makeCurrent() {
}
}
block . Header ( ) . Extra = self . extra
block . Header ( ) . Extra = self . extra
self . current = env ( block , self . eth )
current : = env ( block , self . eth )
for _ , ancestor := range self . chain . GetAncestors ( block , 7 ) {
for _ , ancestor := range self . chain . GetAncestors ( block , 7 ) {
self . current . family . Add ( ancestor . Hash ( ) )
current . family . Add ( ancestor . Hash ( ) )
}
}
accounts , _ := self . eth . AccountManager ( ) . Accounts ( )
// Keep track of transactions which return errors so they can be removed
current . remove = set . New ( )
current . tcount = 0
current . ignoredTransactors = set . New ( )
current . lowGasTransactors = set . New ( )
current . ownedAccounts = accountAddressesSet ( accounts )
parent := self . chain . GetBlock ( current . block . ParentHash ( ) )
current . coinbase . SetGasPool ( core . CalcGasLimit ( parent ) )
parent := self . chain . GetBlock ( self . current . block . ParentHash ( ) )
self . current = current
self . current . coinbase . SetGasPool ( core . CalcGasLimit ( parent ) )
}
func ( w * worker ) setGasPrice ( p * big . Int ) {
w . mu . Lock ( )
defer w . mu . Unlock ( )
// calculate the minimal gas price the miner accepts when sorting out transactions.
const pct = int64 ( 90 )
w . gasPrice = gasprice ( p , pct )
w . mux . Post ( core . GasPriceChanged { w . gasPrice } )
}
}
func ( self * worker ) commitNewWork ( ) {
func ( self * worker ) commitNewWork ( ) {
@ -248,54 +290,14 @@ func (self *worker) commitNewWork() {
defer self . currentMu . Unlock ( )
defer self . currentMu . Unlock ( )
self . makeCurrent ( )
self . makeCurrent ( )
current := self . current
transactions := self . eth . TxPool ( ) . GetTransactions ( )
transactions := self . eth . TxPool ( ) . GetTransactions ( )
sort . Sort ( types . TxByNonce { transactions } )
sort . Sort ( types . TxByNonce { transactions } )
// Keep track of transactions which return errors so they can be removed
// commit transactions for this run
var (
self . commitTransactions ( transactions )
remove = set . New ( )
self . eth . TxPool ( ) . RemoveTransactions ( current . lowGasTxs )
tcount = 0
ignoredTransactors = set . New ( )
)
for _ , tx := range transactions {
// We can skip err. It has already been validated in the tx pool
from , _ := tx . From ( )
// Move on to the next transaction when the transactor is in ignored transactions set
// This may occur when a transaction hits the gas limit. When a gas limit is hit and
// the transaction is processed (that could potentially be included in the block) it
// will throw a nonce error because the previous transaction hasn't been processed.
// Therefor we need to ignore any transaction after the ignored one.
if ignoredTransactors . Has ( from ) {
continue
}
self . current . state . StartRecord ( tx . Hash ( ) , common . Hash { } , 0 )
err := self . commitTransaction ( tx )
switch {
case core . IsNonceErr ( err ) || core . IsInvalidTxErr ( err ) :
// Remove invalid transactions
from , _ := tx . From ( )
self . chain . TxState ( ) . RemoveNonce ( from , tx . Nonce ( ) )
remove . Add ( tx . Hash ( ) )
if glog . V ( logger . Detail ) {
glog . Infof ( "TX (%x) failed, will be removed: %v\n" , tx . Hash ( ) . Bytes ( ) [ : 4 ] , err )
}
case state . IsGasLimitErr ( err ) :
from , _ := tx . From ( )
// ignore the transactor so no nonce errors will be thrown for this account
// next time the worker is run, they'll be picked up again.
ignoredTransactors . Add ( from )
glog . V ( logger . Detail ) . Infof ( "Gas limit reached for (%x) in this block. Continue to try smaller txs\n" , from [ : 4 ] )
default :
tcount ++
}
}
var (
var (
uncles [ ] * types . Header
uncles [ ] * types . Header
@ -321,7 +323,7 @@ func (self *worker) commitNewWork() {
// We only care about logging if we're actually mining
// We only care about logging if we're actually mining
if atomic . LoadInt32 ( & self . mining ) == 1 {
if atomic . LoadInt32 ( & self . mining ) == 1 {
glog . V ( logger . Info ) . Infof ( "commit new work on block %v with %d txs & %d uncles\n" , self . current . block . Number ( ) , tcount , len ( uncles ) )
glog . V ( logger . Info ) . Infof ( "commit new work on block %v with %d txs & %d uncles\n" , current . block . Number ( ) , current . tcount , len ( uncles ) )
}
}
for _ , hash := range badUncles {
for _ , hash := range badUncles {
@ -361,6 +363,71 @@ func (self *worker) commitUncle(uncle *types.Header) error {
return nil
return nil
}
}
func ( self * worker ) commitTransactions ( transactions types . Transactions ) {
current := self . current
for _ , tx := range transactions {
// We can skip err. It has already been validated in the tx pool
from , _ := tx . From ( )
// check if it falls within margin
if tx . GasPrice ( ) . Cmp ( self . gasPrice ) < 0 {
// ignore the transaction and transactor. We ignore the transactor
// because nonce will fail after ignoring this transaction so there's
// no point
current . lowGasTransactors . Add ( from )
glog . V ( logger . Info ) . Infof ( "transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n" , tx . Hash ( ) . Bytes ( ) [ : 4 ] , common . CurrencyToString ( tx . GasPrice ( ) ) , common . CurrencyToString ( self . gasPrice ) , from [ : 4 ] )
}
// Continue with the next transaction if the transaction sender is included in
// the low gas tx set. This will also remove the tx and all sequential transaction
// from this transactor
if current . lowGasTransactors . Has ( from ) {
// add tx to the low gas set. This will be removed at the end of the run
// owned accounts are ignored
if ! current . ownedAccounts . Has ( from ) {
current . lowGasTxs = append ( current . lowGasTxs , tx )
}
continue
}
// Move on to the next transaction when the transactor is in ignored transactions set
// This may occur when a transaction hits the gas limit. When a gas limit is hit and
// the transaction is processed (that could potentially be included in the block) it
// will throw a nonce error because the previous transaction hasn't been processed.
// Therefor we need to ignore any transaction after the ignored one.
if current . ignoredTransactors . Has ( from ) {
continue
}
self . current . state . StartRecord ( tx . Hash ( ) , common . Hash { } , 0 )
err := self . commitTransaction ( tx )
switch {
case core . IsNonceErr ( err ) || core . IsInvalidTxErr ( err ) :
// Remove invalid transactions
from , _ := tx . From ( )
self . chain . TxState ( ) . RemoveNonce ( from , tx . Nonce ( ) )
current . remove . Add ( tx . Hash ( ) )
if glog . V ( logger . Detail ) {
glog . Infof ( "TX (%x) failed, will be removed: %v\n" , tx . Hash ( ) . Bytes ( ) [ : 4 ] , err )
}
case state . IsGasLimitErr ( err ) :
from , _ := tx . From ( )
// ignore the transactor so no nonce errors will be thrown for this account
// next time the worker is run, they'll be picked up again.
current . ignoredTransactors . Add ( from )
glog . V ( logger . Detail ) . Infof ( "Gas limit reached for (%x) in this block. Continue to try smaller txs\n" , from [ : 4 ] )
default :
current . tcount ++
}
}
}
func ( self * worker ) commitTransaction ( tx * types . Transaction ) error {
func ( self * worker ) commitTransaction ( tx * types . Transaction ) error {
snap := self . current . state . Copy ( )
snap := self . current . state . Copy ( )
receipt , _ , err := self . proc . ApplyTransaction ( self . current . coinbase , self . current . state , self . current . block , tx , self . current . totalUsedGas , true )
receipt , _ , err := self . proc . ApplyTransaction ( self . current . coinbase , self . current . state , self . current . block , tx , self . current . totalUsedGas , true )
@ -383,3 +450,20 @@ func (self *worker) HashRate() int64 {
return tot
return tot
}
}
// gasprice calculates a reduced gas price based on the pct
// XXX Use big.Rat?
func gasprice ( price * big . Int , pct int64 ) * big . Int {
p := new ( big . Int ) . Set ( price )
p . Div ( p , big . NewInt ( 100 ) )
p . Mul ( p , big . NewInt ( pct ) )
return p
}
func accountAddressesSet ( accounts [ ] accounts . Account ) * set . Set {
accountSet := set . New ( )
for _ , account := range accounts {
accountSet . Add ( common . BytesToAddress ( account . Address ) )
}
return accountSet
}