forked from mirror/go-ethereum
commit
3d5db7288f
@ -1,40 +0,0 @@ |
||||
# ethreact |
||||
|
||||
ethereum event reactor. Component of the ethereum stack. |
||||
various events like state change on an account or new block found are broadcast to subscribers. |
||||
Broadcasting to subscribers is running on its own routine and globally order preserving. |
||||
|
||||
## Clients |
||||
### subscribe |
||||
|
||||
eventChannel := make(chan ethreact.Event) |
||||
reactor.Subscribe(event, eventChannel) |
||||
|
||||
The same channel can be subscribed to multiple events but only once for each event. In order to allow order of events to be preserved, broadcast of events is synchronous within the main broadcast loop. Therefore any blocking subscriber channels will be skipped, i.e. missing broadcasting events while they are blocked. |
||||
|
||||
### unsubscribe |
||||
|
||||
reactor.Unsubscribe(event, eventChannel) |
||||
|
||||
### Processing events |
||||
|
||||
event.Resource is of type interface{}. The actual type of event.Resource depends on event.Name and may need to be cast for processing. |
||||
|
||||
var event ethreact.Event |
||||
for { |
||||
select { |
||||
case event = <-eventChannel: |
||||
processTransaction(event.Resource.(Transaction)) |
||||
} |
||||
} |
||||
|
||||
## Broadcast |
||||
|
||||
reactor := ethreact.New() |
||||
reactor.Start() |
||||
reactor.Post(name, resource) |
||||
reactor.Flush() // wait till all broadcast messages are dispatched |
||||
reactor.Stop() // stop the main broadcast loop immediately (even if there are unbroadcast events left) |
||||
|
||||
|
||||
|
@ -1,63 +0,0 @@ |
||||
package ethreact |
||||
|
||||
import ( |
||||
"fmt" |
||||
"testing" |
||||
) |
||||
|
||||
func TestReactorAdd(t *testing.T) { |
||||
reactor := New() |
||||
ch := make(chan Event) |
||||
reactor.Subscribe("test", ch) |
||||
if reactor.eventHandlers["test"] == nil { |
||||
t.Error("Expected new eventHandler to be created") |
||||
} |
||||
reactor.Unsubscribe("test", ch) |
||||
if reactor.eventHandlers["test"] != nil { |
||||
t.Error("Expected eventHandler to be removed") |
||||
} |
||||
} |
||||
|
||||
func TestReactorEvent(t *testing.T) { |
||||
var name string |
||||
reactor := New() |
||||
// Buffer the channel, so it doesn't block for this test
|
||||
cap := 20 |
||||
ch := make(chan Event, cap) |
||||
reactor.Subscribe("even", ch) |
||||
reactor.Subscribe("odd", ch) |
||||
reactor.Post("even", "disappears") // should not broadcast if engine not started
|
||||
reactor.Start() |
||||
for i := 0; i < cap; i++ { |
||||
if i%2 == 0 { |
||||
name = "even" |
||||
} else { |
||||
name = "odd" |
||||
} |
||||
reactor.Post(name, i) |
||||
} |
||||
reactor.Post("test", cap) // this should not block
|
||||
i := 0 |
||||
reactor.Flush() |
||||
close(ch) |
||||
for event := range ch { |
||||
fmt.Printf("%d: %v", i, event) |
||||
if i%2 == 0 { |
||||
name = "even" |
||||
} else { |
||||
name = "odd" |
||||
} |
||||
if val, ok := event.Resource.(int); ok { |
||||
if i != val || event.Name != name { |
||||
t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) |
||||
} |
||||
} else { |
||||
t.Error("Unable to cast") |
||||
} |
||||
i++ |
||||
} |
||||
if i != cap { |
||||
t.Error("excpected exactly %d events, got ", i) |
||||
} |
||||
reactor.Stop() |
||||
} |
@ -0,0 +1,87 @@ |
||||
package ethutil |
||||
|
||||
import ( |
||||
"sync" |
||||
) |
||||
|
||||
type ReactorEvent struct { |
||||
mut sync.Mutex |
||||
event string |
||||
chans []chan React |
||||
} |
||||
|
||||
// Post the specified reactor resource on the channels
|
||||
// currently subscribed
|
||||
func (e *ReactorEvent) Post(react React) { |
||||
e.mut.Lock() |
||||
defer e.mut.Unlock() |
||||
|
||||
for _, ch := range e.chans { |
||||
go func(ch chan React) { |
||||
ch <- react |
||||
}(ch) |
||||
} |
||||
} |
||||
|
||||
// Add a subscriber to this event
|
||||
func (e *ReactorEvent) Add(ch chan React) { |
||||
e.mut.Lock() |
||||
defer e.mut.Unlock() |
||||
|
||||
e.chans = append(e.chans, ch) |
||||
} |
||||
|
||||
// Remove a subscriber
|
||||
func (e *ReactorEvent) Remove(ch chan React) { |
||||
e.mut.Lock() |
||||
defer e.mut.Unlock() |
||||
|
||||
for i, c := range e.chans { |
||||
if c == ch { |
||||
e.chans = append(e.chans[:i], e.chans[i+1:]...) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Basic reactor resource
|
||||
type React struct { |
||||
Resource interface{} |
||||
Event string |
||||
} |
||||
|
||||
// The reactor basic engine. Acts as bridge
|
||||
// between the events and the subscribers/posters
|
||||
type ReactorEngine struct { |
||||
patterns map[string]*ReactorEvent |
||||
} |
||||
|
||||
func NewReactorEngine() *ReactorEngine { |
||||
return &ReactorEngine{patterns: make(map[string]*ReactorEvent)} |
||||
} |
||||
|
||||
// Subscribe a channel to the specified event
|
||||
func (reactor *ReactorEngine) Subscribe(event string, ch chan React) { |
||||
ev := reactor.patterns[event] |
||||
// Create a new event if one isn't available
|
||||
if ev == nil { |
||||
ev = &ReactorEvent{event: event} |
||||
reactor.patterns[event] = ev |
||||
} |
||||
|
||||
// Add the channel to reactor event handler
|
||||
ev.Add(ch) |
||||
} |
||||
|
||||
func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) { |
||||
ev := reactor.patterns[event] |
||||
if ev != nil { |
||||
ev.Remove(ch) |
||||
} |
||||
} |
||||
|
||||
func (reactor *ReactorEngine) Post(event string, resource interface{}) { |
||||
ev := reactor.patterns[event] |
||||
if ev != nil { |
||||
ev.Post(React{Resource: resource, Event: event}) |
||||
} |
||||
} |
@ -0,0 +1,30 @@ |
||||
package ethutil |
||||
|
||||
import "testing" |
||||
|
||||
func TestReactorAdd(t *testing.T) { |
||||
engine := NewReactorEngine() |
||||
ch := make(chan React) |
||||
engine.Subscribe("test", ch) |
||||
if len(engine.patterns) != 1 { |
||||
t.Error("Expected patterns to be 1, got", len(engine.patterns)) |
||||
} |
||||
} |
||||
|
||||
func TestReactorEvent(t *testing.T) { |
||||
engine := NewReactorEngine() |
||||
|
||||
// Buffer 1, so it doesn't block for this test
|
||||
ch := make(chan React, 1) |
||||
engine.Subscribe("test", ch) |
||||
engine.Post("test", "hello") |
||||
|
||||
value := <-ch |
||||
if val, ok := value.Resource.(string); ok { |
||||
if val != "hello" { |
||||
t.Error("Expected Resource to be 'hello', got", val) |
||||
} |
||||
} else { |
||||
t.Error("Unable to cast") |
||||
} |
||||
} |
Loading…
Reference in new issue