diff --git a/core/blockchain.go b/core/blockchain.go index ad545cf697..6c555e9eec 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -483,13 +483,6 @@ func (bc *BlockChain) Stop() { glog.V(logger.Info).Infoln("Chain manager stopped") } -type queueEvent struct { - queue []interface{} - canonicalCount int - sideCount int - splitCount int -} - func (self *BlockChain) procFutureBlocks() { blocks := make([]*types.Block, self.futureBlocks.Len()) for i, hash := range self.futureBlocks.Keys() { @@ -573,10 +566,9 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { // faster than direct delivery and requires much less mutex // acquiring. var ( - queue = make([]interface{}, len(chain)) - queueEvent = queueEvent{queue: queue} - stats struct{ queued, processed, ignored int } - tstart = time.Now() + stats struct{ queued, processed, ignored int } + events = make([]interface{}, 0, len(chain)) + tstart = time.Now() nonceChecked = make([]bool, len(chain)) ) @@ -659,22 +651,21 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { if glog.V(logger.Debug) { glog.Infof("[%v] inserted block #%d (%d TXs %v G %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), block.GasUsed(), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } - queue[i] = ChainEvent{block, block.Hash(), logs} - queueEvent.canonicalCount++ + events = append(events, ChainEvent{block, block.Hash(), logs}) // This puts transactions in a extra db for rpc PutTransactions(self.chainDb, block, block.Transactions()) // store the receipts PutReceipts(self.chainDb, receipts) + case SideStatTy: if glog.V(logger.Detail) { glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } - queue[i] = ChainSideEvent{block, logs} - queueEvent.sideCount++ + events = append(events, ChainSideEvent{block, logs}) + case SplitStatTy: - queue[i] = ChainSplitEvent{block, logs} - queueEvent.splitCount++ + events = append(events, ChainSplitEvent{block, logs}) } stats.processed++ } @@ -684,8 +675,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { start, end := chain[0], chain[len(chain)-1] glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4]) } - - go self.eventMux.Post(queueEvent) + go self.postChainEvents(events) return 0, nil } @@ -774,32 +764,31 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } +// postChainEvents iterates over the events generated by a chain insertion and +// posts them into the event mux. +func (self *BlockChain) postChainEvents(events []interface{}) { + for _, event := range events { + if event, ok := event.(ChainEvent); ok { + // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long + // and in most cases isn't even necessary. + if self.currentBlock.Hash() == event.Hash { + self.currentGasLimit = CalcGasLimit(event.Block) + self.eventMux.Post(ChainHeadEvent{event.Block}) + } + } + // Fire the insertion events individually too + self.eventMux.Post(event) + } +} + func (self *BlockChain) update() { - events := self.eventMux.Subscribe(queueEvent{}) futureTimer := time.Tick(5 * time.Second) -out: for { select { - case ev := <-events.Chan(): - switch ev := ev.(type) { - case queueEvent: - for _, event := range ev.queue { - switch event := event.(type) { - case ChainEvent: - // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long - // and in most cases isn't even necessary. - if self.currentBlock.Hash() == event.Hash { - self.currentGasLimit = CalcGasLimit(event.Block) - self.eventMux.Post(ChainHeadEvent{event.Block}) - } - } - self.eventMux.Post(event) - } - } case <-futureTimer: self.procFutureBlocks() case <-self.quit: - break out + return } } } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 11d0cb490d..a4e6ce3e2f 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -93,7 +93,7 @@ func (pool *TxPool) eventLoop() { // we need to know the new state. The new state will help us determine // the nonces in the managed state for ev := range pool.events.Chan() { - switch ev := ev.(type) { + switch ev := ev.Data.(type) { case ChainHeadEvent: pool.mu.Lock() pool.resetState() diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 4972dcd599..ae60935259 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -20,6 +20,7 @@ package filters import ( "sync" + "time" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/vm" @@ -35,6 +36,7 @@ type FilterSystem struct { filterMu sync.RWMutex filterId int filters map[int]*Filter + created map[int]time.Time quit chan struct{} } @@ -44,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ eventMux: mux, filters: make(map[int]*Filter), + created: make(map[int]time.Time), } go fs.filterLoop() return fs @@ -60,6 +63,7 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { defer fs.filterMu.Unlock() id = fs.filterId fs.filters[id] = filter + fs.created[id] = time.Now() fs.filterId++ return id @@ -69,15 +73,16 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { func (fs *FilterSystem) Remove(id int) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - if _, ok := fs.filters[id]; ok { - delete(fs.filters, id) - } + + delete(fs.filters, id) + delete(fs.created, id) } // Get retrieves a filter installed using Add The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter { fs.filterMu.RLock() defer fs.filterMu.RUnlock() + return fs.filters[id] } @@ -85,42 +90,49 @@ func (fs *FilterSystem) Get(id int) *Filter { // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { // Subscribe to events - events := fs.eventMux.Subscribe( + eventCh := fs.eventMux.Subscribe( //core.PendingBlockEvent{}, core.ChainEvent{}, core.TxPreEvent{}, - vm.Logs(nil)) + vm.Logs(nil), + ).Chan() out: for { select { case <-fs.quit: break out - case event := <-events.Chan(): - switch event := event.(type) { + case event, ok := <-eventCh: + if !ok { + // Event subscription closed, set the channel to nil to stop spinning + eventCh = nil + continue + } + // A real event arrived, notify the registered filters + switch ev := event.Data.(type) { case core.ChainEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block, event.Logs) + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } } fs.filterMu.RUnlock() case core.TxPreEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.TransactionCallback != nil { - filter.TransactionCallback(event.Tx) + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } } fs.filterMu.RUnlock() case vm.Logs: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.LogsCallback != nil { - msgs := filter.FilterLogs(event) + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) if len(msgs) > 0 { filter.LogsCallback(msgs) } diff --git a/eth/gasprice.go b/eth/gasprice.go index c08b961293..b4409f346e 100644 --- a/eth/gasprice.go +++ b/eth/gasprice.go @@ -84,19 +84,16 @@ func (self *GasPriceOracle) processPastBlocks() { } func (self *GasPriceOracle) listenLoop() { - for { - ev, isopen := <-self.events.Chan() - if !isopen { - break - } - switch ev := ev.(type) { + defer self.events.Unsubscribe() + + for event := range self.events.Chan() { + switch event := event.Data.(type) { case core.ChainEvent: - self.processBlock(ev.Block) + self.processBlock(event.Block) case core.ChainSplitEvent: - self.processBlock(ev.Block) + self.processBlock(event.Block) } } - self.events.Unsubscribe() } func (self *GasPriceOracle) processBlock(block *types.Block) { diff --git a/eth/handler.go b/eth/handler.go index fc92338b4f..3fc9096725 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -687,7 +687,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) func (self *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.minedBlockSub.Chan() { - switch ev := obj.(type) { + switch ev := obj.Data.(type) { case core.NewMinedBlockEvent: self.BroadcastBlock(ev.Block, true) // First propagate block to peers self.BroadcastBlock(ev.Block, false) // Only then announce to the rest @@ -698,7 +698,7 @@ func (self *ProtocolManager) minedBroadcastLoop() { func (self *ProtocolManager) txBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.txSub.Chan() { - event := obj.(core.TxPreEvent) + event := obj.Data.(core.TxPreEvent) self.BroadcastTx(event.Tx.Hash(), event.Tx) } } diff --git a/event/event.go b/event/event.go index ce74e52862..57dd52baa1 100644 --- a/event/event.go +++ b/event/event.go @@ -22,14 +22,21 @@ import ( "fmt" "reflect" "sync" + "time" ) +// Event is a time-tagged notification pushed to subscribers. +type Event struct { + Time time.Time + Data interface{} +} + // Subscription is implemented by event subscriptions. type Subscription interface { // Chan returns a channel that carries events. // Implementations should return the same channel // for any subsequent calls to Chan. - Chan() <-chan interface{} + Chan() <-chan *Event // Unsubscribe stops delivery of events to a subscription. // The event channel is closed. @@ -82,6 +89,10 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { // Post sends an event to all receivers registered for the given type. // It returns ErrMuxClosed if the mux has been stopped. func (mux *TypeMux) Post(ev interface{}) error { + event := &Event{ + Time: time.Now(), + Data: ev, + } rtyp := reflect.TypeOf(ev) mux.mutex.RLock() if mux.stopped { @@ -91,7 +102,7 @@ func (mux *TypeMux) Post(ev interface{}) error { subs := mux.subm[rtyp] mux.mutex.RUnlock() for _, sub := range subs { - sub.deliver(ev) + sub.deliver(event) } return nil } @@ -143,6 +154,7 @@ func posdelete(slice []*muxsub, pos int) []*muxsub { type muxsub struct { mux *TypeMux + created time.Time closeMu sync.Mutex closing chan struct{} closed bool @@ -151,21 +163,22 @@ type muxsub struct { // postC can be set to nil without affecting the return value of // Chan. postMu sync.RWMutex - readC <-chan interface{} - postC chan<- interface{} + readC <-chan *Event + postC chan<- *Event } func newsub(mux *TypeMux) *muxsub { - c := make(chan interface{}) + c := make(chan *Event) return &muxsub{ mux: mux, + created: time.Now(), readC: c, postC: c, closing: make(chan struct{}), } } -func (s *muxsub) Chan() <-chan interface{} { +func (s *muxsub) Chan() <-chan *Event { return s.readC } @@ -189,11 +202,17 @@ func (s *muxsub) closewait() { s.postMu.Unlock() } -func (s *muxsub) deliver(ev interface{}) { +func (s *muxsub) deliver(event *Event) { + // Short circuit delivery if stale event + if s.created.After(event.Time) { + return + } + // Otherwise deliver the event s.postMu.RLock() + defer s.postMu.RUnlock() + select { - case s.postC <- ev: + case s.postC <- event: case <-s.closing: } - s.postMu.RUnlock() } diff --git a/event/event_test.go b/event/event_test.go index 465af38cd9..323cfea49e 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -37,7 +37,7 @@ func TestSub(t *testing.T) { }() ev := <-sub.Chan() - if ev.(testEvent) != testEvent(5) { + if ev.Data.(testEvent) != testEvent(5) { t.Errorf("Got %v (%T), expected event %v (%T)", ev, ev, testEvent(5), testEvent(5)) } diff --git a/event/example_test.go b/event/example_test.go index d4642ef2f5..29938e8539 100644 --- a/event/example_test.go +++ b/event/example_test.go @@ -30,7 +30,7 @@ func ExampleTypeMux() { sub := mux.Subscribe(someEvent{}, otherEvent{}) go func() { for event := range sub.Chan() { - fmt.Printf("Received: %#v\n", event) + fmt.Printf("Received: %#v\n", event.Data) } fmt.Println("done") close(done) diff --git a/miner/miner.go b/miner/miner.go index b550ed6d62..769db79d1a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -66,7 +66,7 @@ func (self *Miner) update() { events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) out: for ev := range events.Chan() { - switch ev.(type) { + switch ev.Data.(type) { case downloader.StartEvent: atomic.StoreInt32(&self.canStart, 0) if self.Mining() { diff --git a/miner/worker.go b/miner/worker.go index 8be2db93e4..43f6f9909c 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -215,13 +215,20 @@ func (self *worker) register(agent Agent) { } func (self *worker) update() { - events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + defer eventSub.Unsubscribe() -out: + eventCh := eventSub.Chan() for { select { - case event := <-events.Chan(): - switch ev := event.(type) { + case event, ok := <-eventCh: + if !ok { + // Event subscription closed, set the channel to nil to stop spinning + eventCh = nil + continue + } + // A real event arrived, process interesting content + switch ev := event.Data.(type) { case core.ChainHeadEvent: self.commitNewWork() case core.ChainSideEvent: @@ -237,11 +244,9 @@ out: } } case <-self.quit: - break out + return } } - - events.Unsubscribe() } func newLocalMinedBlock(blockNumber uint64, prevMinedBlocks *uint64RingBuffer) (minedBlocks *uint64RingBuffer) { diff --git a/xeth/xeth.go b/xeth/xeth.go index da712a9848..13e1712709 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -244,30 +244,37 @@ func (self *XEth) State() *State { return self.state } func (self *XEth) UpdateState() (wait chan *big.Int) { wait = make(chan *big.Int) go func() { - sub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{}) + eventSub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{}) + defer eventSub.Unsubscribe() + var m, n *big.Int var ok bool - out: + + eventCh := eventSub.Chan() for { select { - case event := <-sub.Chan(): - ev, ok := event.(core.ChainHeadEvent) - if ok { - m = ev.Block.Number() + case event, ok := <-eventCh: + if !ok { + // Event subscription closed, set the channel to nil to stop spinning + eventCh = nil + continue + } + // A real event arrived, process if new head block assignment + if event, ok := event.Data.(core.ChainHeadEvent); ok { + m = event.Block.Number() if n != nil && n.Cmp(m) < 0 { wait <- n n = nil } - statedb := state.New(ev.Block.Root(), self.backend.ChainDb()) + statedb := state.New(event.Block.Root(), self.backend.ChainDb()) self.state = NewState(self, statedb) } case n, ok = <-wait: if !ok { - break out + return } } } - sub.Unsubscribe() }() return }