forked from mirror/go-ethereum
parent
20cdb73862
commit
28570ef109
@ -1,83 +0,0 @@ |
|||||||
package eventer |
|
||||||
|
|
||||||
import "sync" |
|
||||||
|
|
||||||
// Basic receiver interface.
|
|
||||||
type Receiver interface { |
|
||||||
Send(Event) |
|
||||||
} |
|
||||||
|
|
||||||
// Receiver as channel
|
|
||||||
type Channel chan Event |
|
||||||
|
|
||||||
func (self Channel) Send(ev Event) { |
|
||||||
self <- ev |
|
||||||
} |
|
||||||
|
|
||||||
// Receiver as function
|
|
||||||
type Function func(ev Event) |
|
||||||
|
|
||||||
func (self Function) Send(ev Event) { |
|
||||||
self(ev) |
|
||||||
} |
|
||||||
|
|
||||||
type Event struct { |
|
||||||
Type string |
|
||||||
Data interface{} |
|
||||||
} |
|
||||||
|
|
||||||
type Channels map[string][]Receiver |
|
||||||
|
|
||||||
type EventMachine struct { |
|
||||||
mu sync.RWMutex |
|
||||||
channels Channels |
|
||||||
} |
|
||||||
|
|
||||||
func New() *EventMachine { |
|
||||||
return &EventMachine{channels: make(Channels)} |
|
||||||
} |
|
||||||
|
|
||||||
func (self *EventMachine) add(typ string, r Receiver) { |
|
||||||
self.mu.Lock() |
|
||||||
self.channels[typ] = append(self.channels[typ], r) |
|
||||||
self.mu.Unlock() |
|
||||||
} |
|
||||||
|
|
||||||
// Generalised methods for the known receiver types
|
|
||||||
// * Channel
|
|
||||||
// * Function
|
|
||||||
func (self *EventMachine) On(typ string, r interface{}) { |
|
||||||
if eventFunc, ok := r.(func(Event)); ok { |
|
||||||
self.RegisterFunc(typ, eventFunc) |
|
||||||
} else if eventChan, ok := r.(Channel); ok { |
|
||||||
self.RegisterChannel(typ, eventChan) |
|
||||||
} else { |
|
||||||
panic("Invalid type for EventMachine::On") |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func (self *EventMachine) RegisterChannel(typ string, c Channel) { |
|
||||||
self.add(typ, c) |
|
||||||
} |
|
||||||
|
|
||||||
func (self *EventMachine) RegisterFunc(typ string, f Function) { |
|
||||||
self.add(typ, f) |
|
||||||
} |
|
||||||
|
|
||||||
func (self *EventMachine) Register(typ string) Channel { |
|
||||||
c := make(Channel, 1) |
|
||||||
self.add(typ, c) |
|
||||||
return c |
|
||||||
} |
|
||||||
|
|
||||||
func (self *EventMachine) Post(typ string, data interface{}) { |
|
||||||
self.mu.RLock() |
|
||||||
if self.channels[typ] != nil { |
|
||||||
ev := Event{typ, data} |
|
||||||
for _, receiver := range self.channels[typ] { |
|
||||||
// Blocking is OK. These are internals and need to be handled
|
|
||||||
receiver.Send(ev) |
|
||||||
} |
|
||||||
} |
|
||||||
self.mu.RUnlock() |
|
||||||
} |
|
@ -1,113 +0,0 @@ |
|||||||
package eventer |
|
||||||
|
|
||||||
import ( |
|
||||||
"math/rand" |
|
||||||
"testing" |
|
||||||
"time" |
|
||||||
) |
|
||||||
|
|
||||||
func TestChannel(t *testing.T) { |
|
||||||
eventer := New() |
|
||||||
|
|
||||||
c := make(Channel, 1) |
|
||||||
eventer.RegisterChannel("test", c) |
|
||||||
eventer.Post("test", "hello world") |
|
||||||
|
|
||||||
res := <-c |
|
||||||
|
|
||||||
if res.Data.(string) != "hello world" { |
|
||||||
t.Error("Expected event with data 'hello world'. Got", res.Data) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func TestFunction(t *testing.T) { |
|
||||||
eventer := New() |
|
||||||
|
|
||||||
var data string |
|
||||||
eventer.RegisterFunc("test", func(ev Event) { |
|
||||||
data = ev.Data.(string) |
|
||||||
}) |
|
||||||
eventer.Post("test", "hello world") |
|
||||||
|
|
||||||
if data != "hello world" { |
|
||||||
t.Error("Expected event with data 'hello world'. Got", data) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func TestRegister(t *testing.T) { |
|
||||||
eventer := New() |
|
||||||
|
|
||||||
c := eventer.Register("test") |
|
||||||
eventer.Post("test", "hello world") |
|
||||||
|
|
||||||
res := <-c |
|
||||||
|
|
||||||
if res.Data.(string) != "hello world" { |
|
||||||
t.Error("Expected event with data 'hello world'. Got", res.Data) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func TestOn(t *testing.T) { |
|
||||||
eventer := New() |
|
||||||
|
|
||||||
c := make(Channel, 1) |
|
||||||
eventer.On("test", c) |
|
||||||
|
|
||||||
var data string |
|
||||||
eventer.On("test", func(ev Event) { |
|
||||||
data = ev.Data.(string) |
|
||||||
}) |
|
||||||
eventer.Post("test", "hello world") |
|
||||||
|
|
||||||
res := <-c |
|
||||||
if res.Data.(string) != "hello world" { |
|
||||||
t.Error("Expected channel event with data 'hello world'. Got", res.Data) |
|
||||||
} |
|
||||||
|
|
||||||
if data != "hello world" { |
|
||||||
t.Error("Expected function event with data 'hello world'. Got", data) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func TestConcurrentUsage(t *testing.T) { |
|
||||||
rand.Seed(time.Now().Unix()) |
|
||||||
eventer := New() |
|
||||||
stop := make(chan struct{}) |
|
||||||
recv := make(chan int) |
|
||||||
poster := func() { |
|
||||||
for { |
|
||||||
select { |
|
||||||
case <-stop: |
|
||||||
return |
|
||||||
default: |
|
||||||
eventer.Post("test", "hi") |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
listener := func(i int) { |
|
||||||
time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) |
|
||||||
c := eventer.Register("test") |
|
||||||
// wait for the first event
|
|
||||||
<-c |
|
||||||
recv <- i |
|
||||||
// keep receiving to prevent deadlock
|
|
||||||
for { |
|
||||||
select { |
|
||||||
case <-stop: |
|
||||||
return |
|
||||||
case <-c: |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
nlisteners := 200 |
|
||||||
go poster() |
|
||||||
for i := 0; i < nlisteners; i++ { |
|
||||||
go listener(i) |
|
||||||
} |
|
||||||
// wait until everyone has been served
|
|
||||||
for i := 0; i < nlisteners; i++ { |
|
||||||
<-recv |
|
||||||
} |
|
||||||
close(stop) |
|
||||||
} |
|
Loading…
Reference in new issue