|
|
|
@ -59,7 +59,7 @@ const ( |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
|
|
|
|
|
// txChanSize is the size of channel listening to TxPreEvent.
|
|
|
|
|
// txChanSize is the size of channel listening to NewTxsEvent.
|
|
|
|
|
// The number is referenced from the size of tx pool.
|
|
|
|
|
txChanSize = 4096 |
|
|
|
|
// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
|
|
|
|
@ -80,7 +80,7 @@ type subscription struct { |
|
|
|
|
created time.Time |
|
|
|
|
logsCrit ethereum.FilterQuery |
|
|
|
|
logs chan []*types.Log |
|
|
|
|
hashes chan common.Hash |
|
|
|
|
hashes chan []common.Hash |
|
|
|
|
headers chan *types.Header |
|
|
|
|
installed chan struct{} // closed when the filter is installed
|
|
|
|
|
err chan error // closed when the filter is uninstalled
|
|
|
|
@ -95,7 +95,7 @@ type EventSystem struct { |
|
|
|
|
lastHead *types.Header |
|
|
|
|
|
|
|
|
|
// Subscriptions
|
|
|
|
|
txSub event.Subscription // Subscription for new transaction event
|
|
|
|
|
txsSub event.Subscription // Subscription for new transaction event
|
|
|
|
|
logsSub event.Subscription // Subscription for new log event
|
|
|
|
|
rmLogsSub event.Subscription // Subscription for removed log event
|
|
|
|
|
chainSub event.Subscription // Subscription for new chain event
|
|
|
|
@ -104,7 +104,7 @@ type EventSystem struct { |
|
|
|
|
// Channels
|
|
|
|
|
install chan *subscription // install filter for event notification
|
|
|
|
|
uninstall chan *subscription // remove filter for event notification
|
|
|
|
|
txCh chan core.TxPreEvent // Channel to receive new transaction event
|
|
|
|
|
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
|
|
|
|
|
logsCh chan []*types.Log // Channel to receive new log event
|
|
|
|
|
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
|
|
|
|
|
chainCh chan core.ChainEvent // Channel to receive new chain event
|
|
|
|
@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS |
|
|
|
|
lightMode: lightMode, |
|
|
|
|
install: make(chan *subscription), |
|
|
|
|
uninstall: make(chan *subscription), |
|
|
|
|
txCh: make(chan core.TxPreEvent, txChanSize), |
|
|
|
|
txsCh: make(chan core.NewTxsEvent, txChanSize), |
|
|
|
|
logsCh: make(chan []*types.Log, logsChanSize), |
|
|
|
|
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), |
|
|
|
|
chainCh: make(chan core.ChainEvent, chainEvChanSize), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Subscribe events
|
|
|
|
|
m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) |
|
|
|
|
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh) |
|
|
|
|
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) |
|
|
|
|
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) |
|
|
|
|
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) |
|
|
|
@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS |
|
|
|
|
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) |
|
|
|
|
|
|
|
|
|
// Make sure none of the subscriptions are empty
|
|
|
|
|
if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || |
|
|
|
|
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || |
|
|
|
|
m.pendingLogSub.Closed() { |
|
|
|
|
log.Crit("Subscribe for event system failed") |
|
|
|
|
} |
|
|
|
@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs |
|
|
|
|
logsCrit: crit, |
|
|
|
|
created: time.Now(), |
|
|
|
|
logs: logs, |
|
|
|
|
hashes: make(chan common.Hash), |
|
|
|
|
hashes: make(chan []common.Hash), |
|
|
|
|
headers: make(chan *types.Header), |
|
|
|
|
installed: make(chan struct{}), |
|
|
|
|
err: make(chan error), |
|
|
|
@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ |
|
|
|
|
logsCrit: crit, |
|
|
|
|
created: time.Now(), |
|
|
|
|
logs: logs, |
|
|
|
|
hashes: make(chan common.Hash), |
|
|
|
|
hashes: make(chan []common.Hash), |
|
|
|
|
headers: make(chan *types.Header), |
|
|
|
|
installed: make(chan struct{}), |
|
|
|
|
err: make(chan error), |
|
|
|
@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan |
|
|
|
|
logsCrit: crit, |
|
|
|
|
created: time.Now(), |
|
|
|
|
logs: logs, |
|
|
|
|
hashes: make(chan common.Hash), |
|
|
|
|
hashes: make(chan []common.Hash), |
|
|
|
|
headers: make(chan *types.Header), |
|
|
|
|
installed: make(chan struct{}), |
|
|
|
|
err: make(chan error), |
|
|
|
@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti |
|
|
|
|
typ: BlocksSubscription, |
|
|
|
|
created: time.Now(), |
|
|
|
|
logs: make(chan []*types.Log), |
|
|
|
|
hashes: make(chan common.Hash), |
|
|
|
|
hashes: make(chan []common.Hash), |
|
|
|
|
headers: headers, |
|
|
|
|
installed: make(chan struct{}), |
|
|
|
|
err: make(chan error), |
|
|
|
@ -298,9 +298,9 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti |
|
|
|
|
return es.subscribe(sub) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SubscribePendingTxEvents creates a subscription that writes transaction hashes for
|
|
|
|
|
// SubscribePendingTxs creates a subscription that writes transaction hashes for
|
|
|
|
|
// transactions that enter the transaction pool.
|
|
|
|
|
func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { |
|
|
|
|
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { |
|
|
|
|
sub := &subscription{ |
|
|
|
|
id: rpc.NewID(), |
|
|
|
|
typ: PendingTransactionsSubscription, |
|
|
|
@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
case core.TxPreEvent: |
|
|
|
|
case core.NewTxsEvent: |
|
|
|
|
hashes := make([]common.Hash, 0, len(e.Txs)) |
|
|
|
|
for _, tx := range e.Txs { |
|
|
|
|
hashes = append(hashes, tx.Hash()) |
|
|
|
|
} |
|
|
|
|
for _, f := range filters[PendingTransactionsSubscription] { |
|
|
|
|
f.hashes <- e.Tx.Hash() |
|
|
|
|
f.hashes <- hashes |
|
|
|
|
} |
|
|
|
|
case core.ChainEvent: |
|
|
|
|
for _, f := range filters[BlocksSubscription] { |
|
|
|
@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() { |
|
|
|
|
// Ensure all subscriptions get cleaned up
|
|
|
|
|
defer func() { |
|
|
|
|
es.pendingLogSub.Unsubscribe() |
|
|
|
|
es.txSub.Unsubscribe() |
|
|
|
|
es.txsSub.Unsubscribe() |
|
|
|
|
es.logsSub.Unsubscribe() |
|
|
|
|
es.rmLogsSub.Unsubscribe() |
|
|
|
|
es.chainSub.Unsubscribe() |
|
|
|
@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() { |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
// Handle subscribed events
|
|
|
|
|
case ev := <-es.txCh: |
|
|
|
|
case ev := <-es.txsCh: |
|
|
|
|
es.broadcast(index, ev) |
|
|
|
|
case ev := <-es.logsCh: |
|
|
|
|
es.broadcast(index, ev) |
|
|
|
@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() { |
|
|
|
|
close(f.err) |
|
|
|
|
|
|
|
|
|
// System stopped
|
|
|
|
|
case <-es.txSub.Err(): |
|
|
|
|
case <-es.txsSub.Err(): |
|
|
|
|
return |
|
|
|
|
case <-es.logsSub.Err(): |
|
|
|
|
return |
|
|
|
|