|
|
@ -31,6 +31,7 @@ import ( |
|
|
|
"github.com/ethereum/go-ethereum/core/rawdb" |
|
|
|
"github.com/ethereum/go-ethereum/core/rawdb" |
|
|
|
"github.com/ethereum/go-ethereum/core/types" |
|
|
|
"github.com/ethereum/go-ethereum/core/types" |
|
|
|
"github.com/ethereum/go-ethereum/event" |
|
|
|
"github.com/ethereum/go-ethereum/event" |
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/log" |
|
|
|
"github.com/ethereum/go-ethereum/rpc" |
|
|
|
"github.com/ethereum/go-ethereum/rpc" |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
@ -92,8 +93,21 @@ type EventSystem struct { |
|
|
|
backend Backend |
|
|
|
backend Backend |
|
|
|
lightMode bool |
|
|
|
lightMode bool |
|
|
|
lastHead *types.Header |
|
|
|
lastHead *types.Header |
|
|
|
install chan *subscription // install filter for event notification
|
|
|
|
|
|
|
|
uninstall chan *subscription // remove filter for event notification
|
|
|
|
// Subscriptions
|
|
|
|
|
|
|
|
txSub event.Subscription // Subscription for new transaction event
|
|
|
|
|
|
|
|
logsSub event.Subscription // Subscription for new log event
|
|
|
|
|
|
|
|
rmLogsSub event.Subscription // Subscription for removed log event
|
|
|
|
|
|
|
|
chainSub event.Subscription // Subscription for new chain event
|
|
|
|
|
|
|
|
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Channels
|
|
|
|
|
|
|
|
install chan *subscription // install filter for event notification
|
|
|
|
|
|
|
|
uninstall chan *subscription // remove filter for event notification
|
|
|
|
|
|
|
|
txCh chan core.TxPreEvent // Channel to receive new transaction event
|
|
|
|
|
|
|
|
logsCh chan []*types.Log // Channel to receive new log event
|
|
|
|
|
|
|
|
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
|
|
|
|
|
|
|
|
chainCh chan core.ChainEvent // Channel to receive new chain event
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// NewEventSystem creates a new manager that listens for event on the given mux,
|
|
|
|
// NewEventSystem creates a new manager that listens for event on the given mux,
|
|
|
@ -109,10 +123,27 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS |
|
|
|
lightMode: lightMode, |
|
|
|
lightMode: lightMode, |
|
|
|
install: make(chan *subscription), |
|
|
|
install: make(chan *subscription), |
|
|
|
uninstall: make(chan *subscription), |
|
|
|
uninstall: make(chan *subscription), |
|
|
|
|
|
|
|
txCh: make(chan core.TxPreEvent, txChanSize), |
|
|
|
|
|
|
|
logsCh: make(chan []*types.Log, logsChanSize), |
|
|
|
|
|
|
|
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), |
|
|
|
|
|
|
|
chainCh: make(chan core.ChainEvent, chainEvChanSize), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
go m.eventLoop() |
|
|
|
// Subscribe events
|
|
|
|
|
|
|
|
m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) |
|
|
|
|
|
|
|
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) |
|
|
|
|
|
|
|
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) |
|
|
|
|
|
|
|
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) |
|
|
|
|
|
|
|
// TODO(rjl493456442): use feed to subscribe pending log event
|
|
|
|
|
|
|
|
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Make sure none of the subscriptions are empty
|
|
|
|
|
|
|
|
if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || |
|
|
|
|
|
|
|
m.pendingLogSub.Closed() { |
|
|
|
|
|
|
|
log.Crit("Subscribe for event system failed") |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
go m.eventLoop() |
|
|
|
return m |
|
|
|
return m |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -412,50 +443,35 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. |
|
|
|
|
|
|
|
|
|
|
|
// eventLoop (un)installs filters and processes mux events.
|
|
|
|
// eventLoop (un)installs filters and processes mux events.
|
|
|
|
func (es *EventSystem) eventLoop() { |
|
|
|
func (es *EventSystem) eventLoop() { |
|
|
|
var ( |
|
|
|
// Ensure all subscriptions get cleaned up
|
|
|
|
index = make(filterIndex) |
|
|
|
defer func() { |
|
|
|
sub = es.mux.Subscribe(core.PendingLogsEvent{}) |
|
|
|
es.pendingLogSub.Unsubscribe() |
|
|
|
// Subscribe TxPreEvent form txpool
|
|
|
|
es.txSub.Unsubscribe() |
|
|
|
txCh = make(chan core.TxPreEvent, txChanSize) |
|
|
|
es.logsSub.Unsubscribe() |
|
|
|
txSub = es.backend.SubscribeTxPreEvent(txCh) |
|
|
|
es.rmLogsSub.Unsubscribe() |
|
|
|
// Subscribe RemovedLogsEvent
|
|
|
|
es.chainSub.Unsubscribe() |
|
|
|
rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) |
|
|
|
}() |
|
|
|
rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) |
|
|
|
|
|
|
|
// Subscribe []*types.Log
|
|
|
|
index := make(filterIndex) |
|
|
|
logsCh = make(chan []*types.Log, logsChanSize) |
|
|
|
|
|
|
|
logsSub = es.backend.SubscribeLogsEvent(logsCh) |
|
|
|
|
|
|
|
// Subscribe ChainEvent
|
|
|
|
|
|
|
|
chainEvCh = make(chan core.ChainEvent, chainEvChanSize) |
|
|
|
|
|
|
|
chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Unsubscribe all events
|
|
|
|
|
|
|
|
defer sub.Unsubscribe() |
|
|
|
|
|
|
|
defer txSub.Unsubscribe() |
|
|
|
|
|
|
|
defer rmLogsSub.Unsubscribe() |
|
|
|
|
|
|
|
defer logsSub.Unsubscribe() |
|
|
|
|
|
|
|
defer chainEvSub.Unsubscribe() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for i := UnknownSubscription; i < LastIndexSubscription; i++ { |
|
|
|
for i := UnknownSubscription; i < LastIndexSubscription; i++ { |
|
|
|
index[i] = make(map[rpc.ID]*subscription) |
|
|
|
index[i] = make(map[rpc.ID]*subscription) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
for { |
|
|
|
for { |
|
|
|
select { |
|
|
|
select { |
|
|
|
case ev, active := <-sub.Chan(): |
|
|
|
|
|
|
|
if !active { // system stopped
|
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
es.broadcast(index, ev) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Handle subscribed events
|
|
|
|
// Handle subscribed events
|
|
|
|
case ev := <-txCh: |
|
|
|
case ev := <-es.txCh: |
|
|
|
es.broadcast(index, ev) |
|
|
|
es.broadcast(index, ev) |
|
|
|
case ev := <-rmLogsCh: |
|
|
|
case ev := <-es.logsCh: |
|
|
|
es.broadcast(index, ev) |
|
|
|
es.broadcast(index, ev) |
|
|
|
case ev := <-logsCh: |
|
|
|
case ev := <-es.rmLogsCh: |
|
|
|
es.broadcast(index, ev) |
|
|
|
es.broadcast(index, ev) |
|
|
|
case ev := <-chainEvCh: |
|
|
|
case ev := <-es.chainCh: |
|
|
|
|
|
|
|
es.broadcast(index, ev) |
|
|
|
|
|
|
|
case ev, active := <-es.pendingLogSub.Chan(): |
|
|
|
|
|
|
|
if !active { // system stopped
|
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
es.broadcast(index, ev) |
|
|
|
es.broadcast(index, ev) |
|
|
|
|
|
|
|
|
|
|
|
case f := <-es.install: |
|
|
|
case f := <-es.install: |
|
|
@ -467,6 +483,7 @@ func (es *EventSystem) eventLoop() { |
|
|
|
index[f.typ][f.id] = f |
|
|
|
index[f.typ][f.id] = f |
|
|
|
} |
|
|
|
} |
|
|
|
close(f.installed) |
|
|
|
close(f.installed) |
|
|
|
|
|
|
|
|
|
|
|
case f := <-es.uninstall: |
|
|
|
case f := <-es.uninstall: |
|
|
|
if f.typ == MinedAndPendingLogsSubscription { |
|
|
|
if f.typ == MinedAndPendingLogsSubscription { |
|
|
|
// the type are logs and pending logs subscriptions
|
|
|
|
// the type are logs and pending logs subscriptions
|
|
|
@ -478,13 +495,13 @@ func (es *EventSystem) eventLoop() { |
|
|
|
close(f.err) |
|
|
|
close(f.err) |
|
|
|
|
|
|
|
|
|
|
|
// System stopped
|
|
|
|
// System stopped
|
|
|
|
case <-txSub.Err(): |
|
|
|
case <-es.txSub.Err(): |
|
|
|
return |
|
|
|
return |
|
|
|
case <-rmLogsSub.Err(): |
|
|
|
case <-es.logsSub.Err(): |
|
|
|
return |
|
|
|
return |
|
|
|
case <-logsSub.Err(): |
|
|
|
case <-es.rmLogsSub.Err(): |
|
|
|
return |
|
|
|
return |
|
|
|
case <-chainEvSub.Err(): |
|
|
|
case <-es.chainSub.Err(): |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|