Official Go implementation of the Ethereum protocol
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-ethereum/eth/filters/api.go

652 lines
16 KiB

// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package filters
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
var (
filterTickerTime = 5 * time.Minute
)
// byte will be inferred
const (
unknownFilterTy = iota
blockFilterTy
transactionFilterTy
logFilterTy
)
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
filterManager *FilterSystem
filterMapMu sync.RWMutex
filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
logMu sync.RWMutex
logQueue map[int]*logQueue
blockMu sync.RWMutex
blockQueue map[int]*hashQueue
transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue
transactMu sync.Mutex
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
svc := &PublicFilterAPI{
mux: mux,
chainDb: chainDb,
filterManager: NewFilterSystem(mux),
filterMapping: make(map[string]int),
logQueue: make(map[int]*logQueue),
blockQueue: make(map[int]*hashQueue),
transactionQueue: make(map[int]*hashQueue),
}
go svc.start()
return svc
}
// Stop quits the work loop.
func (s *PublicFilterAPI) Stop() {
close(s.quit)
}
// start the work loop, wait and process events.
func (s *PublicFilterAPI) start() {
timer := time.NewTicker(2 * time.Second)
defer timer.Stop()
done:
for {
select {
case <-timer.C:
s.logMu.Lock()
for id, filter := range s.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.logQueue, id)
}
}
s.logMu.Unlock()
s.blockMu.Lock()
for id, filter := range s.blockQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.blockQueue, id)
}
}
s.blockMu.Unlock()
s.transactionMu.Lock()
for id, filter := range s.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.transactionQueue, id)
}
}
s.transactionMu.Unlock()
case <-s.quit:
break done
}
}
}
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
s.blockMu.Lock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil {
return "", err
}
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if queue := s.blockQueue[id]; queue != nil {
queue.add(block.Hash())
}
}
defer s.blockMu.Unlock()
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil {
return "", err
}
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
if queue := s.transactionQueue[id]; queue != nil {
queue.add(tx.Hash())
}
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, LogFilter)
if err != nil {
return 0, err
}
s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetBeginBlock(earliest)
filter.SetEndBlock(latest)
filter.SetAddresses(addresses)
filter.SetTopics(topics)
filter.LogCallback = func(log *vm.Log, removed bool) {
if callback != nil {
callback(log, removed)
} else {
s.logMu.Lock()
defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil {
queue.add(vmlog{log, removed})
}
}
}
return id, nil
}
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
var (
externalId string
subscription rpc.Subscription
err error
)
if externalId, err = newFilterId(); err != nil {
return nil, err
}
// uninstall filter when subscription is unsubscribed/cancelled
if subscription, err = notifier.NewSubscription(func(string) {
s.UninstallFilter(externalId)
}); err != nil {
return nil, err
}
notifySubscriber := func(log *vm.Log, removed bool) {
rpcLog := toRPCLogs(vm.Logs{log}, removed)
if err := subscription.Notify(rpcLog); err != nil {
subscription.Cancel()
}
}
// from and to block number are not used since subscriptions don't allow you to travel to "time"
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
} else {
id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
}
if err != nil {
subscription.Cancel()
return nil, err
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return subscription, err
}
// NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct {
FromBlock rpc.BlockNumber
ToBlock rpc.BlockNumber
Addresses []common.Address
Topics [][]common.Hash
}
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Addresses interface{} `json:"address"`
Topics interface{} `json:"topics"`
}
var raw input
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
if raw.From == nil || raw.From.Int64() < 0 {
args.FromBlock = rpc.LatestBlockNumber
} else {
args.FromBlock = *raw.From
}
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
args.ToBlock = rpc.LatestBlockNumber
} else {
args.ToBlock = *raw.ToBlock
}
args.Addresses = []common.Address{}
if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
var addresses []common.Address
if strAddrs, ok := raw.Addresses.([]interface{}); ok {
for i, addr := range strAddrs {
if strAddr, ok := addr.(string); ok {
if len(strAddr) >= 2 && strAddr[0] == '0' && (strAddr[1] == 'x' || strAddr[1] == 'X') {
strAddr = strAddr[2:]
}
if decAddr, err := hex.DecodeString(strAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
fmt.Errorf("invalid address given")
}
} else {
return fmt.Errorf("invalid address on index %d", i)
}
}
} else if singleAddr, ok := raw.Addresses.(string); ok {
if len(singleAddr) >= 2 && singleAddr[0] == '0' && (singleAddr[1] == 'x' || singleAddr[1] == 'X') {
singleAddr = singleAddr[2:]
}
if decAddr, err := hex.DecodeString(singleAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
fmt.Errorf("invalid address given")
}
} else {
errors.New("invalid address(es) given")
}
args.Addresses = addresses
}
topicConverter := func(raw string) (common.Hash, error) {
if len(raw) == 0 {
return common.Hash{}, nil
}
if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') {
raw = raw[2:]
}
if decAddr, err := hex.DecodeString(raw); err == nil {
return common.BytesToHash(decAddr), nil
}
return common.Hash{}, errors.New("invalid topic given")
}
// topics is an array consisting of strings or arrays of strings
if raw.Topics != nil {
topics, ok := raw.Topics.([]interface{})
if ok {
parsedTopics := make([][]common.Hash, len(topics))
for i, topic := range topics {
if topic == nil {
parsedTopics[i] = []common.Hash{common.StringToHash("")}
} else if strTopic, ok := topic.(string); ok {
if t, err := topicConverter(strTopic); err != nil {
return fmt.Errorf("invalid topic on index %d", i)
} else {
parsedTopics[i] = []common.Hash{t}
}
} else if arrTopic, ok := topic.([]interface{}); ok {
parsedTopics[i] = make([]common.Hash, len(arrTopic))
for j := 0; j < len(parsedTopics[i]); i++ {
if arrTopic[j] == nil {
parsedTopics[i][j] = common.StringToHash("")
} else if str, ok := arrTopic[j].(string); ok {
if t, err := topicConverter(str); err != nil {
return fmt.Errorf("invalid topic on index %d", i)
} else {
parsedTopics[i] = []common.Hash{t}
}
} else {
fmt.Errorf("topic[%d][%d] not a string", i, j)
}
}
} else {
return fmt.Errorf("topic[%d] invalid", i)
}
}
args.Topics = parsedTopics
}
}
return nil
}
// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
} else {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
}
if err != nil {
return "", err
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// GetLogs returns the logs matching the given argument.
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
filter := New(s.chainDb)
filter.SetBeginBlock(args.FromBlock.Int64())
filter.SetEndBlock(args.ToBlock.Int64())
filter.SetAddresses(args.Addresses)
filter.SetTopics(args.Topics)
return toRPCLogs(filter.Find(), false)
}
// UninstallFilter removes the filter with the given filter id.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
s.filterMapMu.Lock()
defer s.filterMapMu.Unlock()
id, ok := s.filterMapping[filterId]
if !ok {
return false
}
defer s.filterManager.Remove(id)
delete(s.filterMapping, filterId)
if _, ok := s.logQueue[id]; ok {
s.logMu.Lock()
defer s.logMu.Unlock()
delete(s.logQueue, id)
return true
}
if _, ok := s.blockQueue[id]; ok {
s.blockMu.Lock()
defer s.blockMu.Unlock()
delete(s.blockQueue, id)
return true
}
if _, ok := s.transactionQueue[id]; ok {
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
delete(s.transactionQueue, id)
return true
}
return false
}
// getFilterType is a helper utility that determine the type of filter for the given filter id.
func (s *PublicFilterAPI) getFilterType(id int) byte {
if _, ok := s.blockQueue[id]; ok {
return blockFilterTy
} else if _, ok := s.transactionQueue[id]; ok {
return transactionFilterTy
} else if _, ok := s.logQueue[id]; ok {
return logFilterTy
}
return unknownFilterTy
}
// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.blockQueue[id] != nil {
return s.blockQueue[id].get()
}
return nil
}
// transactionFilterChanged returns a collection of transaction hashes for the pending
// transaction filter with the given id.
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.transactionQueue[id] != nil {
return s.transactionQueue[id].get()
}
return nil
}
// logFilterChanged returns a collection of logs for the log filter with the given id.
func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
s.logMu.Lock()
defer s.logMu.Unlock()
if s.logQueue[id] != nil {
return s.logQueue[id].get()
}
return nil
}
// GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
id, ok := s.filterMapping[filterId]
if !ok {
return toRPCLogs(nil, false)
}
if filter := s.filterManager.Get(id); filter != nil {
return toRPCLogs(filter.Find(), false)
}
return toRPCLogs(nil, false)
}
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
// This can be used for polling.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
s.filterMapMu.Lock()
id, ok := s.filterMapping[filterId]
s.filterMapMu.Unlock()
if !ok { // filter not found
return []interface{}{}
}
switch s.getFilterType(id) {
case blockFilterTy:
return returnHashes(s.blockFilterChanged(id))
case transactionFilterTy:
return returnHashes(s.transactionFilterChanged(id))
case logFilterTy:
return s.logFilterChanged(id)
}
return []interface{}{}
}
type vmlog struct {
*vm.Log
Removed bool `json:"removed"`
}
type logQueue struct {
mu sync.Mutex
logs []vmlog
timeout time.Time
id int
}
func (l *logQueue) add(logs ...vmlog) {
l.mu.Lock()
defer l.mu.Unlock()
l.logs = append(l.logs, logs...)
}
func (l *logQueue) get() []vmlog {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
return tmp
}
type hashQueue struct {
mu sync.Mutex
hashes []common.Hash
timeout time.Time
id int
}
func (l *hashQueue) add(hashes ...common.Hash) {
l.mu.Lock()
defer l.mu.Unlock()
l.hashes = append(l.hashes, hashes...)
}
func (l *hashQueue) get() []common.Hash {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil
return tmp
}
// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
// causing the affected DApp to miss data.
func newFilterId() (string, error) {
var subid [16]byte
n, _ := rand.Read(subid[:])
if n != 16 {
return "", errors.New("Unable to generate filter id")
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
// can hold additional information about the logs such as whether it was deleted.
// Additionally when nil is given it will by default instead create an empty slice
// instead. This is required by the RPC specification.
func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
convertedLogs := make([]vmlog, len(logs))
for i, log := range logs {
convertedLogs[i] = vmlog{Log: log, Removed: removed}
}
return convertedLogs
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
// return the given hashes. The RPC interfaces defines that always an array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}