@ -95,6 +95,8 @@ type Service struct {
pongCh chan struct { } // Pong notifications are fed into this channel
pongCh chan struct { } // Pong notifications are fed into this channel
histCh chan [ ] uint64 // History request block numbers are fed into this channel
histCh chan [ ] uint64 // History request block numbers are fed into this channel
headSub event . Subscription
txSub event . Subscription
}
}
// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
@ -167,7 +169,12 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string)
// Start implements node.Lifecycle, starting up the monitoring and reporting daemon.
// Start implements node.Lifecycle, starting up the monitoring and reporting daemon.
func ( s * Service ) Start ( ) error {
func ( s * Service ) Start ( ) error {
go s . loop ( )
// Subscribe to chain events to execute updates on
chainHeadCh := make ( chan core . ChainHeadEvent , chainHeadChanSize )
s . headSub = s . backend . SubscribeChainHeadEvent ( chainHeadCh )
txEventCh := make ( chan core . NewTxsEvent , txChanSize )
s . txSub = s . backend . SubscribeNewTxsEvent ( txEventCh )
go s . loop ( chainHeadCh , txEventCh )
log . Info ( "Stats daemon started" )
log . Info ( "Stats daemon started" )
return nil
return nil
@ -175,22 +182,15 @@ func (s *Service) Start() error {
// Stop implements node.Lifecycle, terminating the monitoring and reporting daemon.
// Stop implements node.Lifecycle, terminating the monitoring and reporting daemon.
func ( s * Service ) Stop ( ) error {
func ( s * Service ) Stop ( ) error {
s . headSub . Unsubscribe ( )
s . txSub . Unsubscribe ( )
log . Info ( "Stats daemon stopped" )
log . Info ( "Stats daemon stopped" )
return nil
return nil
}
}
// loop keeps trying to connect to the netstats server, reporting chain events
// loop keeps trying to connect to the netstats server, reporting chain events
// until termination.
// until termination.
func ( s * Service ) loop ( ) {
func ( s * Service ) loop ( chainHeadCh chan core . ChainHeadEvent , txEventCh chan core . NewTxsEvent ) {
// Subscribe to chain events to execute updates on
chainHeadCh := make ( chan core . ChainHeadEvent , chainHeadChanSize )
headSub := s . backend . SubscribeChainHeadEvent ( chainHeadCh )
defer headSub . Unsubscribe ( )
txEventCh := make ( chan core . NewTxsEvent , txChanSize )
txSub := s . backend . SubscribeNewTxsEvent ( txEventCh )
defer txSub . Unsubscribe ( )
// Start a goroutine that exhausts the subscriptions to avoid events piling up
// Start a goroutine that exhausts the subscriptions to avoid events piling up
var (
var (
quitCh = make ( chan struct { } )
quitCh = make ( chan struct { } )
@ -223,9 +223,9 @@ func (s *Service) loop() {
}
}
// node stopped
// node stopped
case <- txSub . Err ( ) :
case <- s . txSub . Err ( ) :
break HandleLoop
break HandleLoop
case <- headSub . Err ( ) :
case <- s . headSub . Err ( ) :
break HandleLoop
break HandleLoop
}
}
}
}