From a91b704b013aaabb3bfc796f5a3d8c940dfb7f05 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 28 Nov 2019 10:51:57 +0100 Subject: [PATCH] consensus/ethash: refactor remote sealer (#20335) The original idea behind this change was to remove a use of the deprecated CancelRequest method. Simply removing it would've been an option, but I couldn't resist and did a bit of a refactoring instead. All remote sealing code was contained in a single giant function. Remote sealing is now extracted into its own object, remoteSealer. --- consensus/ethash/algorithm_test.go | 2 +- consensus/ethash/api.go | 26 +-- consensus/ethash/ethash.go | 112 +++------ consensus/ethash/sealer.go | 351 +++++++++++++++++------------ consensus/ethash/sealer_test.go | 107 ++++----- 5 files changed, 294 insertions(+), 304 deletions(-) diff --git a/consensus/ethash/algorithm_test.go b/consensus/ethash/algorithm_test.go index cf8552f3ab..f4f65047be 100644 --- a/consensus/ethash/algorithm_test.go +++ b/consensus/ethash/algorithm_test.go @@ -729,7 +729,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) { go func(idx int) { defer pend.Done() - ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}, nil, false) + ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal, nil}, nil, false) defer ethash.Close() if err := ethash.VerifySeal(nil, block.Header()); err != nil { t.Errorf("proc %d: block verification failed: %v", idx, err) diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index 4d8eed4161..68b3a84b09 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -28,7 +28,7 @@ var errEthashStopped = errors.New("ethash stopped") // API exposes ethash related methods for the RPC interface. type API struct { - ethash *Ethash // Make sure the mode of ethash is normal. + ethash *Ethash } // GetWork returns a work package for external miner. @@ -39,7 +39,7 @@ type API struct { // result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty // result[3] - hex encoded block number func (api *API) GetWork() ([4]string, error) { - if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + if api.ethash.remote == nil { return [4]string{}, errors.New("not supported") } @@ -47,13 +47,11 @@ func (api *API) GetWork() ([4]string, error) { workCh = make(chan [4]string, 1) errc = make(chan error, 1) ) - select { - case api.ethash.fetchWorkCh <- &sealWork{errc: errc, res: workCh}: - case <-api.ethash.exitCh: + case api.ethash.remote.fetchWorkCh <- &sealWork{errc: errc, res: workCh}: + case <-api.ethash.remote.exitCh: return [4]string{}, errEthashStopped } - select { case work := <-workCh: return work, nil @@ -66,23 +64,21 @@ func (api *API) GetWork() ([4]string, error) { // It returns an indication if the work was accepted. // Note either an invalid solution, a stale work a non-existent work will return false. func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) bool { - if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + if api.ethash.remote == nil { return false } var errc = make(chan error, 1) - select { - case api.ethash.submitWorkCh <- &mineResult{ + case api.ethash.remote.submitWorkCh <- &mineResult{ nonce: nonce, mixDigest: digest, hash: hash, errc: errc, }: - case <-api.ethash.exitCh: + case <-api.ethash.remote.exitCh: return false } - err := <-errc return err == nil } @@ -94,21 +90,19 @@ func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) boo // It accepts the miner hash rate and an identifier which must be unique // between nodes. func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool { - if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + if api.ethash.remote == nil { return false } var done = make(chan struct{}, 1) - select { - case api.ethash.submitRateCh <- &hashrate{done: done, rate: uint64(rate), id: id}: - case <-api.ethash.exitCh: + case api.ethash.remote.submitRateCh <- &hashrate{done: done, rate: uint64(rate), id: id}: + case <-api.ethash.remote.exitCh: return false } // Block until hash rate submitted successfully. <-done - return true } diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 78892e1da8..4bf78b0a1d 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -34,9 +34,7 @@ import ( "unsafe" mmap "github.com/edsrzf/mmap-go" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" @@ -50,7 +48,7 @@ var ( two256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) // sharedEthash is a full instance that can be shared between multiple users. - sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}, nil, false) + sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal, nil}, nil, false) // algorithmRevision is the data structure version used for file naming. algorithmRevision = 23 @@ -403,36 +401,8 @@ type Config struct { DatasetsInMem int DatasetsOnDisk int PowMode Mode -} - -// sealTask wraps a seal block with relative result channel for remote sealer thread. -type sealTask struct { - block *types.Block - results chan<- *types.Block -} - -// mineResult wraps the pow solution parameters for the specified block. -type mineResult struct { - nonce types.BlockNonce - mixDigest common.Hash - hash common.Hash - - errc chan error -} - -// hashrate wraps the hash rate submitted by the remote sealer. -type hashrate struct { - id common.Hash - ping time.Time - rate uint64 - done chan struct{} -} - -// sealWork wraps a seal work package for remote sealer. -type sealWork struct { - errc chan error - res chan [4]string + Log log.Logger `toml:"-"` } // Ethash is a consensus engine based on proof-of-work implementing the ethash @@ -448,52 +418,42 @@ type Ethash struct { threads int // Number of threads to mine on if mining update chan struct{} // Notification channel to update mining parameters hashrate metrics.Meter // Meter tracking the average hashrate - - // Remote sealer related fields - workCh chan *sealTask // Notification channel to push new work and relative result channel to remote sealer - fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work - submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result - fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer. - submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate + remote *remoteSealer // The fields below are hooks for testing shared *Ethash // Shared PoW verifier to avoid cache regeneration fakeFail uint64 // Block number which fails PoW check even in fake mode fakeDelay time.Duration // Time delay to sleep for before returning from verify - lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields - closeOnce sync.Once // Ensures exit channel will not be closed twice. - exitCh chan chan error // Notification channel to exiting backend threads + lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields + closeOnce sync.Once // Ensures exit channel will not be closed twice. } // New creates a full sized ethash PoW scheme and starts a background thread for // remote mining, also optionally notifying a batch of remote services of new work // packages. func New(config Config, notify []string, noverify bool) *Ethash { + if config.Log == nil { + config.Log = log.Root() + } if config.CachesInMem <= 0 { - log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) + config.Log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) config.CachesInMem = 1 } if config.CacheDir != "" && config.CachesOnDisk > 0 { - log.Info("Disk storage enabled for ethash caches", "dir", config.CacheDir, "count", config.CachesOnDisk) + config.Log.Info("Disk storage enabled for ethash caches", "dir", config.CacheDir, "count", config.CachesOnDisk) } if config.DatasetDir != "" && config.DatasetsOnDisk > 0 { - log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk) + config.Log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk) } ethash := &Ethash{ - config: config, - caches: newlru("cache", config.CachesInMem, newCache), - datasets: newlru("dataset", config.DatasetsInMem, newDataset), - update: make(chan struct{}), - hashrate: metrics.NewMeterForced(), - workCh: make(chan *sealTask), - fetchWorkCh: make(chan *sealWork), - submitWorkCh: make(chan *mineResult), - fetchRateCh: make(chan chan uint64), - submitRateCh: make(chan *hashrate), - exitCh: make(chan chan error), - } - go ethash.remote(notify, noverify) + config: config, + caches: newlru("cache", config.CachesInMem, newCache), + datasets: newlru("dataset", config.DatasetsInMem, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeterForced(), + } + ethash.remote = startRemoteSealer(ethash, notify, noverify) return ethash } @@ -501,19 +461,13 @@ func New(config Config, notify []string, noverify bool) *Ethash { // purposes. func NewTester(notify []string, noverify bool) *Ethash { ethash := &Ethash{ - config: Config{PowMode: ModeTest}, - caches: newlru("cache", 1, newCache), - datasets: newlru("dataset", 1, newDataset), - update: make(chan struct{}), - hashrate: metrics.NewMeterForced(), - workCh: make(chan *sealTask), - fetchWorkCh: make(chan *sealWork), - submitWorkCh: make(chan *mineResult), - fetchRateCh: make(chan chan uint64), - submitRateCh: make(chan *hashrate), - exitCh: make(chan chan error), - } - go ethash.remote(notify, noverify) + config: Config{PowMode: ModeTest, Log: log.Root()}, + caches: newlru("cache", 1, newCache), + datasets: newlru("dataset", 1, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeterForced(), + } + ethash.remote = startRemoteSealer(ethash, notify, noverify) return ethash } @@ -524,6 +478,7 @@ func NewFaker() *Ethash { return &Ethash{ config: Config{ PowMode: ModeFake, + Log: log.Root(), }, } } @@ -535,6 +490,7 @@ func NewFakeFailer(fail uint64) *Ethash { return &Ethash{ config: Config{ PowMode: ModeFake, + Log: log.Root(), }, fakeFail: fail, } @@ -547,6 +503,7 @@ func NewFakeDelayer(delay time.Duration) *Ethash { return &Ethash{ config: Config{ PowMode: ModeFake, + Log: log.Root(), }, fakeDelay: delay, } @@ -558,6 +515,7 @@ func NewFullFaker() *Ethash { return &Ethash{ config: Config{ PowMode: ModeFullFake, + Log: log.Root(), }, } } @@ -573,13 +531,11 @@ func (ethash *Ethash) Close() error { var err error ethash.closeOnce.Do(func() { // Short circuit if the exit channel is not allocated. - if ethash.exitCh == nil { + if ethash.remote == nil { return } - errc := make(chan error) - ethash.exitCh <- errc - err = <-errc - close(ethash.exitCh) + close(ethash.remote.requestExit) + <-ethash.remote.exitCh }) return err } @@ -680,8 +636,8 @@ func (ethash *Ethash) Hashrate() float64 { var res = make(chan uint64, 1) select { - case ethash.fetchRateCh <- res: - case <-ethash.exitCh: + case ethash.remote.fetchRateCh <- res: + case <-ethash.remote.exitCh: // Return local hashrate only if ethash is stopped. return ethash.hashrate.Rate1() } diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 43db1fcb7f..52c4ed46dc 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -18,6 +18,7 @@ package ethash import ( "bytes" + "context" crand "crypto/rand" "encoding/json" "errors" @@ -33,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" ) const ( @@ -56,7 +56,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, resu select { case results <- block.WithSeal(header): default: - log.Warn("Sealing result is not read by miner", "mode", "fake", "sealhash", ethash.SealHash(block.Header())) + ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "fake", "sealhash", ethash.SealHash(block.Header())) } return nil } @@ -85,8 +85,8 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, resu threads = 0 // Allows disabling local mining without extra logic around local/remote } // Push new work to remote sealer - if ethash.workCh != nil { - ethash.workCh <- &sealTask{block: block, results: results} + if ethash.remote != nil { + ethash.remote.workCh <- &sealTask{block: block, results: results} } var ( pend sync.WaitGroup @@ -111,14 +111,14 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, resu select { case results <- result: default: - log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header())) + ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header())) } close(abort) case <-ethash.update: // Thread count was changed on user request, restart close(abort) if err := ethash.Seal(chain, block, results, stop); err != nil { - log.Error("Failed to restart sealing after update", "err", err) + ethash.config.Log.Error("Failed to restart sealing after update", "err", err) } } // Wait for all miners to terminate and return the block @@ -143,7 +143,7 @@ func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan s attempts = int64(0) nonce = seed ) - logger := log.New("miner", id) + logger := ethash.config.Log.New("miner", id) logger.Trace("Started ethash search for new nonces", "seed", seed) search: for { @@ -186,160 +186,128 @@ search: runtime.KeepAlive(dataset) } -// remote is a standalone goroutine to handle remote mining related stuff. -func (ethash *Ethash) remote(notify []string, noverify bool) { - var ( - works = make(map[common.Hash]*types.Block) - rates = make(map[common.Hash]hashrate) +// This is the timeout for HTTP requests to notify external miners. +const remoteSealerTimeout = 1 * time.Second - results chan<- *types.Block - currentBlock *types.Block - currentWork [4]string +type remoteSealer struct { + works map[common.Hash]*types.Block + rates map[common.Hash]hashrate + currentBlock *types.Block + currentWork [4]string + notifyCtx context.Context + cancelNotify context.CancelFunc // cancels all notification requests + reqWG sync.WaitGroup // tracks notification request goroutines - notifyTransport = &http.Transport{} - notifyClient = &http.Client{ - Transport: notifyTransport, - Timeout: time.Second, - } - notifyReqs = make([]*http.Request, len(notify)) - ) - // notifyWork notifies all the specified mining endpoints of the availability of - // new work to be processed. - notifyWork := func() { - work := currentWork - blob, _ := json.Marshal(work) - - for i, url := range notify { - // Terminate any previously pending request and create the new work - if notifyReqs[i] != nil { - notifyTransport.CancelRequest(notifyReqs[i]) - } - notifyReqs[i], _ = http.NewRequest("POST", url, bytes.NewReader(blob)) - notifyReqs[i].Header.Set("Content-Type", "application/json") - - // Push the new work concurrently to all the remote nodes - go func(req *http.Request, url string) { - res, err := notifyClient.Do(req) - if err != nil { - log.Warn("Failed to notify remote miner", "err", err) - } else { - log.Trace("Notified remote miner", "miner", url, "hash", log.Lazy{Fn: func() common.Hash { return common.HexToHash(work[0]) }}, "target", work[2]) - res.Body.Close() - } - }(notifyReqs[i], url) - } - } - // makeWork creates a work package for external miner. - // - // The work package consists of 3 strings: - // result[0], 32 bytes hex encoded current block header pow-hash - // result[1], 32 bytes hex encoded seed hash used for DAG - // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty - // result[3], hex encoded block number - makeWork := func(block *types.Block) { - hash := ethash.SealHash(block.Header()) - - currentWork[0] = hash.Hex() - currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex() - currentWork[2] = common.BytesToHash(new(big.Int).Div(two256, block.Difficulty()).Bytes()).Hex() - currentWork[3] = hexutil.EncodeBig(block.Number()) - - // Trace the seal work fetched by remote sealer. - currentBlock = block - works[hash] = block - } - // submitWork verifies the submitted pow solution, returning - // whether the solution was accepted or not (not can be both a bad pow as well as - // any other error, like no pending work or stale mining result). - submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, sealhash common.Hash) bool { - if currentBlock == nil { - log.Error("Pending work without block", "sealhash", sealhash) - return false - } - // Make sure the work submitted is present - block := works[sealhash] - if block == nil { - log.Warn("Work submitted but none pending", "sealhash", sealhash, "curnumber", currentBlock.NumberU64()) - return false - } - // Verify the correctness of submitted result. - header := block.Header() - header.Nonce = nonce - header.MixDigest = mixDigest - - start := time.Now() - if !noverify { - if err := ethash.verifySeal(nil, header, true); err != nil { - log.Warn("Invalid proof-of-work submitted", "sealhash", sealhash, "elapsed", common.PrettyDuration(time.Since(start)), "err", err) - return false - } - } - // Make sure the result channel is assigned. - if results == nil { - log.Warn("Ethash result channel is empty, submitted mining result is rejected") - return false - } - log.Trace("Verified correct proof-of-work", "sealhash", sealhash, "elapsed", common.PrettyDuration(time.Since(start))) + ethash *Ethash + noverify bool + notifyURLs []string + results chan<- *types.Block + workCh chan *sealTask // Notification channel to push new work and relative result channel to remote sealer + fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work + submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result + fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer. + submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate + requestExit chan struct{} + exitCh chan struct{} +} - // Solutions seems to be valid, return to the miner and notify acceptance. - solution := block.WithSeal(header) +// sealTask wraps a seal block with relative result channel for remote sealer thread. +type sealTask struct { + block *types.Block + results chan<- *types.Block +} - // The submitted solution is within the scope of acceptance. - if solution.NumberU64()+staleThreshold > currentBlock.NumberU64() { - select { - case results <- solution: - log.Debug("Work submitted is acceptable", "number", solution.NumberU64(), "sealhash", sealhash, "hash", solution.Hash()) - return true - default: - log.Warn("Sealing result is not read by miner", "mode", "remote", "sealhash", sealhash) - return false - } - } - // The submitted block is too old to accept, drop it. - log.Warn("Work submitted is too old", "number", solution.NumberU64(), "sealhash", sealhash, "hash", solution.Hash()) - return false +// mineResult wraps the pow solution parameters for the specified block. +type mineResult struct { + nonce types.BlockNonce + mixDigest common.Hash + hash common.Hash + + errc chan error +} + +// hashrate wraps the hash rate submitted by the remote sealer. +type hashrate struct { + id common.Hash + ping time.Time + rate uint64 + + done chan struct{} +} + +// sealWork wraps a seal work package for remote sealer. +type sealWork struct { + errc chan error + res chan [4]string +} + +func startRemoteSealer(ethash *Ethash, urls []string, noverify bool) *remoteSealer { + ctx, cancel := context.WithCancel(context.Background()) + s := &remoteSealer{ + ethash: ethash, + noverify: noverify, + notifyURLs: urls, + notifyCtx: ctx, + cancelNotify: cancel, + works: make(map[common.Hash]*types.Block), + rates: make(map[common.Hash]hashrate), + workCh: make(chan *sealTask), + fetchWorkCh: make(chan *sealWork), + submitWorkCh: make(chan *mineResult), + fetchRateCh: make(chan chan uint64), + submitRateCh: make(chan *hashrate), + requestExit: make(chan struct{}), + exitCh: make(chan struct{}), } + go s.loop() + return s +} + +func (s *remoteSealer) loop() { + defer func() { + s.ethash.config.Log.Trace("Ethash remote sealer is exiting") + s.cancelNotify() + s.reqWG.Wait() + close(s.exitCh) + }() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { - case work := <-ethash.workCh: + case work := <-s.workCh: // Update current work with new received block. // Note same work can be past twice, happens when changing CPU threads. - results = work.results + s.results = work.results + s.makeWork(work.block) + s.notifyWork() - makeWork(work.block) - - // Notify and requested URLs of the new work availability - notifyWork() - - case work := <-ethash.fetchWorkCh: + case work := <-s.fetchWorkCh: // Return current mining work to remote miner. - if currentBlock == nil { + if s.currentBlock == nil { work.errc <- errNoMiningWork } else { - work.res <- currentWork + work.res <- s.currentWork } - case result := <-ethash.submitWorkCh: + case result := <-s.submitWorkCh: // Verify submitted PoW solution based on maintained mining blocks. - if submitWork(result.nonce, result.mixDigest, result.hash) { + if s.submitWork(result.nonce, result.mixDigest, result.hash) { result.errc <- nil } else { result.errc <- errInvalidSealResult } - case result := <-ethash.submitRateCh: + case result := <-s.submitRateCh: // Trace remote sealer's hash rate by submitted value. - rates[result.id] = hashrate{rate: result.rate, ping: time.Now()} + s.rates[result.id] = hashrate{rate: result.rate, ping: time.Now()} close(result.done) - case req := <-ethash.fetchRateCh: + case req := <-s.fetchRateCh: // Gather all hash rate submitted by remote sealer. var total uint64 - for _, rate := range rates { + for _, rate := range s.rates { // this could overflow total += rate.rate } @@ -347,25 +315,126 @@ func (ethash *Ethash) remote(notify []string, noverify bool) { case <-ticker.C: // Clear stale submitted hash rate. - for id, rate := range rates { + for id, rate := range s.rates { if time.Since(rate.ping) > 10*time.Second { - delete(rates, id) + delete(s.rates, id) } } // Clear stale pending blocks - if currentBlock != nil { - for hash, block := range works { - if block.NumberU64()+staleThreshold <= currentBlock.NumberU64() { - delete(works, hash) + if s.currentBlock != nil { + for hash, block := range s.works { + if block.NumberU64()+staleThreshold <= s.currentBlock.NumberU64() { + delete(s.works, hash) } } } - case errc := <-ethash.exitCh: - // Exit remote loop if ethash is closed and return relevant error. - errc <- nil - log.Trace("Ethash remote sealer is exiting") + case <-s.requestExit: return } } } + +// makeWork creates a work package for external miner. +// +// The work package consists of 3 strings: +// result[0], 32 bytes hex encoded current block header pow-hash +// result[1], 32 bytes hex encoded seed hash used for DAG +// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty +// result[3], hex encoded block number +func (s *remoteSealer) makeWork(block *types.Block) { + hash := s.ethash.SealHash(block.Header()) + s.currentWork[0] = hash.Hex() + s.currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex() + s.currentWork[2] = common.BytesToHash(new(big.Int).Div(two256, block.Difficulty()).Bytes()).Hex() + s.currentWork[3] = hexutil.EncodeBig(block.Number()) + + // Trace the seal work fetched by remote sealer. + s.currentBlock = block + s.works[hash] = block +} + +// notifyWork notifies all the specified mining endpoints of the availability of +// new work to be processed. +func (s *remoteSealer) notifyWork() { + work := s.currentWork + blob, _ := json.Marshal(work) + s.reqWG.Add(len(s.notifyURLs)) + for _, url := range s.notifyURLs { + go s.sendNotification(s.notifyCtx, url, blob, work) + } +} + +func (s *remoteSealer) sendNotification(ctx context.Context, url string, json []byte, work [4]string) { + defer s.reqWG.Done() + + req, err := http.NewRequest("POST", url, bytes.NewReader(json)) + if err != nil { + s.ethash.config.Log.Warn("Can't create remote miner notification", "err", err) + return + } + ctx, cancel := context.WithTimeout(ctx, remoteSealerTimeout) + defer cancel() + req = req.WithContext(ctx) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + s.ethash.config.Log.Warn("Failed to notify remote miner", "err", err) + } else { + s.ethash.config.Log.Trace("Notified remote miner", "miner", url, "hash", work[0], "target", work[2]) + resp.Body.Close() + } +} + +// submitWork verifies the submitted pow solution, returning +// whether the solution was accepted or not (not can be both a bad pow as well as +// any other error, like no pending work or stale mining result). +func (s *remoteSealer) submitWork(nonce types.BlockNonce, mixDigest common.Hash, sealhash common.Hash) bool { + if s.currentBlock == nil { + s.ethash.config.Log.Error("Pending work without block", "sealhash", sealhash) + return false + } + // Make sure the work submitted is present + block := s.works[sealhash] + if block == nil { + s.ethash.config.Log.Warn("Work submitted but none pending", "sealhash", sealhash, "curnumber", s.currentBlock.NumberU64()) + return false + } + // Verify the correctness of submitted result. + header := block.Header() + header.Nonce = nonce + header.MixDigest = mixDigest + + start := time.Now() + if !s.noverify { + if err := s.ethash.verifySeal(nil, header, true); err != nil { + s.ethash.config.Log.Warn("Invalid proof-of-work submitted", "sealhash", sealhash, "elapsed", common.PrettyDuration(time.Since(start)), "err", err) + return false + } + } + // Make sure the result channel is assigned. + if s.results == nil { + s.ethash.config.Log.Warn("Ethash result channel is empty, submitted mining result is rejected") + return false + } + s.ethash.config.Log.Trace("Verified correct proof-of-work", "sealhash", sealhash, "elapsed", common.PrettyDuration(time.Since(start))) + + // Solutions seems to be valid, return to the miner and notify acceptance. + solution := block.WithSeal(header) + + // The submitted solution is within the scope of acceptance. + if solution.NumberU64()+staleThreshold > s.currentBlock.NumberU64() { + select { + case s.results <- solution: + s.ethash.config.Log.Debug("Work submitted is acceptable", "number", solution.NumberU64(), "sealhash", sealhash, "hash", solution.Hash()) + return true + default: + s.ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "remote", "sealhash", sealhash) + return false + } + } + // The submitted block is too old to accept, drop it. + s.ethash.config.Log.Warn("Work submitted is too old", "number", solution.NumberU64(), "sealhash", sealhash, "hash", solution.Hash()) + return false +} diff --git a/consensus/ethash/sealer_test.go b/consensus/ethash/sealer_test.go index 82f08d673c..7f83def072 100644 --- a/consensus/ethash/sealer_test.go +++ b/consensus/ethash/sealer_test.go @@ -20,59 +20,39 @@ import ( "encoding/json" "io/ioutil" "math/big" - "net" "net/http" + "net/http/httptest" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/internal/testlog" + "github.com/ethereum/go-ethereum/log" ) // Tests whether remote HTTP servers are correctly notified of new work. func TestRemoteNotify(t *testing.T) { - // Start a simple webserver to capture notifications + // Start a simple web server to capture notifications. sink := make(chan [3]string) - - server := &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - blob, err := ioutil.ReadAll(req.Body) - if err != nil { - t.Fatalf("failed to read miner notification: %v", err) - } - var work [3]string - if err := json.Unmarshal(blob, &work); err != nil { - t.Fatalf("failed to unmarshal miner notification: %v", err) - } - sink <- work - }), - } - // Open a custom listener to extract its local address - listener, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to open notification server: %v", err) - } - defer listener.Close() - - go server.Serve(listener) - - // Wait for server to start listening - var tries int - for tries = 0; tries < 10; tries++ { - conn, _ := net.DialTimeout("tcp", listener.Addr().String(), 1*time.Second) - if conn != nil { - break + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + blob, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Errorf("failed to read miner notification: %v", err) } - } - if tries == 10 { - t.Fatal("tcp listener not ready for more than 10 seconds") - } + var work [3]string + if err := json.Unmarshal(blob, &work); err != nil { + t.Errorf("failed to unmarshal miner notification: %v", err) + } + sink <- work + })) + defer server.Close() - // Create the custom ethash engine - ethash := NewTester([]string{"http://" + listener.Addr().String()}, false) + // Create the custom ethash engine. + ethash := NewTester([]string{server.URL}, false) defer ethash.Close() - // Stream a work task and ensure the notification bubbles out + // Stream a work task and ensure the notification bubbles out. header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) @@ -97,46 +77,37 @@ func TestRemoteNotify(t *testing.T) { // Tests that pushing work packages fast to the miner doesn't cause any data race // issues in the notifications. func TestRemoteMultiNotify(t *testing.T) { - // Start a simple webserver to capture notifications + // Start a simple web server to capture notifications. sink := make(chan [3]string, 64) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + blob, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Errorf("failed to read miner notification: %v", err) + } + var work [3]string + if err := json.Unmarshal(blob, &work); err != nil { + t.Errorf("failed to unmarshal miner notification: %v", err) + } + sink <- work + })) + defer server.Close() - server := &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - blob, err := ioutil.ReadAll(req.Body) - if err != nil { - t.Fatalf("failed to read miner notification: %v", err) - } - var work [3]string - if err := json.Unmarshal(blob, &work); err != nil { - t.Fatalf("failed to unmarshal miner notification: %v", err) - } - sink <- work - }), - } - // Open a custom listener to extract its local address - listener, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to open notification server: %v", err) - } - defer listener.Close() - - go server.Serve(listener) - - // Create the custom ethash engine - ethash := NewTester([]string{"http://" + listener.Addr().String()}, false) + // Create the custom ethash engine. + ethash := NewTester([]string{server.URL}, false) + ethash.config.Log = testlog.Logger(t, log.LvlWarn) defer ethash.Close() - // Stream a lot of work task and ensure all the notifications bubble out + // Stream a lot of work task and ensure all the notifications bubble out. for i := 0; i < cap(sink); i++ { header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, nil, nil) } + for i := 0; i < cap(sink); i++ { select { case <-sink: - case <-time.After(3 * time.Second): + case <-time.After(10 * time.Second): t.Fatalf("notification %d timed out", i) } } @@ -206,10 +177,10 @@ func TestStaleSubmission(t *testing.T) { select { case res := <-results: if res.Header().Nonce != fakeNonce { - t.Errorf("case %d block nonce mismatch, want %s, get %s", id+1, fakeNonce, res.Header().Nonce) + t.Errorf("case %d block nonce mismatch, want %x, get %x", id+1, fakeNonce, res.Header().Nonce) } if res.Header().MixDigest != fakeDigest { - t.Errorf("case %d block digest mismatch, want %s, get %s", id+1, fakeDigest, res.Header().MixDigest) + t.Errorf("case %d block digest mismatch, want %x, get %x", id+1, fakeDigest, res.Header().MixDigest) } if res.Header().Difficulty.Uint64() != c.headers[c.submitIndex].Difficulty.Uint64() { t.Errorf("case %d block difficulty mismatch, want %d, get %d", id+1, c.headers[c.submitIndex].Difficulty, res.Header().Difficulty)