diff --git a/ethpipe/js_pipe.go b/ethpipe/js_pipe.go index a2b1a45514..0d0928fc36 100644 --- a/ethpipe/js_pipe.go +++ b/ethpipe/js_pipe.go @@ -2,10 +2,13 @@ package ethpipe import ( "encoding/json" + "fmt" "sync/atomic" "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" + "github.com/ethereum/eth-go/ethreact" + "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" ) @@ -74,7 +77,8 @@ func (self *JSPipe) NumberToHuman(balance string) string { } func (self *JSPipe) StorageAt(addr, storageAddr string) string { - return self.World().SafeGet(ethutil.Hex2Bytes(addr)).Storage(ethutil.Hex2Bytes(storageAddr)).Str() + storage := self.World().SafeGet(ethutil.Hex2Bytes(addr)).Storage(ethutil.Hex2Bytes(storageAddr)) + return storage.BigInt().String() } func (self *JSPipe) TxCountAt(address string) int { @@ -186,10 +190,45 @@ func (self *JSPipe) CompileMutan(code string) string { return ethutil.Bytes2Hex(data) } +func (self *JSPipe) Watch(object map[string]interface{}) *JSFilter { + return NewJSFilterFromMap(object, self.Pipe.obj) + /*} else if str, ok := object.(string); ok { + println("str") + return NewJSFilterFromString(str, self.Pipe.obj) + */ +} + func (self *JSPipe) Messages(object map[string]interface{}) string { - filter := ethchain.NewFilterFromMap(object, self.obj) + filter := self.Watch(object) + + defer filter.Uninstall() + + return filter.Messages() + +} + +type JSFilter struct { + eth ethchain.EthManager + *ethchain.Filter + quit chan bool + + BlockCallback func(*ethchain.Block) + MessageCallback func(ethstate.Messages) +} + +func NewJSFilterFromMap(object map[string]interface{}, eth ethchain.EthManager) *JSFilter { + filter := &JSFilter{eth, ethchain.NewFilterFromMap(object, eth), make(chan bool), nil, nil} + + go filter.mainLoop() + + return filter +} - messages := filter.Find() +func NewJSFilterFromString(str string, eth ethchain.EthManager) *JSFilter { + return nil +} + +func (self *JSFilter) MessagesToJson(messages ethstate.Messages) string { var msgs []JSMessage for _, m := range messages { msgs = append(msgs, NewJSMessage(m)) @@ -202,3 +241,44 @@ func (self *JSPipe) Messages(object map[string]interface{}) string { return string(b) } + +func (self *JSFilter) Messages() string { + return self.MessagesToJson(self.Find()) +} + +func (self *JSFilter) mainLoop() { + blockChan := make(chan ethreact.Event, 1) + messageChan := make(chan ethreact.Event, 1) + // Subscribe to events + reactor := self.eth.Reactor() + reactor.Subscribe("newBlock", blockChan) + reactor.Subscribe("messages", messageChan) +out: + for { + select { + case <-self.quit: + break out + case block := <-blockChan: + if block, ok := block.Resource.(*ethchain.Block); ok { + if self.BlockCallback != nil { + self.BlockCallback(block) + } + } + case msg := <-messageChan: + if messages, ok := msg.Resource.(ethstate.Messages); ok { + if self.MessageCallback != nil { + msgs := self.FilterMessages(messages) + self.MessageCallback(msgs) + } + } + } + } +} + +func (self *JSFilter) Changed(object interface{}) { + fmt.Printf("%T\n", object) +} + +func (self *JSFilter) Uninstall() { + self.quit <- true +}