diff --git a/eth/filters/api.go b/eth/filters/api.go index aa4c305a6f..148daa649b 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -206,12 +206,12 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo filter.SetEndBlock(latest) filter.SetAddresses(addresses) filter.SetTopics(topics) - filter.LogsCallback = func(logs vm.Logs) { + filter.LogCallback = func(log *vm.Log, removed bool) { s.logMu.Lock() defer s.logMu.Unlock() if queue := s.logQueue[id]; queue != nil { - queue.add(logs...) + queue.add(vmlog{log, removed}) } } @@ -365,14 +365,14 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { } // GetLogs returns the logs matching the given argument. -func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) vm.Logs { +func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog { filter := New(s.chainDb) filter.SetBeginBlock(args.FromBlock.Int64()) filter.SetEndBlock(args.ToBlock.Int64()) filter.SetAddresses(args.Addresses) filter.SetTopics(args.Topics) - return returnLogs(filter.Find()) + return toRPCLogs(filter.Find(), false) } // UninstallFilter removes the filter with the given filter id. @@ -447,7 +447,7 @@ func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash { } // logFilterChanged returns a collection of logs for the log filter with the given id. -func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs { +func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog { s.logMu.Lock() defer s.logMu.Unlock() @@ -458,17 +458,17 @@ func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs { } // GetFilterLogs returns the logs for the filter with the given id. -func (s *PublicFilterAPI) GetFilterLogs(filterId string) vm.Logs { +func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { id, ok := s.filterMapping[filterId] if !ok { - return returnLogs(nil) + return toRPCLogs(nil, false) } if filter := s.filterManager.Get(id); filter != nil { - return returnLogs(filter.Find()) + return toRPCLogs(filter.Find(), false) } - return returnLogs(nil) + return toRPCLogs(nil, false) } // GetFilterChanges returns the logs for the filter with the given id since last time is was called. @@ -488,28 +488,33 @@ func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} { case transactionFilterTy: return returnHashes(s.transactionFilterChanged(id)) case logFilterTy: - return returnLogs(s.logFilterChanged(id)) + return s.logFilterChanged(id) } return []interface{}{} } +type vmlog struct { + *vm.Log + Removed bool `json:"removed"` +} + type logQueue struct { mu sync.Mutex - logs vm.Logs + logs []vmlog timeout time.Time id int } -func (l *logQueue) add(logs ...*vm.Log) { +func (l *logQueue) add(logs ...vmlog) { l.mu.Lock() defer l.mu.Unlock() l.logs = append(l.logs, logs...) } -func (l *logQueue) get() vm.Logs { +func (l *logQueue) get() []vmlog { l.mu.Lock() defer l.mu.Unlock() @@ -556,13 +561,16 @@ func newFilterId() (string, error) { return "0x" + hex.EncodeToString(subid[:]), nil } -// returnLogs is a helper that will return an empty logs array case the given logs is nil, otherwise is will return the -// given logs. The RPC interfaces defines that always an array is returned. -func returnLogs(logs vm.Logs) vm.Logs { - if logs == nil { - return vm.Logs{} +// toRPCLogs is a helper that will convert a vm.Logs array to an structure which +// can hold additional information about the logs such as whether it was deleted. +// Additionally when nil is given it will by default instead create an empty slice +// instead. This is required by the RPC specification. +func toRPCLogs(logs vm.Logs, removed bool) []vmlog { + convertedLogs := make([]vmlog, len(logs)) + for i, log := range logs { + convertedLogs[i] = vmlog{Log: log, Removed: removed} } - return logs + return convertedLogs } // returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will diff --git a/eth/filters/filter.go b/eth/filters/filter.go index ff192cdf6b..2c92d20b14 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -39,7 +39,7 @@ type Filter struct { BlockCallback func(*types.Block, vm.Logs) TransactionCallback func(*types.Transaction) - LogsCallback func(vm.Logs) + LogCallback func(*vm.Log, bool) } // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index df3ce90c6e..04e58a08c5 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -46,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem { } fs.sub = mux.Subscribe( //core.PendingBlockEvent{}, + core.RemovedLogEvent{}, core.ChainEvent{}, core.TxPreEvent{}, vm.Logs(nil), @@ -96,7 +97,7 @@ func (fs *FilterSystem) filterLoop() { case core.ChainEvent: fs.filterMu.RLock() for id, filter := range fs.filters { - if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + if filter.BlockCallback != nil && !fs.created[id].After(event.Time) { filter.BlockCallback(ev.Block, ev.Logs) } } @@ -105,7 +106,7 @@ func (fs *FilterSystem) filterLoop() { case core.TxPreEvent: fs.filterMu.RLock() for id, filter := range fs.filters { - if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) { filter.TransactionCallback(ev.Tx) } } @@ -114,10 +115,20 @@ func (fs *FilterSystem) filterLoop() { case vm.Logs: fs.filterMu.RLock() for id, filter := range fs.filters { - if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { - msgs := filter.FilterLogs(ev) - if len(msgs) > 0 { - filter.LogsCallback(msgs) + if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, log := range filter.FilterLogs(ev) { + filter.LogCallback(log, false) + } + } + } + fs.filterMu.RUnlock() + + case core.RemovedLogEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, removedLog := range ev.Logs { + filter.LogCallback(removedLog, true) } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go new file mode 100644 index 0000000000..7ddeb02bc9 --- /dev/null +++ b/eth/filters/filter_system_test.go @@ -0,0 +1,87 @@ +package filters + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/event" +) + +func TestCallbacks(t *testing.T) { + var ( + mux event.TypeMux + fs = NewFilterSystem(&mux) + blockDone = make(chan struct{}) + txDone = make(chan struct{}) + logDone = make(chan struct{}) + removedLogDone = make(chan struct{}) + ) + + blockFilter := &Filter{ + BlockCallback: func(*types.Block, vm.Logs) { + close(blockDone) + }, + } + txFilter := &Filter{ + TransactionCallback: func(*types.Transaction) { + close(txDone) + }, + } + logFilter := &Filter{ + LogCallback: func(l *vm.Log, oob bool) { + if !oob { + close(logDone) + } + }, + } + + removedLogFilter := &Filter{ + LogCallback: func(l *vm.Log, oob bool) { + if oob { + close(removedLogDone) + } + }, + } + + fs.Add(blockFilter) + fs.Add(txFilter) + fs.Add(logFilter) + fs.Add(removedLogFilter) + + mux.Post(core.ChainEvent{}) + mux.Post(core.TxPreEvent{}) + mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}}) + mux.Post(vm.Logs{&vm.Log{}}) + + const dura = 5 * time.Second + failTimer := time.NewTimer(dura) + select { + case <-blockDone: + case <-failTimer.C: + t.Error("block filter failed to trigger (timeout)") + } + + failTimer.Reset(dura) + select { + case <-txDone: + case <-failTimer.C: + t.Error("transaction filter failed to trigger (timeout)") + } + + failTimer.Reset(dura) + select { + case <-logDone: + case <-failTimer.C: + t.Error("log filter failed to trigger (timeout)") + } + + failTimer.Reset(dura) + select { + case <-removedLogDone: + case <-failTimer.C: + t.Error("removed log filter failed to trigger (timeout)") + } +}