fix reactor channel blocking

poc8
zelig 10 years ago
parent 5c03adbded
commit dc11b5c55e
  1. 53
      ethreact/reactor.go

@ -7,6 +7,10 @@ import (
var logger = ethlog.NewLogger("REACTOR") var logger = ethlog.NewLogger("REACTOR")
const (
eventBufferSize int = 10
)
type EventHandler struct { type EventHandler struct {
lock sync.RWMutex lock sync.RWMutex
name string name string
@ -21,7 +25,7 @@ func (e *EventHandler) Post(event Event) {
// if we want to preserve order pushing to subscibed channels // if we want to preserve order pushing to subscibed channels
// dispatching should be syncrounous // dispatching should be syncrounous
// this means if subscribed event channel is blocked (closed or has fixed capacity) // this means if subscribed event channel is blocked
// the reactor dispatch will be blocked, so we need to mitigate by skipping // the reactor dispatch will be blocked, so we need to mitigate by skipping
// rogue blocking subscribers // rogue blocking subscribers
for i, ch := range e.chans { for i, ch := range e.chans {
@ -63,22 +67,20 @@ type Event struct {
// The reactor basic engine. Acts as bridge // The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters // between the events and the subscribers/posters
type ReactorEngine struct { type ReactorEngine struct {
lock sync.RWMutex lock sync.RWMutex
eventChannel chan Event eventChannel chan Event
eventHandlers map[string]*EventHandler eventHandlers map[string]*EventHandler
quit chan bool quit chan chan error
shutdownChannel chan bool running bool
running bool drained chan bool
drained chan bool
} }
func New() *ReactorEngine { func New() *ReactorEngine {
return &ReactorEngine{ return &ReactorEngine{
eventHandlers: make(map[string]*EventHandler), eventHandlers: make(map[string]*EventHandler),
eventChannel: make(chan Event), eventChannel: make(chan Event, eventBufferSize),
quit: make(chan bool, 1), quit: make(chan chan error, 1),
drained: make(chan bool, 1), drained: make(chan bool, 1),
shutdownChannel: make(chan bool, 1),
} }
} }
@ -87,24 +89,22 @@ func (reactor *ReactorEngine) Start() {
defer reactor.lock.Unlock() defer reactor.lock.Unlock()
if !reactor.running { if !reactor.running {
go func() { go func() {
out:
for { for {
select { select {
case <-reactor.quit: case status := <-reactor.quit:
break out reactor.lock.Lock()
defer reactor.lock.Unlock()
reactor.running = false
logger.Infoln("stopped")
status <- nil
return
case event := <-reactor.eventChannel: case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events // needs to be called syncronously to keep order of events
reactor.dispatch(event) reactor.dispatch(event)
// case reactor.drained <- true:
default: default:
reactor.drained <- true // blocking till message is coming in reactor.drained <- true // blocking till message is coming in
} }
} }
reactor.lock.Lock()
defer reactor.lock.Unlock()
reactor.running = false
logger.Infoln("stopped")
close(reactor.shutdownChannel)
}() }()
reactor.running = true reactor.running = true
logger.Infoln("started") logger.Infoln("started")
@ -112,15 +112,15 @@ func (reactor *ReactorEngine) Start() {
} }
func (reactor *ReactorEngine) Stop() { func (reactor *ReactorEngine) Stop() {
reactor.lock.RLock()
if reactor.running { if reactor.running {
reactor.quit <- true status := make(chan error)
reactor.quit <- status
select { select {
case <-reactor.drained: case <-reactor.drained:
default:
} }
<-status
} }
reactor.lock.RUnlock()
<-reactor.shutdownChannel
} }
func (reactor *ReactorEngine) Flush() { func (reactor *ReactorEngine) Flush() {
@ -165,6 +165,7 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) {
reactor.eventChannel <- Event{Resource: resource, Name: event} reactor.eventChannel <- Event{Resource: resource, Name: event}
select { select {
case <-reactor.drained: case <-reactor.drained:
default:
} }
} }
} }

Loading…
Cancel
Save