diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 4b6e779d54..4f1ab87024 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -22,7 +22,6 @@ import ( "fmt" "math/big" "runtime" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -46,7 +45,6 @@ var ( // codebase, inherently breaking if the engine is swapped out. Please put common // error types into the consensus package. var ( - errInvalidChain = errors.New("invalid header chain") errLargeBlockTime = errors.New("timestamp too big") errZeroBlockTime = errors.New("timestamp equals parent's") errTooManyUncles = errors.New("too many uncles") @@ -90,111 +88,80 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He // a results channel to retrieve the async verifications. func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { // If we're running a full engine faking, accept any input as valid - if ethash.fakeFull { + if ethash.fakeFull || len(headers) == 0 { abort, results := make(chan struct{}), make(chan error, len(headers)) for i := 0; i < len(headers); i++ { results <- nil } return abort, results } + // Spawn as many workers as allowed threads workers := runtime.GOMAXPROCS(0) if len(headers) < workers { workers = len(headers) } - // Create a task channel and spawn the verifiers - type result struct { - index int - err error - } - inputs := make(chan int, workers) - outputs := make(chan result, len(headers)) - var badblock uint64 + // Create a task channel and spawn the verifiers + var ( + inputs = make(chan int) + done = make(chan int, workers) + errors = make([]error, len(headers)) + abort = make(chan struct{}) + ) for i := 0; i < workers; i++ { go func() { for index := range inputs { - // If we've found a bad block already before this, stop validating - if bad := atomic.LoadUint64(&badblock); bad != 0 && bad <= headers[index].Number.Uint64() { - outputs <- result{index: index, err: errInvalidChain} - continue - } - // We need to look up the first parent - var parent *types.Header - if index == 0 { - parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1) - } else if headers[index-1].Hash() == headers[index].ParentHash { - parent = headers[index-1] - } - // Ensure the validation is useful and execute it - var failure error - switch { - case chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()-1) != nil: - outputs <- result{index: index, err: nil} - case parent == nil: - failure = consensus.ErrUnknownAncestor - outputs <- result{index: index, err: failure} - default: - failure = ethash.verifyHeader(chain, headers[index], parent, false, seals[index]) - outputs <- result{index: index, err: failure} - } - // If a validation failure occurred, mark subsequent blocks invalid - if failure != nil { - number := headers[index].Number.Uint64() - if prev := atomic.LoadUint64(&badblock); prev == 0 || prev > number { - // This two step atomic op isn't thread-safe in that `badblock` might end - // up slightly higher than the block number of the first failure (if many - // workers try to write at the same time), but it's fine as we're mostly - // interested to avoid large useless work, we don't care about 1-2 extra - // runs. Doing "full thread safety" would involve mutexes, which would be - // a noticeable sync overhead on the fast spinning worker routines. - atomic.StoreUint64(&badblock, number) - } - } + errors[index] = ethash.verifyHeaderWorker(chain, headers, seals, index) + done <- index } }() } - // Feed item indices to the workers until done, sorting and feeding the results to the caller - dones := make([]bool, len(headers)) - errors := make([]error, len(headers)) - - abort := make(chan struct{}) - returns := make(chan error, len(headers)) + errorsOut := make(chan error, len(headers)) go func() { defer close(inputs) - - input, output := 0, 0 - for i := 0; i < len(headers)*2; i++ { - var res result - - // If there are tasks left, push to workers - if input < len(headers) { - select { - case inputs <- input: - input++ - continue - case <-abort: - return - case res = <-outputs: + var ( + in, out = 0, 0 + checked = make([]bool, len(headers)) + inputs = inputs + ) + for { + select { + case inputs <- in: + if in++; in == len(headers) { + // Reached end of headers. Stop sending to workers. + inputs = nil } - } else { - // Otherwise keep waiting for results - select { - case <-abort: - return - case res = <-outputs: + case index := <-done: + for checked[index] = true; checked[out]; out++ { + errorsOut <- errors[out] + if out == len(headers)-1 { + return + } } - } - // A result arrived, save and propagate if next - dones[res.index], errors[res.index] = true, res.err - for output < len(headers) && dones[output] { - returns <- errors[output] - output++ + case <-abort: + return } } }() - return abort, returns + return abort, errorsOut +} + +func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []*types.Header, seals []bool, index int) error { + var parent *types.Header + if index == 0 { + parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1) + } else if headers[index-1].Hash() == headers[index].ParentHash { + parent = headers[index-1] + } + if parent == nil { + return consensus.ErrUnknownAncestor + } + if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil { + return nil // known block + } + return ethash.verifyHeader(chain, headers[index], parent, false, seals[index]) } // VerifyUncles verifies that the given block's uncles conform to the consensus diff --git a/core/dao_test.go b/core/dao_test.go index cb6e54f8f2..bc9f3f3942 100644 --- a/core/dao_test.go +++ b/core/dao_test.go @@ -62,7 +62,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { gspec.MustCommit(db) bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) - blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1)) + blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) for j := 0; j < len(blocks)/2; j++ { blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j] } @@ -83,7 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { gspec.MustCommit(db) bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) - blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1)) + blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) for j := 0; j < len(blocks)/2; j++ { blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j] } @@ -105,7 +105,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { gspec.MustCommit(db) bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) - blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1)) + blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) for j := 0; j < len(blocks)/2; j++ { blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j] } @@ -121,7 +121,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { gspec.MustCommit(db) bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) - blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1)) + blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) for j := 0; j < len(blocks)/2; j++ { blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j] }