@ -1,16 +1,13 @@
package core
package core
import (
import (
"bytes"
"container/list"
"fmt"
"fmt"
"math/big"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/state "
"gopkg.in/fatih/set.v0 "
)
)
var txplogger = logger . NewLogger ( "TXP" )
var txplogger = logger . NewLogger ( "TXP" )
@ -26,86 +23,50 @@ const (
minGasPrice = 1000000
minGasPrice = 1000000
)
)
var MinGasPrice = big . NewInt ( 10000000000000 )
func EachTx ( pool * list . List , it func ( * types . Transaction , * list . Element ) bool ) {
for e := pool . Front ( ) ; e != nil ; e = e . Next ( ) {
if it ( e . Value . ( * types . Transaction ) , e ) {
break
}
}
}
func FindTx ( pool * list . List , finder func ( * types . Transaction , * list . Element ) bool ) * types . Transaction {
for e := pool . Front ( ) ; e != nil ; e = e . Next ( ) {
if tx , ok := e . Value . ( * types . Transaction ) ; ok {
if finder ( tx , e ) {
return tx
}
}
}
return nil
}
type TxProcessor interface {
type TxProcessor interface {
ProcessTransaction ( tx * types . Transaction )
ProcessTransaction ( tx * types . Transaction )
}
}
// The tx pool a thread safe transaction pool handler. In order to
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
// guarantee a non blocking pool we use a queue channel which can be
// independently read without needing access to the actual pool. If the
// independently read without needing access to the actual pool.
// pool is being drained or synced for whatever reason the transactions
// will simple queue up and handled when the mutex is freed.
type TxPool struct {
type TxPool struct {
// The mutex for accessing the Tx pool.
mutex sync . Mutex
// Queueing channel for reading and writing incoming
// Queueing channel for reading and writing incoming
// transactions to
// transactions to
queueChan chan * types . Transaction
queueChan chan * types . Transaction
// Quiting channel
// Quiting channel
quit chan bool
quit chan bool
// The actual pool
// The actual pool
pool * list . List
//pool *list.List
pool * set . Set
SecondaryProcessor TxProcessor
SecondaryProcessor TxProcessor
subscribers [ ] chan TxMsg
subscribers [ ] chan TxMsg
chainManager * ChainManager
stateQuery StateQuery
eventMux * event . TypeMux
eventMux * event . TypeMux
}
}
func NewTxPool ( chainManager * ChainManager , eventMux * event . TypeMux ) * TxPool {
func NewTxPool ( stateQuery StateQuery , eventMux * event . TypeMux ) * TxPool {
return & TxPool {
return & TxPool {
pool : li st. New ( ) ,
pool : se t . New ( ) ,
queueChan : make ( chan * types . Transaction , txPoolQueueSize ) ,
queueChan : make ( chan * types . Transaction , txPoolQueueSize ) ,
quit : make ( chan bool ) ,
quit : make ( chan bool ) ,
chainManager : chainManager ,
stateQuery : stateQuery ,
eventMux : eventMux ,
eventMux : eventMux ,
}
}
}
}
// Blocking function. Don't use directly. Use QueueTransaction instead
func ( pool * TxPool ) addTransaction ( tx * types . Transaction ) {
func ( pool * TxPool ) addTransaction ( tx * types . Transaction ) {
pool . mutex . Lock ( )
defer pool . mutex . Unlock ( )
pool . pool . PushBack ( tx )
pool . pool . Add ( tx )
// Broadcast the transaction to the rest of the peers
// Broadcast the transaction to the rest of the peers
pool . eventMux . Post ( TxPreEvent { tx } )
pool . eventMux . Post ( TxPreEvent { tx } )
}
}
func ( pool * TxPool ) ValidateTransaction ( tx * types . Transaction ) error {
func ( pool * TxPool ) ValidateTransaction ( tx * types . Transaction ) error {
// Get the last block so we can retrieve the sender and receiver from
// the merkle trie
block := pool . chainManager . CurrentBlock
// Something has gone horribly wrong if this happens
if block == nil {
return fmt . Errorf ( "No last block on the block chain" )
}
if len ( tx . To ( ) ) != 0 && len ( tx . To ( ) ) != 20 {
if len ( tx . To ( ) ) != 0 && len ( tx . To ( ) ) != 20 {
return fmt . Errorf ( "Invalid recipient. len = %d" , len ( tx . To ( ) ) )
return fmt . Errorf ( "Invalid recipient. len = %d" , len ( tx . To ( ) ) )
}
}
@ -120,7 +81,7 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
if senderAddr == nil {
if senderAddr == nil {
return fmt . Errorf ( "invalid sender" )
return fmt . Errorf ( "invalid sender" )
}
}
sender := pool . chainManager . State ( ) . GetAccount ( senderAddr )
sender := pool . stateQuery . GetAccount ( senderAddr )
totAmount := new ( big . Int ) . Set ( tx . Value ( ) )
totAmount := new ( big . Int ) . Set ( tx . Value ( ) )
// Make sure there's enough in the sender's account. Having insufficient
// Make sure there's enough in the sender's account. Having insufficient
@ -129,19 +90,12 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
return fmt . Errorf ( "Insufficient amount in sender's (%x) account" , tx . From ( ) )
return fmt . Errorf ( "Insufficient amount in sender's (%x) account" , tx . From ( ) )
}
}
// Increment the nonce making each tx valid only once to prevent replay
// attacks
return nil
return nil
}
}
func ( self * TxPool ) Add ( tx * types . Transaction ) error {
func ( self * TxPool ) Add ( tx * types . Transaction ) error {
hash := tx . Hash ( )
hash := tx . Hash ( )
foundTx := FindTx ( self . pool , func ( tx * types . Transaction , e * list . Element ) bool {
if self . pool . Has ( tx ) {
return bytes . Compare ( tx . Hash ( ) , hash ) == 0
} )
if foundTx != nil {
return fmt . Errorf ( "Known transaction (%x)" , hash [ 0 : 4 ] )
return fmt . Errorf ( "Known transaction (%x)" , hash [ 0 : 4 ] )
}
}
@ -161,7 +115,7 @@ func (self *TxPool) Add(tx *types.Transaction) error {
}
}
func ( self * TxPool ) Size ( ) int {
func ( self * TxPool ) Size ( ) int {
return self . pool . Len ( )
return self . pool . Size ( )
}
}
func ( self * TxPool ) AddTransactions ( txs [ ] * types . Transaction ) {
func ( self * TxPool ) AddTransactions ( txs [ ] * types . Transaction ) {
@ -175,63 +129,47 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) {
}
}
func ( pool * TxPool ) GetTransactions ( ) [ ] * types . Transaction {
func ( pool * TxPool ) GetTransactions ( ) [ ] * types . Transaction {
pool . mutex . Lock ( )
txList := make ( [ ] * types . Transaction , pool . Size ( ) )
defer pool . mutex . Unlock ( )
txList := make ( [ ] * types . Transaction , pool . pool . Len ( ) )
i := 0
i := 0
for e := pool . pool . Front ( ) ; e != nil ; e = e . Next ( ) {
pool . pool . Each ( func ( v interface { } ) bool {
tx := e . Value . ( * types . Transaction )
txList [ i ] = v . ( * types . Transaction )
txList [ i ] = tx
i ++
i ++
}
return true
} )
return txList
return txList
}
}
func ( pool * TxPool ) RemoveInvalid ( state * state . StateDB ) {
func ( pool * TxPool ) RemoveInvalid ( query StateQuery ) {
pool . mutex . Lock ( )
var removedTxs types . Transactions
defer pool . mutex . Unlock ( )
pool . pool . Each ( func ( v interface { } ) bool {
tx := v . ( * types . Transaction )
for e := pool . pool . Front ( ) ; e != nil ; e = e . Next ( ) {
sender := query . GetAccount ( tx . From ( ) )
tx := e . Value . ( * types . Transaction )
sender := state . GetAccount ( tx . From ( ) )
err := pool . ValidateTransaction ( tx )
err := pool . ValidateTransaction ( tx )
if err != nil || sender . Nonce >= tx . Nonce ( ) {
if err != nil || sender . Nonce >= tx . Nonce ( ) {
pool . pool . Remove ( e )
removedTxs = append ( removedTxs , tx )
}
}
}
return true
} )
pool . RemoveSet ( removedTxs )
}
}
func ( self * TxPool ) RemoveSet ( txs types . Transactions ) {
func ( self * TxPool ) RemoveSet ( txs types . Transactions ) {
self . mutex . Lock ( )
defer self . mutex . Unlock ( )
for _ , tx := range txs {
for _ , tx := range txs {
EachTx ( self . pool , func ( t * types . Transaction , element * list . Element ) bool {
self . pool . Remove ( tx )
if t == tx {
self . pool . Remove ( element )
return true // To stop the loop
}
return false
} )
}
}
}
}
func ( pool * TxPool ) Flush ( ) [ ] * types . Transaction {
func ( pool * TxPool ) Flush ( ) [ ] * types . Transaction {
txList := pool . GetTransactions ( )
txList := pool . GetTransactions ( )
pool . pool . Clear ( )
// Recreate a new list all together
// XXX Is this the fastest way?
pool . pool = list . New ( )
return txList
return txList
}
}
func ( pool * TxPool ) Start ( ) {
func ( pool * TxPool ) Start ( ) {
//go pool.queueHandler()
}
}
func ( pool * TxPool ) Stop ( ) {
func ( pool * TxPool ) Stop ( ) {