Official Go implementation of the Ethereum protocol
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-ethereum/core/transaction_pool.go

207 lines
4.5 KiB

package core
11 years ago
import (
"errors"
11 years ago
"fmt"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/event"
10 years ago
"github.com/ethereum/go-ethereum/logger"
11 years ago
)
var (
	// txplogger is the logger used by the transaction pool; it is created
	// with the tag "TXP".
	txplogger = logger.NewLogger("TXP")

	// ErrInvalidSender is returned by ValidateTransaction when the sender
	// address of a transaction is not exactly 20 bytes long.
	ErrInvalidSender = errors.New("Invalid sender")
)
// txPoolQueueSize is the buffer capacity of the pool's queue channel
// (see NewTxPool).
const txPoolQueueSize = 50
11 years ago
// TxPoolHook is a channel carrying transactions.
type TxPoolHook chan *types.Transaction
// TxMsg is a message wrapping a single transaction.
type TxMsg struct {
	// Tx is the transaction carried by the message.
	Tx *types.Transaction
}
// minGasPrice is a gas price constant of the pool. It is not referenced by
// the code in this part of the file.
// NOTE(review): presumably the lowest gas price the pool accepts; confirm
// against callers.
const minGasPrice = 1000000
// TxProcessor is implemented by anything that can process a single
// transaction.
type TxProcessor interface {
	// ProcessTransaction handles tx.
	ProcessTransaction(tx *types.Transaction)
}
11 years ago
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
// independently read without needing access to the actual pool.
11 years ago
type TxPool struct {
mu sync.RWMutex
11 years ago
// Queueing channel for reading and writing incoming
// transactions to
queueChan chan *types.Transaction
11 years ago
// Quiting channel
quit chan bool
// The actual pool
//pool *list.List
10 years ago
txs map[string]*types.Transaction
11 years ago
SecondaryProcessor TxProcessor
subscribers []chan TxMsg
eventMux *event.TypeMux
11 years ago
}
func NewTxPool(eventMux *event.TypeMux) *TxPool {
11 years ago
return &TxPool{
10 years ago
txs: make(map[string]*types.Transaction),
queueChan: make(chan *types.Transaction, txPoolQueueSize),
quit: make(chan bool),
eventMux: eventMux,
11 years ago
}
}
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
if len(tx.To()) != 0 && len(tx.To()) != 20 {
return fmt.Errorf("Invalid recipient. len = %d", len(tx.To()))
}
// Validate curve param
10 years ago
v, _, _ := tx.Curve()
if v > 28 || v < 27 {
return fmt.Errorf("tx.v != (28 || 27) => %v", v)
11 years ago
}
// Validate sender address
senderAddr := tx.From()
if senderAddr == nil || len(senderAddr) != 20 {
return ErrInvalidSender
}
/* XXX this kind of validation needs to happen elsewhere in the gui when sending txs.
Other clients should do their own validation. Value transfer could throw error
but doesn't necessarily invalidate the tx. Gas can still be payed for and miner
can still be rewarded for their inclusion and processing.
sender := pool.stateQuery.GetAccount(senderAddr)
totAmount := new(big.Int).Set(tx.Value())
11 years ago
// Make sure there's enough in the sender's account. Having insufficient
// funds won't invalidate this transaction but simple ignores it.
if sender.Balance().Cmp(totAmount) < 0 {
return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From())
}
*/
11 years ago
return nil
}
10 years ago
// addTx stores tx in the pool map under the string form of its hash,
// overwriting any entry already stored under that key. It does no validation
// and no locking.
func (pool *TxPool) addTx(tx *types.Transaction) {
	key := string(tx.Hash())
	pool.txs[key] = tx
}
func (self *TxPool) add(tx *types.Transaction) error {
if self.txs[string(tx.Hash())] != nil {
return fmt.Errorf("Known transaction (%x)", tx.Hash()[0:4])
}
err := self.ValidateTransaction(tx)
if err != nil {
return err
}
10 years ago
self.addTx(tx)
var to string
if len(tx.To()) > 0 {
to = ethutil.Bytes2Hex(tx.To()[:4])
} else {
to = "[NEW_CONTRACT]"
}
var from string
if len(tx.From()) > 0 {
from = ethutil.Bytes2Hex(tx.From()[:4])
} else {
return errors.New(fmt.Sprintf("FROM ADDRESS MUST BE POSITIVE (was %v)", tx.From()))
}
txplogger.Debugf("(t) %x => %s (%v) %x\n", from, to, tx.Value, tx.Hash())
// Notify the subscribers
go self.eventMux.Post(TxPreEvent{tx})
return nil
}
func (self *TxPool) Size() int {
10 years ago
return len(self.txs)
}
// Add inserts a single transaction into the pool while holding the write
// lock. It returns the error reported by add, e.g. when the transaction is
// already known or fails validation.
func (pool *TxPool) Add(tx *types.Transaction) error {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	return pool.add(tx)
}
// AddTransactions tries to add every transaction in txs to the pool under a
// single write lock. A transaction that is rejected does not stop the batch:
// its error is logged at debug level and processing continues. Accepted
// transactions are logged by the first four bytes of their hash.
func (pool *TxPool) AddTransactions(txs []*types.Transaction) {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	for i := range txs {
		tx := txs[i]
		err := pool.add(tx)
		if err == nil {
			txplogger.Debugf("tx %x\n", tx.Hash()[0:4])
			continue
		}
		txplogger.Debugln(err)
	}
}
10 years ago
func (self *TxPool) GetTransactions() (txs types.Transactions) {
self.mu.RLock()
defer self.mu.RUnlock()
10 years ago
txs = make(types.Transactions, self.Size())
11 years ago
i := 0
10 years ago
for _, tx := range self.txs {
txs[i] = tx
11 years ago
i++
10 years ago
}
10 years ago
return
}
func (pool *TxPool) RemoveInvalid(query StateQuery) {
pool.mu.Lock()
var removedTxs types.Transactions
10 years ago
for _, tx := range pool.txs {
sender := query.GetAccount(tx.From())
err := pool.ValidateTransaction(tx)
if err != nil || sender.Nonce() >= tx.Nonce() {
removedTxs = append(removedTxs, tx)
}
10 years ago
}
pool.mu.Unlock()
pool.RemoveSet(removedTxs)
}
func (self *TxPool) RemoveSet(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()
for _, tx := range txs {
10 years ago
delete(self.txs, string(tx.Hash()))
}
}
func (pool *TxPool) Flush() {
10 years ago
pool.txs = make(map[string]*types.Transaction)
11 years ago
}
// Start is currently a no-op: its body is empty.
func (pool *TxPool) Start() {
}
func (pool *TxPool) Stop() {
pool.Flush()
txplogger.Infoln("Stopped")
11 years ago
}