From bd74882d83509ba4a8477dd21b86d46bc7d12eb4 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 5 Mar 2017 16:52:03 +0100 Subject: [PATCH 1/2] core: implement ChainIndexer --- core/chain_indexer.go | 294 +++++++++++++++++++++++++++++++++++++ core/chain_indexer_test.go | 235 +++++++++++++++++++++++++++++ 2 files changed, 529 insertions(+) create mode 100644 core/chain_indexer.go create mode 100644 core/chain_indexer_test.go diff --git a/core/chain_indexer.go b/core/chain_indexer.go new file mode 100644 index 0000000000..f1ead526de --- /dev/null +++ b/core/chain_indexer.go @@ -0,0 +1,294 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package core implements the Ethereum consensus protocol. +package core + +import ( + "encoding/binary" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" +) + +// ChainIndexer does a post-processing job for equally sized sections of the canonical +// chain (like BlooomBits and CHT structures). A ChainIndexer is connected to the blockchain +// through the event system by starting a ChainEventLoop in a goroutine. +// Further child ChainIndexers can be added which use the output of the parent section +// indexer. These child indexers receive new head notifications only after an entire section +// has been finished or in case of rollbacks that might affect already finished sections. +type ChainIndexer struct { + chainDb, indexDb ethdb.Database + backend ChainIndexerBackend + sectionSize, confirmReq uint64 + stop chan struct{} + lock sync.Mutex + procWait time.Duration + tryUpdate chan struct{} + stored, targetCount, calcIdx, lastForwarded uint64 + updating bool + children []*ChainIndexer +} + +// ChainIndexerBackend interface is a backend for the indexer doing the actual post-processing job +type ChainIndexerBackend interface { + Reset(section uint64) // start processing a new section + Process(header *types.Header) // process a single block (called for each block in the section) + Commit(db ethdb.Database) error // do some more processing if necessary and store the results in the database + UpdateMsg(done, all uint64) // print a progress update message if necessary (only called when multiple sections need to be processed) +} + +// NewChainIndexer creates a new ChainIndexer +// db: database where the index of available processed sections is stored (the index is stored by the +// indexer, the actual processed chain data is stored by the backend) +// dbKey: key prefix where the index is stored +// backend: an implementation of ChainIndexerBackend +// sectionSize: the size of processable sections +// confirmReq: required number of confirmation blocks before a new section is being processed +// procWait: waiting time between processing sections (simple way of limiting the resource usage of a db upgrade) +// stop: quit channel +func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, sectionSize, confirmReq uint64, procWait time.Duration, stop chan struct{}) *ChainIndexer { + c := &ChainIndexer{ + chainDb: chainDb, + indexDb: indexDb, + backend: backend, + sectionSize: sectionSize, + confirmReq: confirmReq, + tryUpdate: make(chan struct{}, 1), + stop: stop, + procWait: procWait, + } + c.stored = c.getValidSections() + go c.updateLoop() + return c +} + +// updateLoop is the main event loop of the indexer +func (c *ChainIndexer) updateLoop() { + updateMsg := false + + for { + select { + case <-c.stop: + return + case <-c.tryUpdate: + c.lock.Lock() + if c.targetCount > c.stored { + if !updateMsg && c.targetCount > c.stored+1 { + updateMsg = true + c.backend.UpdateMsg(c.stored, c.targetCount) + } + c.calcIdx = c.stored + + var lastSectionHead common.Hash + if c.calcIdx > 0 { + lastSectionHead = c.getSectionHead(c.calcIdx - 1) + } + + c.lock.Unlock() + sectionHead, ok := c.processSection(c.calcIdx, lastSectionHead) + c.lock.Lock() + + if ok && lastSectionHead == c.getSectionHead(c.calcIdx-1) { + c.stored = c.calcIdx + 1 + c.setSectionHead(c.calcIdx, sectionHead) + c.setValidSections(c.stored) + if updateMsg { + c.backend.UpdateMsg(c.stored, c.targetCount) + if c.stored >= c.targetCount { + updateMsg = false + } + } + c.lastForwarded = c.stored*c.sectionSize - 1 + for _, cp := range c.children { + cp.newHead(c.lastForwarded, false) + } + } else { + // if processing has failed, do not retry until further notification + c.targetCount = c.stored + } + } + + if c.targetCount > c.stored { + go func() { + time.Sleep(c.procWait) + c.tryUpdate <- struct{}{} + }() + } else { + c.updating = false + } + c.lock.Unlock() + } + } +} + +// ChainEventLoop runs in a goroutine and feeds blockchain events to the indexer by calling newHead +// (not needed for child indexers where the parent calls newHead) +func (c *ChainIndexer) ChainEventLoop(currentHeader *types.Header, eventMux *event.TypeMux) { + sub := eventMux.Subscribe(ChainEvent{}) + c.newHead(currentHeader.Number.Uint64(), false) + lastHead := currentHeader.Hash() + for { + select { + case <-c.stop: + return + case ev := <-sub.Chan(): + header := ev.Data.(ChainEvent).Block.Header() + c.newHead(header.Number.Uint64(), header.ParentHash != lastHead) + lastHead = header.Hash() + } + } +} + +// AddChildIndexer adds a child ChainIndexer that can use the output of this one +func (c *ChainIndexer) AddChildIndexer(ci *ChainIndexer) { + c.children = append(c.children, ci) +} + +// newHead notifies the indexer about new chain heads or rollbacks +func (c *ChainIndexer) newHead(headNum uint64, rollback bool) { + c.lock.Lock() + defer c.lock.Unlock() + + if rollback { + firstChanged := headNum / c.sectionSize + if firstChanged < c.targetCount { + c.targetCount = firstChanged + } + if firstChanged < c.stored { + c.stored = firstChanged + c.setValidSections(c.stored) + } + headNum = firstChanged * c.sectionSize + + if headNum < c.lastForwarded { + c.lastForwarded = headNum + for _, cp := range c.children { + cp.newHead(c.lastForwarded, true) + } + } + + } else { + var newCount uint64 + if headNum >= c.confirmReq { + newCount = (headNum + 1 - c.confirmReq) / c.sectionSize + if newCount > c.targetCount { + c.targetCount = newCount + if !c.updating { + c.updating = true + c.tryUpdate <- struct{}{} + } + } + } + } +} + +// processSection processes an entire section by calling backend functions while ensuring +// the continuity of the passed headers. Since the chain mutex is not held while processing, +// the continuity can be broken by a long reorg, in which case the function returns with ok == false. +func (c *ChainIndexer) processSection(section uint64, lastSectionHead common.Hash) (sectionHead common.Hash, ok bool) { + c.backend.Reset(section) + + head := lastSectionHead + for i := section * c.sectionSize; i < (section+1)*c.sectionSize; i++ { + hash := GetCanonicalHash(c.chainDb, i) + if hash == (common.Hash{}) { + return common.Hash{}, false + } + header := GetHeader(c.chainDb, hash, i) + if header == nil || header.ParentHash != head { + return common.Hash{}, false + } + c.backend.Process(header) + head = header.Hash() + } + if err := c.backend.Commit(c.chainDb); err != nil { + return common.Hash{}, false + } + return head, true +} + +// CanonicalSections returns the number of processed sections that are consistent with +// the current canonical chain +func (c *ChainIndexer) CanonicalSections() uint64 { + c.lock.Lock() + defer c.lock.Unlock() + + cnt := c.getValidSections() + for cnt > 0 { + if c.getSectionHead(cnt-1) == GetCanonicalHash(c.chainDb, cnt*c.sectionSize-1) { + break + } + cnt-- + c.setValidSections(cnt) + } + return cnt +} + +// getValidSections reads the number of valid sections from the index database +func (c *ChainIndexer) getValidSections() uint64 { + data, _ := c.indexDb.Get([]byte("count")) + if len(data) == 8 { + return binary.BigEndian.Uint64(data[:]) + } + return 0 +} + +// setValidSections writes the number of valid sections to the index database +func (c *ChainIndexer) setValidSections(cnt uint64) { + oldCnt := c.getValidSections() + if cnt < oldCnt { + for i := cnt; i < oldCnt; i++ { + c.removeSectionHead(i) + } + } + + var data [8]byte + binary.BigEndian.PutUint64(data[:], cnt) + c.indexDb.Put([]byte("count"), data[:]) +} + +// getSectionHead reads the last block hash of a processed section from the index database +func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash { + var data [8]byte + binary.BigEndian.PutUint64(data[:], idx) + + hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...)) + if len(hash) == len(common.Hash{}) { + return common.BytesToHash(hash) + } + return common.Hash{} +} + +// setSectionHead writes the last block hash of a processed section to the index database +func (c *ChainIndexer) setSectionHead(idx uint64, shead common.Hash) { + var data [8]byte + binary.BigEndian.PutUint64(data[:], idx) + + c.indexDb.Put(append([]byte("shead"), data[:]...), shead.Bytes()) +} + +// removeSectionHead removes the reference to a processed section from the index database +func (c *ChainIndexer) removeSectionHead(idx uint64) { + var data [8]byte + binary.BigEndian.PutUint64(data[:], idx) + + c.indexDb.Delete(append([]byte("shead"), data[:]...)) +} diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go new file mode 100644 index 0000000000..827976d51c --- /dev/null +++ b/core/chain_indexer_test.go @@ -0,0 +1,235 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package core implements the Ethereum consensus protocol. +package core + +import ( + "encoding/binary" + "math/big" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" +) + +func TestChainIndexerSingle(t *testing.T) { + // run multiple tests with randomized parameters + for i := 0; i < 10; i++ { + testChainIndexer(t, 1) + } +} + +func TestChainIndexerWithChildren(t *testing.T) { + // run multiple tests with randomized parameters and different number of + // chained indexers + for i := 2; i < 8; i++ { + testChainIndexer(t, i) + } +} + +// testChainIndexer runs a test with either a single ChainIndexer or a chain of multiple indexers +// sectionSize and confirmReq parameters are randomized +func testChainIndexer(t *testing.T, tciCount int) { + db, _ := ethdb.NewMemDatabase() + stop := make(chan struct{}) + tciList := make([]*testChainIndex, tciCount) + var lastIndexer *ChainIndexer + for i, _ := range tciList { + tci := &testChainIndex{t: t, sectionSize: uint64(rand.Intn(100) + 1), confirmReq: uint64(rand.Intn(10)), processCh: make(chan uint64)} + tciList[i] = tci + tci.indexer = NewChainIndexer(db, ethdb.NewTable(db, string([]byte{byte(i)})), tci, tci.sectionSize, tci.confirmReq, 0, stop) + if cs := tci.indexer.CanonicalSections(); cs != 0 { + t.Errorf("Expected 0 canonical sections, got %d", cs) + } + if lastIndexer != nil { + lastIndexer.AddChildIndexer(tci.indexer) + } + lastIndexer = tci.indexer + } + + // expectCs expects a certain number of available canonical sections + expectCs := func(indexer *ChainIndexer, expCs uint64) { + cnt := 0 + for { + cs := indexer.CanonicalSections() + if cs == expCs { + return + } + // keep trying for 10 seconds if it does not match + cnt++ + if cnt == 10000 { + t.Fatalf("Expected %d canonical sections, got %d", expCs, cs) + } + time.Sleep(time.Millisecond) + } + } + + // notify the indexer about a new head or rollback, then expect processed blocks if a section is processable + notify := func(headNum, expFailAfter uint64, rollback bool) { + tciList[0].indexer.newHead(headNum, rollback) + if rollback { + for _, tci := range tciList { + headNum = tci.rollback(headNum) + expectCs(tci.indexer, tci.stored) + } + } else { + for _, tci := range tciList { + var more bool + headNum, more = tci.newBlocks(headNum, expFailAfter) + if !more { + break + } + expectCs(tci.indexer, tci.stored) + } + } + } + + for i := uint64(0); i <= 100; i++ { + testCanonicalHeader(db, i) + } + // start indexer with an already existing chain + notify(100, 100, false) + // add new blocks one by one + for i := uint64(101); i <= 1000; i++ { + testCanonicalHeader(db, i) + notify(i, i, false) + } + // do a rollback + notify(500, 500, true) + // create new fork + for i := uint64(501); i <= 1000; i++ { + testCanonicalHeader(db, i) + notify(i, i, false) + } + + for i := uint64(1001); i <= 1500; i++ { + testCanonicalHeader(db, i) + } + // create a failed processing scenario where less blocks are available at processing time than notified + notify(2000, 1500, false) + // notify about a rollback (which could have caused the missing blocks if happened during processing) + notify(1500, 1500, true) + + // create new fork + for i := uint64(1501); i <= 2000; i++ { + testCanonicalHeader(db, i) + notify(i, i, false) + } + close(stop) + db.Close() +} + +func testCanonicalHeader(db ethdb.Database, idx uint64) { + var rnd [8]byte + binary.BigEndian.PutUint64(rnd[:], uint64(rand.Int63())) + header := &types.Header{Number: big.NewInt(int64(idx)), Extra: rnd[:]} + if idx > 0 { + header.ParentHash = GetCanonicalHash(db, idx-1) + } + WriteHeader(db, header) + WriteCanonicalHash(db, header.Hash(), idx) +} + +// testChainIndex implements ChainIndexerBackend +type testChainIndex struct { + t *testing.T + sectionSize, confirmReq uint64 + section, headerCnt, stored uint64 + indexer *ChainIndexer + processCh chan uint64 +} + +// newBlocks expects process calls after new blocks have arrived. If expFailAfter < headNum then +// we are simulating a scenario where a rollback has happened after the processing has started and +// the processing of a section fails. +func (t *testChainIndex) newBlocks(headNum, expFailAfter uint64) (uint64, bool) { + var newCount uint64 + if headNum >= t.confirmReq { + newCount = (headNum + 1 - t.confirmReq) / t.sectionSize + if newCount > t.stored { + // expect processed blocks + for exp := t.stored * t.sectionSize; exp < newCount*t.sectionSize; exp++ { + if exp > expFailAfter { + // rolled back after processing started, no more process calls expected + // wait until updating is done to make sure that processing actually fails + for { + t.indexer.lock.Lock() + u := t.indexer.updating + t.indexer.lock.Unlock() + if !u { + break + } + time.Sleep(time.Millisecond) + } + + newCount = exp / t.sectionSize + break + } + select { + case <-time.After(10 * time.Second): + t.t.Fatalf("Expected processed block #%d, got nothing", exp) + case proc := <-t.processCh: + if proc != exp { + t.t.Errorf("Expected processed block #%d, got #%d", exp, proc) + } + } + } + t.stored = newCount + } + } + if t.stored == 0 { + return 0, false + } + return t.stored*t.sectionSize - 1, true +} + +func (t *testChainIndex) rollback(headNum uint64) uint64 { + firstChanged := headNum / t.sectionSize + if firstChanged < t.stored { + t.stored = firstChanged + } + return t.stored * t.sectionSize +} + +func (t *testChainIndex) Reset(section uint64) { + t.section = section + t.headerCnt = 0 +} + +func (t *testChainIndex) Process(header *types.Header) { + t.headerCnt++ + if t.headerCnt > t.sectionSize { + t.t.Error("Processing too many headers") + } + //t.processCh <- header.Number.Uint64() + select { + case <-time.After(10 * time.Second): + t.t.Fatal("Unexpected call to Process") + case t.processCh <- header.Number.Uint64(): + } +} + +func (t *testChainIndex) Commit(db ethdb.Database) error { + if t.headerCnt != t.sectionSize { + t.t.Error("Not enough headers processed") + } + return nil +} + +func (t *testChainIndex) UpdateMsg(done, all uint64) {} From 8edaaa227db9dfc0a3993f9d33413771c7963330 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 3 Aug 2017 18:25:06 +0200 Subject: [PATCH 2/2] core: polish chain indexer a bit --- core/chain_indexer.go | 454 +++++++++++++++++++++++-------------- core/chain_indexer_test.go | 251 ++++++++++---------- 2 files changed, 403 insertions(+), 302 deletions(-) diff --git a/core/chain_indexer.go b/core/chain_indexer.go index f1ead526de..9a88a5b1b8 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -14,261 +14,361 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package core implements the Ethereum consensus protocol. package core import ( "encoding/binary" + "fmt" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" ) -// ChainIndexer does a post-processing job for equally sized sections of the canonical -// chain (like BlooomBits and CHT structures). A ChainIndexer is connected to the blockchain -// through the event system by starting a ChainEventLoop in a goroutine. -// Further child ChainIndexers can be added which use the output of the parent section -// indexer. These child indexers receive new head notifications only after an entire section -// has been finished or in case of rollbacks that might affect already finished sections. -type ChainIndexer struct { - chainDb, indexDb ethdb.Database - backend ChainIndexerBackend - sectionSize, confirmReq uint64 - stop chan struct{} - lock sync.Mutex - procWait time.Duration - tryUpdate chan struct{} - stored, targetCount, calcIdx, lastForwarded uint64 - updating bool - children []*ChainIndexer +// ChainIndexerBackend defines the methods needed to process chain segments in +// the background and write the segment results into the database. These can be +// used to create filter blooms or CHTs. +type ChainIndexerBackend interface { + // Reset initiates the processing of a new chain segment, potentially terminating + // any partially completed operations (in case of a reorg). + Reset(section uint64) + + // Process crunches through the next header in the chain segment. The caller + // will ensure a sequential order of headers. + Process(header *types.Header) + + // Commit finalizes the section metadata and stores it into the database. This + // interface will usually be a batch writer. + Commit(db ethdb.Database) error } -// ChainIndexerBackend interface is a backend for the indexer doing the actual post-processing job -type ChainIndexerBackend interface { - Reset(section uint64) // start processing a new section - Process(header *types.Header) // process a single block (called for each block in the section) - Commit(db ethdb.Database) error // do some more processing if necessary and store the results in the database - UpdateMsg(done, all uint64) // print a progress update message if necessary (only called when multiple sections need to be processed) +// ChainIndexer does a post-processing job for equally sized sections of the +// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is +// connected to the blockchain through the event system by starting a +// ChainEventLoop in a goroutine. +// +// Further child ChainIndexers can be added which use the output of the parent +// section indexer. These child indexers receive new head notifications only +// after an entire section has been finished or in case of rollbacks that might +// affect already finished sections. +type ChainIndexer struct { + chainDb ethdb.Database // Chain database to index the data from + indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into + backend ChainIndexerBackend // Background processor generating the index data content + children []*ChainIndexer // Child indexers to cascade chain updates to + + active uint32 // Flag whether the event loop was started + update chan struct{} // Notification channel that headers should be processed + quit chan chan error // Quit channel to tear down running goroutines + + sectionSize uint64 // Number of blocks in a single chain segment to process + confirmsReq uint64 // Number of confirmations before processing a completed segment + + storedSections uint64 // Number of sections successfully indexed into the database + knownSections uint64 // Number of sections known to be complete (block wise) + cascadedHead uint64 // Block number of the last completed section cascaded to subindexers + + throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources + + log log.Logger + lock sync.RWMutex } -// NewChainIndexer creates a new ChainIndexer -// db: database where the index of available processed sections is stored (the index is stored by the -// indexer, the actual processed chain data is stored by the backend) -// dbKey: key prefix where the index is stored -// backend: an implementation of ChainIndexerBackend -// sectionSize: the size of processable sections -// confirmReq: required number of confirmation blocks before a new section is being processed -// procWait: waiting time between processing sections (simple way of limiting the resource usage of a db upgrade) -// stop: quit channel -func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, sectionSize, confirmReq uint64, procWait time.Duration, stop chan struct{}) *ChainIndexer { +// NewChainIndexer creates a new chain indexer to do background processing on +// chain segments of a given size after certain number of confirmations passed. +// The throttling parameter might be used to prevent database thrashing. +func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer { c := &ChainIndexer{ chainDb: chainDb, indexDb: indexDb, backend: backend, - sectionSize: sectionSize, - confirmReq: confirmReq, - tryUpdate: make(chan struct{}, 1), - stop: stop, - procWait: procWait, + update: make(chan struct{}, 1), + quit: make(chan chan error), + sectionSize: section, + confirmsReq: confirm, + throttling: throttling, + log: log.New("type", kind), } - c.stored = c.getValidSections() + // Initialize database dependent fields and start the updater + c.loadValidSections() go c.updateLoop() + return c } -// updateLoop is the main event loop of the indexer -func (c *ChainIndexer) updateLoop() { - updateMsg := false - - for { - select { - case <-c.stop: - return - case <-c.tryUpdate: - c.lock.Lock() - if c.targetCount > c.stored { - if !updateMsg && c.targetCount > c.stored+1 { - updateMsg = true - c.backend.UpdateMsg(c.stored, c.targetCount) - } - c.calcIdx = c.stored +// Start creates a goroutine to feed chain head events into the indexer for +// cascading background processing. +func (c *ChainIndexer) Start(currentHeader *types.Header, eventMux *event.TypeMux) { + go c.eventLoop(currentHeader, eventMux) +} - var lastSectionHead common.Hash - if c.calcIdx > 0 { - lastSectionHead = c.getSectionHead(c.calcIdx - 1) - } +// Close tears down all goroutines belonging to the indexer and returns any error +// that might have occurred internally. +func (c *ChainIndexer) Close() error { + var errs []error - c.lock.Unlock() - sectionHead, ok := c.processSection(c.calcIdx, lastSectionHead) - c.lock.Lock() + // Tear down the primary update loop + errc := make(chan error) + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + // If needed, tear down the secondary event loop + if atomic.LoadUint32(&c.active) != 0 { + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + } + // Return any failures + switch { + case len(errs) == 0: + return nil - if ok && lastSectionHead == c.getSectionHead(c.calcIdx-1) { - c.stored = c.calcIdx + 1 - c.setSectionHead(c.calcIdx, sectionHead) - c.setValidSections(c.stored) - if updateMsg { - c.backend.UpdateMsg(c.stored, c.targetCount) - if c.stored >= c.targetCount { - updateMsg = false - } - } - c.lastForwarded = c.stored*c.sectionSize - 1 - for _, cp := range c.children { - cp.newHead(c.lastForwarded, false) - } - } else { - // if processing has failed, do not retry until further notification - c.targetCount = c.stored - } - } + case len(errs) == 1: + return errs[0] - if c.targetCount > c.stored { - go func() { - time.Sleep(c.procWait) - c.tryUpdate <- struct{}{} - }() - } else { - c.updating = false - } - c.lock.Unlock() - } + default: + return fmt.Errorf("%v", errs) } } -// ChainEventLoop runs in a goroutine and feeds blockchain events to the indexer by calling newHead -// (not needed for child indexers where the parent calls newHead) -func (c *ChainIndexer) ChainEventLoop(currentHeader *types.Header, eventMux *event.TypeMux) { +// eventLoop is a secondary - optional - event loop of the indexer which is only +// started for the outermost indexer to push chain head events into a processing +// queue. +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, eventMux *event.TypeMux) { + // Mark the chain indexer as active, requiring an additional teardown + atomic.StoreUint32(&c.active, 1) + + // Subscribe to chain head events sub := eventMux.Subscribe(ChainEvent{}) + defer sub.Unsubscribe() + + // Fire the initial new head event to start any outstanding processing c.newHead(currentHeader.Number.Uint64(), false) - lastHead := currentHeader.Hash() + + var ( + prevHeader = currentHeader + prevHash = currentHeader.Hash() + ) for { select { - case <-c.stop: + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil return - case ev := <-sub.Chan(): + + case ev, ok := <-sub.Chan(): + // Received a new event, ensure it's not nil (closing) and update + if !ok { + errc := <-c.quit + errc <- nil + return + } header := ev.Data.(ChainEvent).Block.Header() - c.newHead(header.Number.Uint64(), header.ParentHash != lastHead) - lastHead = header.Hash() + if header.ParentHash != prevHash { + c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) + } + c.newHead(header.Number.Uint64(), false) + + prevHeader, prevHash = header, header.Hash() } } } -// AddChildIndexer adds a child ChainIndexer that can use the output of this one -func (c *ChainIndexer) AddChildIndexer(ci *ChainIndexer) { - c.children = append(c.children, ci) -} - -// newHead notifies the indexer about new chain heads or rollbacks -func (c *ChainIndexer) newHead(headNum uint64, rollback bool) { +// newHead notifies the indexer about new chain heads and/or reorgs. +func (c *ChainIndexer) newHead(head uint64, reorg bool) { c.lock.Lock() defer c.lock.Unlock() - if rollback { - firstChanged := headNum / c.sectionSize - if firstChanged < c.targetCount { - c.targetCount = firstChanged + // If a reorg happened, invalidate all sections until that point + if reorg { + // Revert the known section number to the reorg point + changed := head / c.sectionSize + if changed < c.knownSections { + c.knownSections = changed } - if firstChanged < c.stored { - c.stored = firstChanged - c.setValidSections(c.stored) + // Revert the stored sections from the database to the reorg point + if changed < c.storedSections { + c.setValidSections(changed) } - headNum = firstChanged * c.sectionSize + // Update the new head number to te finalized section end and notify children + head = changed * c.sectionSize - if headNum < c.lastForwarded { - c.lastForwarded = headNum - for _, cp := range c.children { - cp.newHead(c.lastForwarded, true) + if head < c.cascadedHead { + c.cascadedHead = head + for _, child := range c.children { + child.newHead(c.cascadedHead, true) } } + return + } + // No reorg, calculate the number of newly known sections and update if high enough + var sections uint64 + if head >= c.confirmsReq { + sections = (head + 1 - c.confirmsReq) / c.sectionSize + if sections > c.knownSections { + c.knownSections = sections + + select { + case c.update <- struct{}{}: + default: + } + } + } +} + +// updateLoop is the main event loop of the indexer which pushes chain segments +// down into the processing backend. +func (c *ChainIndexer) updateLoop() { + var updated time.Time + + for { + select { + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil + return + + case <-c.update: + // Section headers completed (or rolled back), update the index + c.lock.Lock() + if c.knownSections > c.storedSections { + // Periodically print an upgrade log message to the user + if time.Since(updated) > 8*time.Second { + if c.knownSections > c.storedSections+1 { + c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) + } + updated = time.Now() + } + // Cache the current section count and head to allow unlocking the mutex + section := c.storedSections + var oldHead common.Hash + if section > 0 { + oldHead = c.sectionHead(section - 1) + } + // Process the newly defined section in the background + c.lock.Unlock() + newHead, err := c.processSection(section, oldHead) + c.lock.Lock() + + // If processing succeeded and no reorgs occcurred, mark the section completed + if err == nil && oldHead == c.sectionHead(section-1) { + c.setSectionHead(section, newHead) + c.setValidSections(section + 1) - } else { - var newCount uint64 - if headNum >= c.confirmReq { - newCount = (headNum + 1 - c.confirmReq) / c.sectionSize - if newCount > c.targetCount { - c.targetCount = newCount - if !c.updating { - c.updating = true - c.tryUpdate <- struct{}{} + c.cascadedHead = c.storedSections*c.sectionSize - 1 + for _, child := range c.children { + c.log.Trace("Cascading chain index update", "head", c.cascadedHead) + child.newHead(c.cascadedHead, false) + } + } else { + // If processing failed, don't retry until further notification + c.log.Debug("Chain index processing failed", "section", section, "err", err) + c.knownSections = c.storedSections } } + // If there are still further sections to process, reschedule + if c.knownSections > c.storedSections { + time.AfterFunc(c.throttling, func() { + select { + case c.update <- struct{}{}: + default: + } + }) + } + c.lock.Unlock() } } } -// processSection processes an entire section by calling backend functions while ensuring -// the continuity of the passed headers. Since the chain mutex is not held while processing, -// the continuity can be broken by a long reorg, in which case the function returns with ok == false. -func (c *ChainIndexer) processSection(section uint64, lastSectionHead common.Hash) (sectionHead common.Hash, ok bool) { +// processSection processes an entire section by calling backend functions while +// ensuring the continuity of the passed headers. Since the chain mutex is not +// held while processing, the continuity can be broken by a long reorg, in which +// case the function returns with an error. +func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { + c.log.Trace("Processing new chain section", "section", section) + + // Reset and partial processing c.backend.Reset(section) - head := lastSectionHead - for i := section * c.sectionSize; i < (section+1)*c.sectionSize; i++ { - hash := GetCanonicalHash(c.chainDb, i) + for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { + hash := GetCanonicalHash(c.chainDb, number) if hash == (common.Hash{}) { - return common.Hash{}, false + return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) } - header := GetHeader(c.chainDb, hash, i) - if header == nil || header.ParentHash != head { - return common.Hash{}, false + header := GetHeader(c.chainDb, hash, number) + if header == nil { + return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4]) + } else if header.ParentHash != lastHead { + return common.Hash{}, fmt.Errorf("chain reorged during section processing") } c.backend.Process(header) - head = header.Hash() + lastHead = header.Hash() } if err := c.backend.Commit(c.chainDb); err != nil { - return common.Hash{}, false + return common.Hash{}, err } - return head, true + return lastHead, nil } -// CanonicalSections returns the number of processed sections that are consistent with -// the current canonical chain -func (c *ChainIndexer) CanonicalSections() uint64 { +// Sections returns the number of processed sections maintained by the indexer +// and also the information about the last header indexed for potential canonical +// verifications. +func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { c.lock.Lock() defer c.lock.Unlock() - cnt := c.getValidSections() - for cnt > 0 { - if c.getSectionHead(cnt-1) == GetCanonicalHash(c.chainDb, cnt*c.sectionSize-1) { - break - } - cnt-- - c.setValidSections(cnt) + return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) +} + +// AddChildIndexer adds a child ChainIndexer that can use the output of this one +func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) { + c.lock.Lock() + defer c.lock.Unlock() + + c.children = append(c.children, indexer) + + // Cascade any pending updates to new children too + if c.storedSections > 0 { + indexer.newHead(c.storedSections*c.sectionSize-1, false) } - return cnt } -// getValidSections reads the number of valid sections from the index database -func (c *ChainIndexer) getValidSections() uint64 { +// loadValidSections reads the number of valid sections from the index database +// and caches is into the local state. +func (c *ChainIndexer) loadValidSections() { data, _ := c.indexDb.Get([]byte("count")) if len(data) == 8 { - return binary.BigEndian.Uint64(data[:]) + c.storedSections = binary.BigEndian.Uint64(data[:]) } - return 0 } // setValidSections writes the number of valid sections to the index database -func (c *ChainIndexer) setValidSections(cnt uint64) { - oldCnt := c.getValidSections() - if cnt < oldCnt { - for i := cnt; i < oldCnt; i++ { - c.removeSectionHead(i) - } - } - +func (c *ChainIndexer) setValidSections(sections uint64) { + // Set the current number of valid sections in the database var data [8]byte - binary.BigEndian.PutUint64(data[:], cnt) + binary.BigEndian.PutUint64(data[:], sections) c.indexDb.Put([]byte("count"), data[:]) + + // Remove any reorged sections, caching the valids in the mean time + for c.storedSections > sections { + c.storedSections-- + c.removeSectionHead(c.storedSections) + } + c.storedSections = sections // needed if new > old } -// getSectionHead reads the last block hash of a processed section from the index database -func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash { +// sectionHead retrieves the last block hash of a processed section from the +// index database. +func (c *ChainIndexer) sectionHead(section uint64) common.Hash { var data [8]byte - binary.BigEndian.PutUint64(data[:], idx) + binary.BigEndian.PutUint64(data[:], section) hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...)) if len(hash) == len(common.Hash{}) { @@ -277,18 +377,20 @@ func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash { return common.Hash{} } -// setSectionHead writes the last block hash of a processed section to the index database -func (c *ChainIndexer) setSectionHead(idx uint64, shead common.Hash) { +// setSectionHead writes the last block hash of a processed section to the index +// database. +func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) { var data [8]byte - binary.BigEndian.PutUint64(data[:], idx) + binary.BigEndian.PutUint64(data[:], section) - c.indexDb.Put(append([]byte("shead"), data[:]...), shead.Bytes()) + c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes()) } -// removeSectionHead removes the reference to a processed section from the index database -func (c *ChainIndexer) removeSectionHead(idx uint64) { +// removeSectionHead removes the reference to a processed section from the index +// database. +func (c *ChainIndexer) removeSectionHead(section uint64) { var data [8]byte - binary.BigEndian.PutUint64(data[:], idx) + binary.BigEndian.PutUint64(data[:], section) c.indexDb.Delete(append([]byte("shead"), data[:]...)) } diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go index 827976d51c..780e46e433 100644 --- a/core/chain_indexer_test.go +++ b/core/chain_indexer_test.go @@ -14,11 +14,10 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package core implements the Ethereum consensus protocol. package core import ( - "encoding/binary" + "fmt" "math/big" "math/rand" "testing" @@ -28,208 +27,208 @@ import ( "github.com/ethereum/go-ethereum/ethdb" ) +// Runs multiple tests with randomized parameters. func TestChainIndexerSingle(t *testing.T) { - // run multiple tests with randomized parameters for i := 0; i < 10; i++ { testChainIndexer(t, 1) } } +// Runs multiple tests with randomized parameters and different number of +// chain backends. func TestChainIndexerWithChildren(t *testing.T) { - // run multiple tests with randomized parameters and different number of - // chained indexers for i := 2; i < 8; i++ { testChainIndexer(t, i) } } -// testChainIndexer runs a test with either a single ChainIndexer or a chain of multiple indexers -// sectionSize and confirmReq parameters are randomized -func testChainIndexer(t *testing.T, tciCount int) { +// testChainIndexer runs a test with either a single chain indexer or a chain of +// multiple backends. The section size and required confirmation count parameters +// are randomized. +func testChainIndexer(t *testing.T, count int) { db, _ := ethdb.NewMemDatabase() - stop := make(chan struct{}) - tciList := make([]*testChainIndex, tciCount) - var lastIndexer *ChainIndexer - for i, _ := range tciList { - tci := &testChainIndex{t: t, sectionSize: uint64(rand.Intn(100) + 1), confirmReq: uint64(rand.Intn(10)), processCh: make(chan uint64)} - tciList[i] = tci - tci.indexer = NewChainIndexer(db, ethdb.NewTable(db, string([]byte{byte(i)})), tci, tci.sectionSize, tci.confirmReq, 0, stop) - if cs := tci.indexer.CanonicalSections(); cs != 0 { - t.Errorf("Expected 0 canonical sections, got %d", cs) + defer db.Close() + + // Create a chain of indexers and ensure they all report empty + backends := make([]*testChainIndexBackend, count) + for i := 0; i < count; i++ { + var ( + sectionSize = uint64(rand.Intn(100) + 1) + confirmsReq = uint64(rand.Intn(10)) + ) + backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)} + backends[i].indexer = NewChainIndexer(db, ethdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i)) + defer backends[i].indexer.Close() + + if sections, _, _ := backends[i].indexer.Sections(); sections != 0 { + t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0) } - if lastIndexer != nil { - lastIndexer.AddChildIndexer(tci.indexer) + if i > 0 { + backends[i-1].indexer.AddChildIndexer(backends[i].indexer) } - lastIndexer = tci.indexer } - - // expectCs expects a certain number of available canonical sections - expectCs := func(indexer *ChainIndexer, expCs uint64) { - cnt := 0 - for { - cs := indexer.CanonicalSections() - if cs == expCs { - return + // notify pings the root indexer about a new head or reorg, then expect + // processed blocks if a section is processable + notify := func(headNum, failNum uint64, reorg bool) { + backends[0].indexer.newHead(headNum, reorg) + if reorg { + for _, backend := range backends { + headNum = backend.reorg(headNum) + backend.assertSections() } - // keep trying for 10 seconds if it does not match - cnt++ - if cnt == 10000 { - t.Fatalf("Expected %d canonical sections, got %d", expCs, cs) + return + } + var cascade bool + for _, backend := range backends { + headNum, cascade = backend.assertBlocks(headNum, failNum) + if !cascade { + break } - time.Sleep(time.Millisecond) + backend.assertSections() } } - - // notify the indexer about a new head or rollback, then expect processed blocks if a section is processable - notify := func(headNum, expFailAfter uint64, rollback bool) { - tciList[0].indexer.newHead(headNum, rollback) - if rollback { - for _, tci := range tciList { - headNum = tci.rollback(headNum) - expectCs(tci.indexer, tci.stored) - } - } else { - for _, tci := range tciList { - var more bool - headNum, more = tci.newBlocks(headNum, expFailAfter) - if !more { - break - } - expectCs(tci.indexer, tci.stored) - } + // inject inserts a new random canonical header into the database directly + inject := func(number uint64) { + header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()} + if number > 0 { + header.ParentHash = GetCanonicalHash(db, number-1) } + WriteHeader(db, header) + WriteCanonicalHash(db, header.Hash(), number) } - + // Start indexer with an already existing chain for i := uint64(0); i <= 100; i++ { - testCanonicalHeader(db, i) + inject(i) } - // start indexer with an already existing chain notify(100, 100, false) - // add new blocks one by one + + // Add new blocks one by one for i := uint64(101); i <= 1000; i++ { - testCanonicalHeader(db, i) + inject(i) notify(i, i, false) } - // do a rollback + // Do a reorg notify(500, 500, true) - // create new fork + + // Create new fork for i := uint64(501); i <= 1000; i++ { - testCanonicalHeader(db, i) + inject(i) notify(i, i, false) } - for i := uint64(1001); i <= 1500; i++ { - testCanonicalHeader(db, i) + inject(i) } - // create a failed processing scenario where less blocks are available at processing time than notified + // Failed processing scenario where less blocks are available than notified notify(2000, 1500, false) - // notify about a rollback (which could have caused the missing blocks if happened during processing) + + // Notify about a reorg (which could have caused the missing blocks if happened during processing) notify(1500, 1500, true) - // create new fork + // Create new fork for i := uint64(1501); i <= 2000; i++ { - testCanonicalHeader(db, i) + inject(i) notify(i, i, false) } - close(stop) - db.Close() } -func testCanonicalHeader(db ethdb.Database, idx uint64) { - var rnd [8]byte - binary.BigEndian.PutUint64(rnd[:], uint64(rand.Int63())) - header := &types.Header{Number: big.NewInt(int64(idx)), Extra: rnd[:]} - if idx > 0 { - header.ParentHash = GetCanonicalHash(db, idx-1) - } - WriteHeader(db, header) - WriteCanonicalHash(db, header.Hash(), idx) -} - -// testChainIndex implements ChainIndexerBackend -type testChainIndex struct { +// testChainIndexBackend implements ChainIndexerBackend +type testChainIndexBackend struct { t *testing.T - sectionSize, confirmReq uint64 - section, headerCnt, stored uint64 indexer *ChainIndexer + section, headerCnt, stored uint64 processCh chan uint64 } -// newBlocks expects process calls after new blocks have arrived. If expFailAfter < headNum then -// we are simulating a scenario where a rollback has happened after the processing has started and -// the processing of a section fails. -func (t *testChainIndex) newBlocks(headNum, expFailAfter uint64) (uint64, bool) { - var newCount uint64 - if headNum >= t.confirmReq { - newCount = (headNum + 1 - t.confirmReq) / t.sectionSize - if newCount > t.stored { +// assertSections verifies if a chain indexer has the correct number of section. +func (b *testChainIndexBackend) assertSections() { + // Keep trying for 3 seconds if it does not match + var sections uint64 + for i := 0; i < 300; i++ { + sections, _, _ = b.indexer.Sections() + if sections == b.stored { + return + } + time.Sleep(10 * time.Millisecond) + } + b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored) +} + +// assertBlocks expects processing calls after new blocks have arrived. If the +// failNum < headNum then we are simulating a scenario where a reorg has happened +// after the processing has started and the processing of a section fails. +func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) { + var sections uint64 + if headNum >= b.indexer.confirmsReq { + sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize + if sections > b.stored { // expect processed blocks - for exp := t.stored * t.sectionSize; exp < newCount*t.sectionSize; exp++ { - if exp > expFailAfter { + for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ { + if expectd > failNum { // rolled back after processing started, no more process calls expected // wait until updating is done to make sure that processing actually fails - for { - t.indexer.lock.Lock() - u := t.indexer.updating - t.indexer.lock.Unlock() - if !u { + var updating bool + for i := 0; i < 300; i++ { + b.indexer.lock.Lock() + updating = b.indexer.knownSections > b.indexer.storedSections + b.indexer.lock.Unlock() + if !updating { break } - time.Sleep(time.Millisecond) + time.Sleep(10 * time.Millisecond) } - - newCount = exp / t.sectionSize + if updating { + b.t.Fatalf("update did not finish") + } + sections = expectd / b.indexer.sectionSize break } select { case <-time.After(10 * time.Second): - t.t.Fatalf("Expected processed block #%d, got nothing", exp) - case proc := <-t.processCh: - if proc != exp { - t.t.Errorf("Expected processed block #%d, got #%d", exp, proc) + b.t.Fatalf("Expected processed block #%d, got nothing", expectd) + case processed := <-b.processCh: + if processed != expectd { + b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed) } } } - t.stored = newCount + b.stored = sections } } - if t.stored == 0 { + if b.stored == 0 { return 0, false } - return t.stored*t.sectionSize - 1, true + return b.stored*b.indexer.sectionSize - 1, true } -func (t *testChainIndex) rollback(headNum uint64) uint64 { - firstChanged := headNum / t.sectionSize - if firstChanged < t.stored { - t.stored = firstChanged +func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { + firstChanged := headNum / b.indexer.sectionSize + if firstChanged < b.stored { + b.stored = firstChanged } - return t.stored * t.sectionSize + return b.stored * b.indexer.sectionSize } -func (t *testChainIndex) Reset(section uint64) { - t.section = section - t.headerCnt = 0 +func (b *testChainIndexBackend) Reset(section uint64) { + b.section = section + b.headerCnt = 0 } -func (t *testChainIndex) Process(header *types.Header) { - t.headerCnt++ - if t.headerCnt > t.sectionSize { - t.t.Error("Processing too many headers") +func (b *testChainIndexBackend) Process(header *types.Header) { + b.headerCnt++ + if b.headerCnt > b.indexer.sectionSize { + b.t.Error("Processing too many headers") } //t.processCh <- header.Number.Uint64() select { case <-time.After(10 * time.Second): - t.t.Fatal("Unexpected call to Process") - case t.processCh <- header.Number.Uint64(): + b.t.Fatal("Unexpected call to Process") + case b.processCh <- header.Number.Uint64(): } } -func (t *testChainIndex) Commit(db ethdb.Database) error { - if t.headerCnt != t.sectionSize { - t.t.Error("Not enough headers processed") +func (b *testChainIndexBackend) Commit(db ethdb.Database) error { + if b.headerCnt != b.indexer.sectionSize { + b.t.Error("Not enough headers processed") } return nil } - -func (t *testChainIndex) UpdateMsg(done, all uint64) {}