@ -155,7 +155,6 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
session := & MatcherSession {
matcher : m ,
quit : make ( chan struct { } ) ,
kill : make ( chan struct { } ) ,
ctx : ctx ,
}
for _ , scheduler := range m . schedulers {
@ -386,10 +385,8 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
requests = make ( map [ uint ] [ ] uint64 ) // Per-bit list of section requests, ordered by section number
unallocs = make ( map [ uint ] struct { } ) // Bits with pending requests but not allocated to any retriever
retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
)
var (
allocs int // Number of active allocations to handle graceful shutdown requests
shutdown = session . quit // Shutdown request channel, will gracefully wait for pending requests
allocs int // Number of active allocations to handle graceful shutdown requests
shutdown = session . quit // Shutdown request channel, will gracefully wait for pending requests
)
// assign is a helper method fo try to assign a pending bit an actively
@ -409,15 +406,12 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
for {
select {
case <- shutdown :
// Graceful shutdown requested, wait until all pending requests are honoured
// Shutdown requested. No more retrievers can be allocated,
// but we still need to wait until all pending requests have returned.
shutdown = nil
if allocs == 0 {
return
}
shutdown = nil
case <- session . kill :
// Pending requests not honoured in time, hard terminate
return
case req := <- dist :
// New retrieval request arrived to be distributed to some fetcher process
@ -499,8 +493,9 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
assign ( result . Bit )
}
}
// If we're in the process of shutting down, terminate
if allocs == 0 && shutdown == nil {
// End the session when all pending deliveries have arrived.
if shutdown == nil && allocs == 0 {
return
}
}
@ -514,7 +509,6 @@ type MatcherSession struct {
closer sync . Once // Sync object to ensure we only ever close once
quit chan struct { } // Quit channel to request pipeline termination
kill chan struct { } // Term channel to signal non-graceful forced shutdown
ctx context . Context // Context used by the light client to abort filtering
err atomic . Value // Global error to track retrieval failures deep in the chain
@ -529,7 +523,6 @@ func (s *MatcherSession) Close() {
s . closer . Do ( func ( ) {
// Signal termination and wait for all goroutines to tear down
close ( s . quit )
time . AfterFunc ( time . Second , func ( ) { close ( s . kill ) } )
s . pend . Wait ( )
} )
}
@ -542,10 +535,10 @@ func (s *MatcherSession) Error() error {
return nil
}
// A llocateRetrieval assigns a bloom bit index to a client process that can either
// a llocateRetrieval assigns a bloom bit index to a client process that can either
// immediately request and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
func ( s * MatcherSession ) A llocateRetrieval( ) ( uint , bool ) {
func ( s * MatcherSession ) a llocateRetrieval( ) ( uint , bool ) {
fetcher := make ( chan uint )
select {
@ -557,9 +550,9 @@ func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
}
}
// P endingSections returns the number of pending section retrievals belonging to
// p endingSections returns the number of pending section retrievals belonging to
// the given bloom bit index.
func ( s * MatcherSession ) P endingSections( bit uint ) int {
func ( s * MatcherSession ) p endingSections( bit uint ) int {
fetcher := make ( chan uint )
select {
@ -571,9 +564,9 @@ func (s *MatcherSession) PendingSections(bit uint) int {
}
}
// A llocateSections assigns all or part of an already allocated bit-task queue
// a llocateSections assigns all or part of an already allocated bit-task queue
// to the requesting process.
func ( s * MatcherSession ) A llocateSections( bit uint , count int ) [ ] uint64 {
func ( s * MatcherSession ) a llocateSections( bit uint , count int ) [ ] uint64 {
fetcher := make ( chan * Retrieval )
select {
@ -589,14 +582,10 @@ func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
}
}
// D eliverSections delivers a batch of section bit-vectors for a specific bloom
// d eliverSections delivers a batch of section bit-vectors for a specific bloom
// bit index to be injected into the processing pipeline.
func ( s * MatcherSession ) DeliverSections ( bit uint , sections [ ] uint64 , bitsets [ ] [ ] byte ) {
select {
case <- s . kill :
return
case s . matcher . deliveries <- & Retrieval { Bit : bit , Sections : sections , Bitsets : bitsets } :
}
func ( s * MatcherSession ) deliverSections ( bit uint , sections [ ] uint64 , bitsets [ ] [ ] byte ) {
s . matcher . deliveries <- & Retrieval { Bit : bit , Sections : sections , Bitsets : bitsets }
}
// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
@ -608,17 +597,17 @@ func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets []
func ( s * MatcherSession ) Multiplex ( batch int , wait time . Duration , mux chan chan * Retrieval ) {
for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit , ok := s . A llocateRetrieval( )
bit , ok := s . a llocateRetrieval( )
if ! ok {
return
}
// Bit allocated, throttle a bit if we're below our batch limit
if s . P endingSections( bit ) < batch {
if s . p endingSections( bit ) < batch {
select {
case <- s . quit :
// Session terminating, we can't meaningfully service, abort
s . A llocateSections( bit , 0 )
s . D eliverSections( bit , [ ] uint64 { } , [ ] [ ] byte { } )
s . a llocateSections( bit , 0 )
s . d eliverSections( bit , [ ] uint64 { } , [ ] [ ] byte { } )
return
case <- time . After ( wait ) :
@ -626,13 +615,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
}
}
// Allocate as much as we can handle and request servicing
sections := s . A llocateSections( bit , batch )
sections := s . a llocateSections( bit , batch )
request := make ( chan * Retrieval )
select {
case <- s . quit :
// Session terminating, we can't meaningfully service, abort
s . D eliverSections( bit , sections , make ( [ ] [ ] byte , len ( sections ) ) )
s . d eliverSections( bit , sections , make ( [ ] [ ] byte , len ( sections ) ) )
return
case mux <- request :
@ -644,7 +633,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
s . err . Store ( result . Error )
s . Close ( )
}
s . D eliverSections( result . Bit , result . Sections , result . Bitsets )
s . d eliverSections( result . Bit , result . Sections , result . Bitsets )
}
}
}