core/bloombits: use atomic type (#26993)

pull/27002/head
s7v7nislands 2 years ago committed by GitHub
parent 881fed032c
commit 79532a25b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      core/bloombits/matcher.go
  2. 12
      core/bloombits/matcher_test.go
  3. 6
      core/bloombits/scheduler_test.go

@ -83,7 +83,7 @@ type Matcher struct {
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
running uint32 // Atomic flag whether a session is live or not running atomic.Bool // Atomic flag whether a session is live or not
} }
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
@ -146,10 +146,10 @@ func (m *Matcher) addScheduler(idx uint) {
// channel is closed. // channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) { func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions // Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 { if m.running.Swap(true) {
return nil, errors.New("matcher already running") return nil, errors.New("matcher already running")
} }
defer atomic.StoreUint32(&m.running, 0) defer m.running.Store(false)
// Initiate a new matching round // Initiate a new matching round
session := &MatcherSession{ session := &MatcherSession{

@ -160,7 +160,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in
} }
} }
// Track the number of retrieval requests made // Track the number of retrieval requests made
var requested uint32 var requested atomic.Uint32
// Start the matching session for the filter and the retriever goroutines // Start the matching session for the filter and the retriever goroutines
quit := make(chan struct{}) quit := make(chan struct{})
@ -208,15 +208,15 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in
session.Close() session.Close()
close(quit) close(quit)
if retrievals != 0 && requested != retrievals { if retrievals != 0 && requested.Load() != retrievals {
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals) t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals)
} }
return requested return requested.Load()
} }
// startRetrievers starts a batch of goroutines listening for section requests // startRetrievers starts a batch of goroutines listening for section requests
// and serving them. // and serving them.
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) { func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) {
requests := make(chan chan *Retrieval) requests := make(chan chan *Retrieval)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@ -238,7 +238,7 @@ func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *ui
for i, section := range task.Sections { for i, section := range task.Sections {
if rand.Int()%4 != 0 { // Handle occasional missing deliveries if rand.Int()%4 != 0 { // Handle occasional missing deliveries
task.Bitsets[i] = generateBitset(task.Bit, section) task.Bitsets[i] = generateBitset(task.Bit, section)
atomic.AddUint32(retrievals, 1) retrievals.Add(1)
} }
} }
request <- task request <- task

@ -45,13 +45,13 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
fetch := make(chan *request, 16) fetch := make(chan *request, 16)
defer close(fetch) defer close(fetch)
var delivered uint32 var delivered atomic.Uint32
for i := 0; i < fetchers; i++ { for i := 0; i < fetchers; i++ {
go func() { go func() {
defer fetchPend.Done() defer fetchPend.Done()
for req := range fetch { for req := range fetch {
atomic.AddUint32(&delivered, 1) delivered.Add(1)
f.deliver([]uint64{ f.deliver([]uint64{
req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds) req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
@ -97,7 +97,7 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
} }
pend.Wait() pend.Wait()
if have := atomic.LoadUint32(&delivered); int(have) != requests { if have := delivered.Load(); int(have) != requests {
t.Errorf("request count mismatch: have %v, want %v", have, requests) t.Errorf("request count mismatch: have %v, want %v", have, requests)
} }
} }

Loading…
Cancel
Save