eth/downloader, eth/filters: use defer to call Unsubscribe (#28762)

pull/28769/head
ucwong 11 months ago committed by GitHub
parent e3eeb64c94
commit 877d09443d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      eth/downloader/api.go
  2. 10
      eth/filters/api.go

@ -101,16 +101,15 @@ func (api *DownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error
go func() { go func() {
statuses := make(chan interface{}) statuses := make(chan interface{})
sub := api.SubscribeSyncStatus(statuses) sub := api.SubscribeSyncStatus(statuses)
defer sub.Unsubscribe()
for { for {
select { select {
case status := <-statuses: case status := <-statuses:
notifier.Notify(rpcSub.ID, status) notifier.Notify(rpcSub.ID, status)
case <-rpcSub.Err(): case <-rpcSub.Err():
sub.Unsubscribe()
return return
case <-notifier.Closed(): case <-notifier.Closed():
sub.Unsubscribe()
return return
} }
} }

@ -159,6 +159,8 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
go func() { go func() {
txs := make(chan []*types.Transaction, 128) txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs) pendingTxSub := api.events.SubscribePendingTxs(txs)
defer pendingTxSub.Unsubscribe()
chainConfig := api.sys.backend.ChainConfig() chainConfig := api.sys.backend.ChainConfig()
for { for {
@ -176,10 +178,8 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
} }
} }
case <-rpcSub.Err(): case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
return return
case <-notifier.Closed(): case <-notifier.Closed():
pendingTxSub.Unsubscribe()
return return
} }
} }
@ -233,16 +233,15 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
go func() { go func() {
headers := make(chan *types.Header) headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewHeads(headers) headersSub := api.events.SubscribeNewHeads(headers)
defer headersSub.Unsubscribe()
for { for {
select { select {
case h := <-headers: case h := <-headers:
notifier.Notify(rpcSub.ID, h) notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err(): case <-rpcSub.Err():
headersSub.Unsubscribe()
return return
case <-notifier.Closed(): case <-notifier.Closed():
headersSub.Unsubscribe()
return return
} }
} }
@ -267,6 +266,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer logsSub.Unsubscribe()
go func() { go func() {
for { for {
@ -277,10 +277,8 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
notifier.Notify(rpcSub.ID, &log) notifier.Notify(rpcSub.ID, &log)
} }
case <-rpcSub.Err(): // client send an unsubscribe request case <-rpcSub.Err(): // client send an unsubscribe request
logsSub.Unsubscribe()
return return
case <-notifier.Closed(): // connection dropped case <-notifier.Closed(): // connection dropped
logsSub.Unsubscribe()
return return
} }
} }

Loading…
Cancel
Save