pending / chain event

pull/296/head
obscuren 10 years ago
parent bcacaaa4f4
commit c64852dbcc
  1. 10
      cmd/mist/assets/html/home.html
  2. 8
      core/block_processor.go
  3. 5
      core/filter.go
  4. 11
      event/filter/old_filter.go
  5. 16
      rpc/message.go
  6. 26
      rpc/packages.go

@ -60,7 +60,7 @@
var web3 = require('web3'); var web3 = require('web3');
var eth = web3.eth; var eth = web3.eth;
web3.setProvider(new web3.providers.HttpSyncProvider('http://localhost:8080')); web3.setProvider(new web3.providers.HttpSyncProvider('http://localhost:8545'));
document.querySelector("#number").innerHTML = eth.number; document.querySelector("#number").innerHTML = eth.number;
document.querySelector("#coinbase").innerHTML = eth.coinbase document.querySelector("#coinbase").innerHTML = eth.coinbase
@ -69,6 +69,14 @@
document.querySelector("#gas_price").innerHTML = eth.gasPrice; document.querySelector("#gas_price").innerHTML = eth.gasPrice;
document.querySelector("#mining").innerHTML = eth.mining; document.querySelector("#mining").innerHTML = eth.mining;
document.querySelector("#listening").innerHTML = eth.listening; document.querySelector("#listening").innerHTML = eth.listening;
eth.watch('pending').changed(function() {
console.log("pending changed");
});
eth.watch('chain').changed(function() {
console.log("chain changed");
});
</script> </script>
</html> </html>

@ -19,6 +19,10 @@ import (
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
type PendingBlockEvent struct {
Block *types.Block
}
var statelogger = logger.NewLogger("BLOCK") var statelogger = logger.NewLogger("BLOCK")
type EthManager interface { type EthManager interface {
@ -154,6 +158,10 @@ done:
block.Reward = cumulativeSum block.Reward = cumulativeSum
block.Header().GasUsed = totalUsedGas block.Header().GasUsed = totalUsedGas
if transientProcess {
go self.eventMux.Post(PendingBlockEvent{block})
}
return receipts, handled, unhandled, erroneous, err return receipts, handled, unhandled, erroneous, err
} }

@ -33,8 +33,9 @@ type Filter struct {
max int max int
topics [][]byte topics [][]byte
BlockCallback func(*types.Block) BlockCallback func(*types.Block)
LogsCallback func(state.Logs) PendingCallback func(*types.Block)
LogsCallback func(state.Logs)
} }
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block

@ -59,7 +59,7 @@ func (self *FilterManager) GetFilter(id int) *core.Filter {
func (self *FilterManager) filterLoop() { func (self *FilterManager) filterLoop() {
// Subscribe to events // Subscribe to events
events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Logs(nil)) events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil))
out: out:
for { for {
@ -77,6 +77,15 @@ out:
} }
self.filterMu.RUnlock() self.filterMu.RUnlock()
case core.PendingBlockEvent:
self.filterMu.RLock()
for _, filter := range self.filters {
if filter.PendingCallback != nil {
filter.PendingCallback(event.Block)
}
}
self.filterMu.RUnlock()
case state.Logs: case state.Logs:
self.filterMu.RLock() self.filterMu.RLock()
for _, filter := range self.filters { for _, filter := range self.filters {

@ -205,7 +205,6 @@ func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) {
if len(req.Params) < 1 { if len(req.Params) < 1 {
return nil, NewErrorResponse(ErrorArguments) return nil, NewErrorResponse(ErrorArguments)
} }
fmt.Println("FILTER PARAMS", string(req.Params[0]))
args := new(FilterOptions) args := new(FilterOptions)
r := bytes.NewReader(req.Params[0]) r := bytes.NewReader(req.Params[0])
@ -217,6 +216,21 @@ func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) {
return args, nil return args, nil
} }
func (req *RpcRequest) ToFilterStringArgs() (string, error) {
if len(req.Params) < 1 {
return "", NewErrorResponse(ErrorArguments)
}
var args string
err := json.Unmarshal(req.Params[0], &args)
if err != nil {
return "", NewErrorResponse(ErrorDecodeArgs)
}
rpclogger.DebugDetailf("%T %v", args, args)
return args, nil
}
func (req *RpcRequest) ToFilterChangedArgs() (int, error) { func (req *RpcRequest) ToFilterChangedArgs() (int, error) {
if len(req.Params) < 1 { if len(req.Params) < 1 {
return 0, NewErrorResponse(ErrorArguments) return 0, NewErrorResponse(ErrorArguments)

@ -32,6 +32,7 @@ import (
"sync" "sync"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/ethutil"
@ -88,6 +89,25 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
return nil return nil
} }
func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error {
var id int
filter := core.NewFilter(self.xeth.Backend())
callback := func(block *types.Block) {
self.logs[id] = append(self.logs[id], &state.StateLog{})
}
if args == "pending" {
filter.PendingCallback = callback
} else if args == "chain" {
filter.BlockCallback = callback
}
id = self.filterManager.InstallFilter(filter)
*reply = id
return nil
}
func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
self.logMut.RLock() self.logMut.RLock()
defer self.logMut.RUnlock() defer self.logMut.RUnlock()
@ -389,6 +409,12 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
return err return err
} }
return p.NewFilter(args, reply) return p.NewFilter(args, reply)
case "eth_newFilterString":
args, err := req.ToFilterStringArgs()
if err != nil {
return err
}
return p.NewFilterString(args, reply)
case "eth_changed": case "eth_changed":
args, err := req.ToFilterChangedArgs() args, err := req.ToFilterChangedArgs()
if err != nil { if err != nil {

Loading…
Cancel
Save