|
|
@ -203,27 +203,34 @@ func (m *FairMix) Close() { |
|
|
|
func (m *FairMix) Next() bool { |
|
|
|
func (m *FairMix) Next() bool { |
|
|
|
m.cur = nil |
|
|
|
m.cur = nil |
|
|
|
|
|
|
|
|
|
|
|
var timeout <-chan time.Time |
|
|
|
|
|
|
|
if m.timeout >= 0 { |
|
|
|
|
|
|
|
timer := time.NewTimer(m.timeout) |
|
|
|
|
|
|
|
timeout = timer.C |
|
|
|
|
|
|
|
defer timer.Stop() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
for { |
|
|
|
for { |
|
|
|
source := m.pickSource() |
|
|
|
source := m.pickSource() |
|
|
|
if source == nil { |
|
|
|
if source == nil { |
|
|
|
return m.nextFromAny() |
|
|
|
return m.nextFromAny() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var timeout <-chan time.Time |
|
|
|
|
|
|
|
if source.timeout >= 0 { |
|
|
|
|
|
|
|
timer := time.NewTimer(source.timeout) |
|
|
|
|
|
|
|
timeout = timer.C |
|
|
|
|
|
|
|
defer timer.Stop() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
select { |
|
|
|
select { |
|
|
|
case n, ok := <-source.next: |
|
|
|
case n, ok := <-source.next: |
|
|
|
if ok { |
|
|
|
if ok { |
|
|
|
m.cur = n |
|
|
|
// Here, the timeout is reset to the configured value
|
|
|
|
|
|
|
|
// because the source delivered a node.
|
|
|
|
source.timeout = m.timeout |
|
|
|
source.timeout = m.timeout |
|
|
|
|
|
|
|
m.cur = n |
|
|
|
return true |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
// This source has ended.
|
|
|
|
// This source has ended.
|
|
|
|
m.deleteSource(source) |
|
|
|
m.deleteSource(source) |
|
|
|
case <-timeout: |
|
|
|
case <-timeout: |
|
|
|
|
|
|
|
// The selected source did not deliver a node within the timeout, so the
|
|
|
|
|
|
|
|
// timeout duration is halved for next time. This is supposed to improve
|
|
|
|
|
|
|
|
// latency with stuck sources.
|
|
|
|
source.timeout /= 2 |
|
|
|
source.timeout /= 2 |
|
|
|
return m.nextFromAny() |
|
|
|
return m.nextFromAny() |
|
|
|
} |
|
|
|
} |
|
|
|