From a2919b5e17197afcb689b8f4144f255a5872f85d Mon Sep 17 00:00:00 2001 From: obscuren Date: Sun, 10 May 2015 23:12:18 +0200 Subject: [PATCH 1/4] core, eth, miner: improved tx removal & fatal error on db sync err * core: Added GasPriceChange event * eth: When one of the DB flush methods error a fatal error log message is given. Hopefully this will prevent corrupted databases from occuring. * miner: remove transactions with low gas price. Closes #906, #903 --- common/size.go | 6 ------ common/size_test.go | 14 -------------- core/events.go | 6 +++++- core/manager.go | 2 ++ eth/backend.go | 8 +++++--- miner/worker.go | 42 +++++++++++++++++++++++++++++++++++------- 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/common/size.go b/common/size.go index 0d9dbf5583..4ea7f7b11d 100644 --- a/common/size.go +++ b/common/size.go @@ -44,12 +44,6 @@ func CurrencyToString(num *big.Int) string { ) switch { - case num.Cmp(Douglas) >= 0: - fin = new(big.Int).Div(num, Douglas) - denom = "Douglas" - case num.Cmp(Einstein) >= 0: - fin = new(big.Int).Div(num, Einstein) - denom = "Einstein" case num.Cmp(Ether) >= 0: fin = new(big.Int).Div(num, Ether) denom = "Ether" diff --git a/common/size_test.go b/common/size_test.go index 1cbeff0a8c..cfe7efe31a 100644 --- a/common/size_test.go +++ b/common/size_test.go @@ -25,8 +25,6 @@ func (s *SizeSuite) TestStorageSizeString(c *checker.C) { } func (s *CommonSuite) TestCommon(c *checker.C) { - douglas := CurrencyToString(BigPow(10, 43)) - einstein := CurrencyToString(BigPow(10, 22)) ether := CurrencyToString(BigPow(10, 19)) finney := CurrencyToString(BigPow(10, 16)) szabo := CurrencyToString(BigPow(10, 13)) @@ -35,8 +33,6 @@ func (s *CommonSuite) TestCommon(c *checker.C) { ada := CurrencyToString(BigPow(10, 4)) wei := CurrencyToString(big.NewInt(10)) - c.Assert(douglas, checker.Equals, "10 Douglas") - c.Assert(einstein, checker.Equals, "10 Einstein") c.Assert(ether, checker.Equals, "10 Ether") c.Assert(finney, checker.Equals, "10 Finney") c.Assert(szabo, checker.Equals, "10 Szabo") @@ -45,13 +41,3 @@ func (s *CommonSuite) TestCommon(c *checker.C) { c.Assert(ada, checker.Equals, "10 Ada") c.Assert(wei, checker.Equals, "10 Wei") } - -func (s *CommonSuite) TestLarge(c *checker.C) { - douglaslarge := CurrencyToString(BigPow(100000000, 43)) - adalarge := CurrencyToString(BigPow(100000000, 4)) - weilarge := CurrencyToString(big.NewInt(100000000)) - - c.Assert(douglaslarge, checker.Equals, "10000E298 Douglas") - c.Assert(adalarge, checker.Equals, "10000E7 Einstein") - c.Assert(weilarge, checker.Equals, "100 Babbage") -} diff --git a/core/events.go b/core/events.go index 3da668af5e..1ea35c2f49 100644 --- a/core/events.go +++ b/core/events.go @@ -1,8 +1,10 @@ package core import ( - "github.com/ethereum/go-ethereum/core/types" + "math/big" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" ) // TxPreEvent is posted when a transaction enters the transaction pool. @@ -44,6 +46,8 @@ type ChainUncleEvent struct { type ChainHeadEvent struct{ Block *types.Block } +type GasPriceChanged struct{ Price *big.Int } + // Mining operation events type StartMining struct{} type TopMining struct{} diff --git a/core/manager.go b/core/manager.go index 9b5407a9e2..433ada7ee6 100644 --- a/core/manager.go +++ b/core/manager.go @@ -1,12 +1,14 @@ package core import ( + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" ) type Backend interface { + AccountManager() *accounts.Manager BlockProcessor() *BlockProcessor ChainManager() *ChainManager TxPool() *TxPool diff --git a/eth/backend.go b/eth/backend.go index 8f07894670..cdbe35b26f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -451,6 +451,8 @@ func (s *Ethereum) Start() error { return nil } +// sync databases every minute. If flushing fails we exit immediatly. The system +// may not continue under any circumstances. func (s *Ethereum) syncDatabases() { ticker := time.NewTicker(1 * time.Minute) done: @@ -459,13 +461,13 @@ done: case <-ticker.C: // don't change the order of database flushes if err := s.extraDb.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err) + glog.Fatalf("fatal error: flush extraDb: %v\n", err) } if err := s.stateDb.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err) + glog.Fatalf("fatal error: flush stateDb: %v\n", err) } if err := s.blockDb.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err) + glog.Fatalf("fatal error: flush blockDb: %v\n", err) } case <-s.shutdownChan: break done diff --git a/miner/worker.go b/miner/worker.go index 22493c2354..4ba566eecd 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -7,6 +7,7 @@ import ( "sync" "sync/atomic" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -253,7 +254,12 @@ func (self *worker) makeCurrent() { func (w *worker) setGasPrice(p *big.Int) { w.mu.Lock() defer w.mu.Unlock() - w.gasPrice = p + + // calculate the minimal gas price the miner accepts when sorting out transactions. + const pct = int64(90) + w.gasPrice = gasprice(p, pct) + + w.mux.Post(core.GasPriceChanged{w.gasPrice}) } func (self *worker) commitNewWork() { @@ -269,27 +275,40 @@ func (self *worker) commitNewWork() { transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) + accounts, _ := self.eth.AccountManager().Accounts() // Keep track of transactions which return errors so they can be removed var ( remove = set.New() tcount = 0 ignoredTransactors = set.New() + lowGasTransactors = set.New() + ownedAccounts = accountAddressesSet(accounts) + lowGasTxs types.Transactions ) - const pct = int64(90) - // calculate the minimal gas price the miner accepts when sorting out transactions. - minprice := gasprice(self.gasPrice, pct) for _, tx := range transactions { // We can skip err. It has already been validated in the tx pool from, _ := tx.From() // check if it falls within margin - if tx.GasPrice().Cmp(minprice) < 0 { + if tx.GasPrice().Cmp(self.gasPrice) < 0 { // ignore the transaction and transactor. We ignore the transactor // because nonce will fail after ignoring this transaction so there's // no point - ignoredTransactors.Add(from) - glog.V(logger.Info).Infof("transaction(%x) below gas price (<%d%% ask price). All sequential txs from this address(%x) will fail\n", tx.Hash().Bytes()[:4], pct, from[:4]) + lowGasTransactors.Add(from) + + glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4]) + } + + // Continue with the next transaction if the transaction sender is included in + // the low gas tx set. This will also remove the tx and all sequential transaction + // from this transactor + if lowGasTransactors.Has(from) { + // add tx to the low gas set. This will be removed at the end of the run + // owned accounts are ignored + if !ownedAccounts.Has(from) { + lowGasTxs = append(lowGasTxs, tx) + } continue } @@ -327,6 +346,7 @@ func (self *worker) commitNewWork() { tcount++ } } + self.eth.TxPool().RemoveTransactions(lowGasTxs) var ( uncles []*types.Header @@ -423,3 +443,11 @@ func gasprice(price *big.Int, pct int64) *big.Int { p.Mul(p, big.NewInt(pct)) return p } + +func accountAddressesSet(accounts []accounts.Account) *set.Set { + accountSet := set.New() + for _, account := range accounts { + accountSet.Add(common.BytesToAddress(account.Address)) + } + return accountSet +} From df323cdb4e1aaab8e57cb1809a0bc5e47d307260 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 11 May 2015 01:24:40 +0200 Subject: [PATCH 2/4] rpc: display error message to stdout --- rpc/jeth.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rpc/jeth.go b/rpc/jeth.go index 4739316b2d..a08f9be8fd 100644 --- a/rpc/jeth.go +++ b/rpc/jeth.go @@ -2,6 +2,7 @@ package rpc import ( "encoding/json" + "fmt" "github.com/ethereum/go-ethereum/jsre" "github.com/robertkrimen/otto" @@ -52,6 +53,7 @@ func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { var respif interface{} err = self.ethApi.GetRequestReply(&req, &respif) if err != nil { + fmt.Println("Error response:", err) return self.err(call, -32603, err.Error(), req.Id) } call.Otto.Set("ret_jsonrpc", jsonrpcver) From 3c6c89168049fbcffa0a02690c27c324f6f0264d Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 11 May 2015 01:28:15 +0200 Subject: [PATCH 3/4] core: optimise pending transaction processing --- core/transaction_pool.go | 2 +- miner/worker.go | 186 +++++++++++++++++++++------------------ 2 files changed, 102 insertions(+), 86 deletions(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 6898a4bda9..e68f7406ae 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -21,7 +21,7 @@ var ( ErrInvalidSender = errors.New("Invalid sender") ErrNonce = errors.New("Nonce too low") ErrBalance = errors.New("Insufficient balance") - ErrNonExistentAccount = errors.New("Account does not exist") + ErrNonExistentAccount = errors.New("Account does not exist or account balance too low") ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value") ErrIntrinsicGas = errors.New("Intrinsic gas too low") ErrGasLimit = errors.New("Exceeds block gas limit") diff --git a/miner/worker.go b/miner/worker.go index 4ba566eecd..e3dbae717b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -22,12 +22,18 @@ import ( var jsonlogger = logger.NewJsonLogger() type environment struct { - totalUsedGas *big.Int - state *state.StateDB - coinbase *state.StateObject - block *types.Block - family *set.Set - uncles *set.Set + totalUsedGas *big.Int + state *state.StateDB + coinbase *state.StateObject + block *types.Block + family *set.Set + uncles *set.Set + remove *set.Set + tcount int + ignoredTransactors *set.Set + lowGasTransactors *set.Set + ownedAccounts *set.Set + lowGasTxs types.Transactions } func env(block *types.Block, eth core.Backend) *environment { @@ -129,12 +135,13 @@ func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() + atomic.StoreInt32(&self.mining, 1) + // spin up agents for _, agent := range self.agents { agent.Start() } - atomic.StoreInt32(&self.mining, 1) } func (self *worker) stop() { @@ -175,8 +182,11 @@ out: self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() case core.TxPreEvent: + // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { - self.commitNewWork() + self.mu.Lock() + self.commitTransactions(types.Transactions{ev.Tx}) + self.mu.Unlock() } } case <-self.quit: @@ -242,13 +252,22 @@ func (self *worker) makeCurrent() { } block.Header().Extra = self.extra - self.current = env(block, self.eth) + current := env(block, self.eth) for _, ancestor := range self.chain.GetAncestors(block, 7) { - self.current.family.Add(ancestor.Hash()) + current.family.Add(ancestor.Hash()) } + accounts, _ := self.eth.AccountManager().Accounts() + // Keep track of transactions which return errors so they can be removed + current.remove = set.New() + current.tcount = 0 + current.ignoredTransactors = set.New() + current.lowGasTransactors = set.New() + current.ownedAccounts = accountAddressesSet(accounts) + + parent := self.chain.GetBlock(current.block.ParentHash()) + current.coinbase.SetGasPool(core.CalcGasLimit(parent)) - parent := self.chain.GetBlock(self.current.block.ParentHash()) - self.current.coinbase.SetGasPool(core.CalcGasLimit(parent)) + self.current = current } func (w *worker) setGasPrice(p *big.Int) { @@ -271,82 +290,14 @@ func (self *worker) commitNewWork() { defer self.currentMu.Unlock() self.makeCurrent() + current := self.current transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) - accounts, _ := self.eth.AccountManager().Accounts() - // Keep track of transactions which return errors so they can be removed - var ( - remove = set.New() - tcount = 0 - ignoredTransactors = set.New() - lowGasTransactors = set.New() - ownedAccounts = accountAddressesSet(accounts) - lowGasTxs types.Transactions - ) - - for _, tx := range transactions { - // We can skip err. It has already been validated in the tx pool - from, _ := tx.From() - - // check if it falls within margin - if tx.GasPrice().Cmp(self.gasPrice) < 0 { - // ignore the transaction and transactor. We ignore the transactor - // because nonce will fail after ignoring this transaction so there's - // no point - lowGasTransactors.Add(from) - - glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4]) - } - - // Continue with the next transaction if the transaction sender is included in - // the low gas tx set. This will also remove the tx and all sequential transaction - // from this transactor - if lowGasTransactors.Has(from) { - // add tx to the low gas set. This will be removed at the end of the run - // owned accounts are ignored - if !ownedAccounts.Has(from) { - lowGasTxs = append(lowGasTxs, tx) - } - continue - } - - // Move on to the next transaction when the transactor is in ignored transactions set - // This may occur when a transaction hits the gas limit. When a gas limit is hit and - // the transaction is processed (that could potentially be included in the block) it - // will throw a nonce error because the previous transaction hasn't been processed. - // Therefor we need to ignore any transaction after the ignored one. - if ignoredTransactors.Has(from) { - continue - } - - self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) - - err := self.commitTransaction(tx) - switch { - case core.IsNonceErr(err) || core.IsInvalidTxErr(err): - // Remove invalid transactions - from, _ := tx.From() - - self.chain.TxState().RemoveNonce(from, tx.Nonce()) - remove.Add(tx.Hash()) - - if glog.V(logger.Detail) { - glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) - } - case state.IsGasLimitErr(err): - from, _ := tx.From() - // ignore the transactor so no nonce errors will be thrown for this account - // next time the worker is run, they'll be picked up again. - ignoredTransactors.Add(from) - - glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) - default: - tcount++ - } - } - self.eth.TxPool().RemoveTransactions(lowGasTxs) + // commit transactions for this run + self.commitTransactions(transactions) + self.eth.TxPool().RemoveTransactions(current.lowGasTxs) var ( uncles []*types.Header @@ -372,7 +323,7 @@ func (self *worker) commitNewWork() { // We only care about logging if we're actually mining if atomic.LoadInt32(&self.mining) == 1 { - glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles)) + glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles)) } for _, hash := range badUncles { @@ -412,6 +363,71 @@ func (self *worker) commitUncle(uncle *types.Header) error { return nil } +func (self *worker) commitTransactions(transactions types.Transactions) { + current := self.current + + for _, tx := range transactions { + // We can skip err. It has already been validated in the tx pool + from, _ := tx.From() + + // check if it falls within margin + if tx.GasPrice().Cmp(self.gasPrice) < 0 { + // ignore the transaction and transactor. We ignore the transactor + // because nonce will fail after ignoring this transaction so there's + // no point + current.lowGasTransactors.Add(from) + + glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4]) + } + + // Continue with the next transaction if the transaction sender is included in + // the low gas tx set. This will also remove the tx and all sequential transaction + // from this transactor + if current.lowGasTransactors.Has(from) { + // add tx to the low gas set. This will be removed at the end of the run + // owned accounts are ignored + if !current.ownedAccounts.Has(from) { + current.lowGasTxs = append(current.lowGasTxs, tx) + } + continue + } + + // Move on to the next transaction when the transactor is in ignored transactions set + // This may occur when a transaction hits the gas limit. When a gas limit is hit and + // the transaction is processed (that could potentially be included in the block) it + // will throw a nonce error because the previous transaction hasn't been processed. + // Therefor we need to ignore any transaction after the ignored one. + if current.ignoredTransactors.Has(from) { + continue + } + + self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) + + err := self.commitTransaction(tx) + switch { + case core.IsNonceErr(err) || core.IsInvalidTxErr(err): + // Remove invalid transactions + from, _ := tx.From() + + self.chain.TxState().RemoveNonce(from, tx.Nonce()) + current.remove.Add(tx.Hash()) + + if glog.V(logger.Detail) { + glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) + } + case state.IsGasLimitErr(err): + from, _ := tx.From() + // ignore the transactor so no nonce errors will be thrown for this account + // next time the worker is run, they'll be picked up again. + current.ignoredTransactors.Add(from) + + glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) + default: + current.tcount++ + } + } +} + func (self *worker) commitTransaction(tx *types.Transaction) error { snap := self.current.state.Copy() receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true) From 6ecba12650f2d20eded5f4f09fb312d84e81d909 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 11 May 2015 11:36:09 +0200 Subject: [PATCH 4/4] miner: added log message for mining operation. #912 --- miner/miner.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/miner/miner.go b/miner/miner.go index d5ea9a1469..efe6d30518 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -7,6 +7,8 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/pow" ) @@ -47,6 +49,8 @@ func (m *Miner) SetGasPrice(price *big.Int) { } func (self *Miner) Start(coinbase common.Address) { + glog.V(logger.Info).Infoln("Starting mining operation") + self.mining = true self.worker.coinbase = coinbase self.worker.start()