Logger updates 3 (#3730)

* accounts, cmd, eth, ethdb: port logs over to new system

* ethdb: drop concept of cache distribution between dbs

* eth: fix some log nitpicks to make them nicer
pull/3735/head
Péter Szilágyi 8 years ago committed by Jeffrey Wilcke
parent 82e7c1d124
commit 9184249b39
  1. 9
      accounts/url.go
  2. 2
      cmd/geth/chaincmd.go
  3. 14
      cmd/geth/main.go
  4. 7
      cmd/swarm/cleandb.go
  5. 9
      cmd/swarm/hash.go
  6. 6
      cmd/swarm/main.go
  7. 30
      cmd/swarm/manifest.go
  8. 23
      cmd/swarm/upload.go
  9. 19
      cmd/utils/cmd.go
  10. 6
      cmd/utils/flags.go
  11. 13
      eth/api.go
  12. 34
      eth/backend.go
  13. 4
      eth/bad_block.go
  14. 20
      eth/db_upgrade.go
  15. 64
      eth/downloader/downloader.go
  16. 6
      eth/downloader/peer.go
  17. 58
      eth/fetcher/fetcher.go
  18. 3
      eth/gasprice/gasprice.go
  19. 50
      eth/handler.go
  20. 13
      eth/peer.go
  21. 7
      eth/sync.go
  22. 151
      ethdb/database.go

@ -60,6 +60,15 @@ func (u URL) String() string {
return u.Path
}
// TerminalString implements the log.TerminalStringer interface.
func (u URL) TerminalString() string {
url := u.String()
if len(url) > 32 {
return url[:31] + "…"
}
return url
}
// MarshalJSON implements the json.Marshaller interface.
func (u URL) MarshalJSON() ([]byte, error) {
return json.Marshal(u.String())

@ -117,7 +117,7 @@ func initGenesis(ctx *cli.Context) error {
if err != nil {
utils.Fatalf("failed to write genesis block: %v", err)
}
log.Info(fmt.Sprintf("successfully wrote genesis block and/or chain rule set: %x", block.Hash()))
log.Info("Successfully wrote genesis state", "hash", block.Hash())
return nil
}

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/console"
"github.com/ethereum/go-ethereum/contracts/release"
"github.com/ethereum/go-ethereum/eth"
@ -202,11 +203,10 @@ func makeFullNode(ctx *cli.Context) *node.Node {
}{uint(params.VersionMajor<<16 | params.VersionMinor<<8 | params.VersionPatch), clientIdentifier, runtime.Version(), runtime.GOOS}
extra, err := rlp.EncodeToBytes(clientInfo)
if err != nil {
log.Warn(fmt.Sprint("error setting canonical miner information:", err))
log.Warn("Failed to set canonical miner information", "err", err)
}
if uint64(len(extra)) > params.MaximumExtraDataSize {
log.Warn(fmt.Sprint("error setting canonical miner information: extra exceeds", params.MaximumExtraDataSize))
log.Debug(fmt.Sprintf("extra: %x\n", extra))
log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize)
extra = nil
}
stack := utils.MakeNode(ctx, clientIdentifier, gitCommit)
@ -271,7 +271,7 @@ func startNode(ctx *cli.Context, stack *node.Node) {
// Open and self derive any wallets already attached
for _, wallet := range stack.AccountManager().Wallets() {
if err := wallet.Open(""); err != nil {
log.Warn(fmt.Sprintf("Failed to open wallet %s: %v", wallet.URL(), err))
log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
} else {
wallet.SelfDerive(accounts.DefaultBaseDerivationPath, stateReader)
}
@ -280,13 +280,13 @@ func startNode(ctx *cli.Context, stack *node.Node) {
for event := range events {
if event.Arrive {
if err := event.Wallet.Open(""); err != nil {
log.Info(fmt.Sprintf("New wallet appeared: %s, failed to open: %s", event.Wallet.URL(), err))
log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
} else {
log.Info(fmt.Sprintf("New wallet appeared: %s, %s", event.Wallet.URL(), event.Wallet.Status()))
log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", event.Wallet.Status())
event.Wallet.SelfDerive(accounts.DefaultBaseDerivationPath, stateReader)
}
} else {
log.Info(fmt.Sprintf("Old wallet dropped: %s", event.Wallet.URL()))
log.Info("Old wallet dropped", "url", event.Wallet.URL())
event.Wallet.Close()
}
}

@ -17,8 +17,7 @@
package main
import (
"log"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
@ -26,14 +25,14 @@ import (
func cleandb(ctx *cli.Context) {
args := ctx.Args()
if len(args) != 1 {
log.Fatal("need path to chunks database as the first and only argument")
utils.Fatalf("Need path to chunks database as the first and only argument")
}
chunkDbPath := args[0]
hash := storage.MakeHashFunc("SHA3")
dbStore, err := storage.NewDbStore(chunkDbPath, hash, 10000000, 0)
if err != nil {
log.Fatalf("cannot initialise dbstore: %v", err)
utils.Fatalf("Cannot initialise dbstore: %v", err)
}
dbStore.Cleanup()
}

@ -19,9 +19,9 @@ package main
import (
"fmt"
"log"
"os"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
@ -29,12 +29,11 @@ import (
func hash(ctx *cli.Context) {
args := ctx.Args()
if len(args) < 1 {
log.Fatal("Usage: swarm hash <file name>")
utils.Fatalf("Usage: swarm hash <file name>")
}
f, err := os.Open(args[0])
if err != nil {
fmt.Println("Error opening file " + args[1])
os.Exit(1)
utils.Fatalf("Error opening file " + args[1])
}
defer f.Close()
@ -42,7 +41,7 @@ func hash(ctx *cli.Context) {
chunker := storage.NewTreeChunker(storage.NewChunkerParams())
key, err := chunker.Split(f, stat.Size(), nil, nil, nil)
if err != nil {
log.Fatalf("%v\n", err)
utils.Fatalf("%v\n", err)
} else {
fmt.Printf("%v\n", key)
}

@ -277,7 +277,7 @@ func bzzd(ctx *cli.Context) error {
signal.Notify(sigc, syscall.SIGTERM)
defer signal.Stop(sigc)
<-sigc
log.Info(fmt.Sprint("Got sigterm, shutting down..."))
log.Info("Got sigterm, shutting swarm down...")
stack.Stop()
}()
networkId := ctx.GlobalUint64(SwarmNetworkIdFlag.Name)
@ -342,7 +342,7 @@ func getAccount(ctx *cli.Context, stack *node.Node) *ecdsa.PrivateKey {
}
// Try to load the arg as a hex key file.
if key, err := crypto.LoadECDSA(keyid); err == nil {
log.Info(fmt.Sprintf("swarm account key loaded: %#x", crypto.PubkeyToAddress(key.PublicKey)))
log.Info("Swarm account key loaded", "address", crypto.PubkeyToAddress(key.PublicKey))
return key
}
// Otherwise try getting it from the keystore.
@ -399,7 +399,7 @@ func injectBootnodes(srv *p2p.Server, nodes []string) {
for _, url := range nodes {
n, err := discover.ParseNode(url)
if err != nil {
log.Error(fmt.Sprintf("invalid bootnode %q", err))
log.Error("Invalid swarm bootnode", "err", err)
continue
}
srv.AddPeer(n)

@ -20,19 +20,18 @@ package main
import (
"encoding/json"
"fmt"
"log"
"mime"
"path/filepath"
"strings"
"github.com/ethereum/go-ethereum/cmd/utils"
"gopkg.in/urfave/cli.v1"
)
func add(ctx *cli.Context) {
args := ctx.Args()
if len(args) < 3 {
log.Fatal("need atleast three arguments <MHASH> <path> <HASH> [<content-type>]")
utils.Fatalf("Need atleast three arguments <MHASH> <path> <HASH> [<content-type>]")
}
var (
@ -66,7 +65,7 @@ func update(ctx *cli.Context) {
args := ctx.Args()
if len(args) < 3 {
log.Fatal("need atleast three arguments <MHASH> <path> <HASH>")
utils.Fatalf("Need atleast three arguments <MHASH> <path> <HASH>")
}
var (
@ -98,7 +97,7 @@ func update(ctx *cli.Context) {
func remove(ctx *cli.Context) {
args := ctx.Args()
if len(args) < 2 {
log.Fatal("need atleast two arguments <MHASH> <path>")
utils.Fatalf("Need atleast two arguments <MHASH> <path>")
}
var (
@ -134,19 +133,19 @@ func addEntryToManifest(ctx *cli.Context, mhash, path, hash, ctype string) strin
mroot, err := client.downloadManifest(mhash)
if err != nil {
log.Fatalln("manifest download failed:", err)
utils.Fatalf("Manifest download failed: %v", err)
}
//TODO: check if the "hash" to add is valid and present in swarm
_, err = client.downloadManifest(hash)
if err != nil {
log.Fatalln("hash to add is not present:", err)
utils.Fatalf("Hash to add is not present: %v", err)
}
// See if we path is in this Manifest or do we have to dig deeper
for _, entry := range mroot.Entries {
if path == entry.Path {
log.Fatal(path, "Already present, not adding anything")
utils.Fatalf("Path %s already present, not adding anything", path)
} else {
if entry.ContentType == "application/bzz-manifest+json" {
prfxlen := strings.HasPrefix(path, entry.Path)
@ -183,7 +182,7 @@ func addEntryToManifest(ctx *cli.Context, mhash, path, hash, ctype string) strin
newManifestHash, err := client.uploadManifest(mroot)
if err != nil {
log.Fatalln("manifest upload failed:", err)
utils.Fatalf("Manifest upload failed: %v", err)
}
return newManifestHash
@ -208,7 +207,7 @@ func updateEntryInManifest(ctx *cli.Context, mhash, path, hash, ctype string) st
mroot, err := client.downloadManifest(mhash)
if err != nil {
log.Fatalln("manifest download failed:", err)
utils.Fatalf("Manifest download failed: %v", err)
}
//TODO: check if the "hash" with which to update is valid and present in swarm
@ -228,7 +227,7 @@ func updateEntryInManifest(ctx *cli.Context, mhash, path, hash, ctype string) st
}
if longestPathEntry.Path == "" && newEntry.Path == "" {
log.Fatal(path, " Path not present in the Manifest, not setting anything")
utils.Fatalf("Path %s not present in the Manifest, not setting anything", path)
}
if longestPathEntry.Path != "" {
@ -268,7 +267,7 @@ func updateEntryInManifest(ctx *cli.Context, mhash, path, hash, ctype string) st
newManifestHash, err := client.uploadManifest(mroot)
if err != nil {
log.Fatalln("manifest upload failed:", err)
utils.Fatalf("Manifest upload failed: %v", err)
}
return newManifestHash
}
@ -292,7 +291,7 @@ func removeEntryFromManifest(ctx *cli.Context, mhash, path string) string {
mroot, err := client.downloadManifest(mhash)
if err != nil {
log.Fatalln("manifest download failed:", err)
utils.Fatalf("Manifest download failed: %v", err)
}
// See if we path is in this Manifest or do we have to dig deeper
@ -310,7 +309,7 @@ func removeEntryFromManifest(ctx *cli.Context, mhash, path string) string {
}
if longestPathEntry.Path == "" && entryToRemove.Path == "" {
log.Fatal(path, "Path not present in the Manifest, not removing anything")
utils.Fatalf("Path %s not present in the Manifest, not removing anything", path)
}
if longestPathEntry.Path != "" {
@ -342,8 +341,7 @@ func removeEntryFromManifest(ctx *cli.Context, mhash, path string) string {
newManifestHash, err := client.uploadManifest(mroot)
if err != nil {
log.Fatalln("manifest upload failed:", err)
utils.Fatalf("Manifest upload failed: %v", err)
}
return newManifestHash
}

@ -23,7 +23,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"mime"
"net/http"
"os"
@ -32,6 +31,8 @@ import (
"path/filepath"
"strings"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/log"
"gopkg.in/urfave/cli.v1"
)
@ -44,7 +45,7 @@ func upload(ctx *cli.Context) {
defaultPath = ctx.GlobalString(SwarmUploadDefaultPath.Name)
)
if len(args) != 1 {
log.Fatal("need filename as the first and only argument")
utils.Fatalf("Need filename as the first and only argument")
}
var (
@ -53,25 +54,25 @@ func upload(ctx *cli.Context) {
)
fi, err := os.Stat(expandPath(file))
if err != nil {
log.Fatal(err)
utils.Fatalf("Failed to stat file: %v", err)
}
if fi.IsDir() {
if !recursive {
log.Fatal("argument is a directory and recursive upload is disabled")
utils.Fatalf("Argument is a directory and recursive upload is disabled")
}
if !wantManifest {
log.Fatal("manifest is required for directory uploads")
utils.Fatalf("Manifest is required for directory uploads")
}
mhash, err := client.uploadDirectory(file, defaultPath)
if err != nil {
log.Fatal(err)
utils.Fatalf("Failed to upload directory: %v", err)
}
fmt.Println(mhash)
return
}
entry, err := client.uploadFile(file, fi)
if err != nil {
log.Fatalln("upload failed:", err)
utils.Fatalf("Upload failed: %v", err)
}
mroot := manifest{[]manifestEntry{entry}}
if !wantManifest {
@ -82,7 +83,7 @@ func upload(ctx *cli.Context) {
}
hash, err := client.uploadManifest(mroot)
if err != nil {
log.Fatalln("manifest upload failed:", err)
utils.Fatalf("Manifest upload failed: %v", err)
}
fmt.Println(hash)
}
@ -173,7 +174,7 @@ func (c *client) uploadFileContent(file string, fi os.FileInfo) (string, error)
return "", err
}
defer fd.Close()
log.Printf("uploading file %s (%d bytes)", file, fi.Size())
log.Info("Uploading swarm content", "file", file, "bytes", fi.Size())
return c.postRaw("application/octet-stream", fi.Size(), fd)
}
@ -182,7 +183,7 @@ func (c *client) uploadManifest(m manifest) (string, error) {
if err != nil {
panic(err)
}
log.Println("uploading manifest")
log.Info("Uploading swarm manifest")
return c.postRaw("application/json", int64(len(jsm)), ioutil.NopCloser(bytes.NewReader(jsm)))
}
@ -192,7 +193,7 @@ func (c *client) uploadToManifest(mhash string, path string, fpath string, fi os
return "", err
}
defer fd.Close()
log.Printf("uploading file %s (%d bytes) and adding path %v", fpath, fi.Size(), path)
log.Info("Uploading swarm content and path", "file", fpath, "bytes", fi.Size(), "path", path)
req, err := http.NewRequest("PUT", c.api+"/bzz:/"+mhash+"/"+path, fd)
if err != nil {
return "", err

@ -67,12 +67,12 @@ func StartNode(stack *node.Node) {
signal.Notify(sigc, os.Interrupt)
defer signal.Stop(sigc)
<-sigc
log.Info(fmt.Sprint("Got interrupt, shutting down..."))
log.Info("Got interrupt, shutting down...")
go stack.Stop()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
log.Info(fmt.Sprintf("Already shutting down, interrupt %d more times for panic.", i-1))
log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
}
}
debug.Exit() // ensure trace and CPU profile data is flushed.
@ -90,7 +90,7 @@ func ImportChain(chain *core.BlockChain, fn string) error {
defer close(interrupt)
go func() {
if _, ok := <-interrupt; ok {
log.Info(fmt.Sprint("caught interrupt during import, will stop at next batch"))
log.Info("Interrupted during import, stopping at next batch")
}
close(stop)
}()
@ -103,7 +103,7 @@ func ImportChain(chain *core.BlockChain, fn string) error {
}
}
log.Info(fmt.Sprint("Importing blockchain ", fn))
log.Info("Importing blockchain", "file", fn)
fh, err := os.Open(fn)
if err != nil {
return err
@ -151,8 +151,7 @@ func ImportChain(chain *core.BlockChain, fn string) error {
return fmt.Errorf("interrupted")
}
if hasAllBlocks(chain, blocks[:i]) {
log.Info(fmt.Sprintf("skipping batch %d, all blocks present [%x / %x]",
batch, blocks[0].Hash().Bytes()[:4], blocks[i-1].Hash().Bytes()[:4]))
log.Info("Skipping batch as all blocks present", "batch", batch, "first", blocks[0].Hash(), "last", blocks[i-1].Hash())
continue
}
@ -173,7 +172,7 @@ func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool {
}
func ExportChain(blockchain *core.BlockChain, fn string) error {
log.Info(fmt.Sprint("Exporting blockchain to ", fn))
log.Info("Exporting blockchain", "file", fn)
fh, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
if err != nil {
return err
@ -189,13 +188,13 @@ func ExportChain(blockchain *core.BlockChain, fn string) error {
if err := blockchain.Export(writer); err != nil {
return err
}
log.Info(fmt.Sprint("Exported blockchain to ", fn))
log.Info("Exported blockchain", "file", fn)
return nil
}
func ExportAppendChain(blockchain *core.BlockChain, fn string, first uint64, last uint64) error {
log.Info(fmt.Sprint("Exporting blockchain to ", fn))
log.Info("Exporting blockchain", "file", fn)
// TODO verify mode perms
fh, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, os.ModePerm)
if err != nil {
@ -212,6 +211,6 @@ func ExportAppendChain(blockchain *core.BlockChain, fn string, first uint64, las
if err := blockchain.ExportN(writer, first, last); err != nil {
return err
}
log.Info(fmt.Sprint("Exported blockchain to ", fn))
log.Info("Exported blockchain to", "file", fn)
return nil
}

@ -492,7 +492,7 @@ func MakeBootstrapNodes(ctx *cli.Context) []*discover.Node {
for _, url := range urls {
node, err := discover.ParseNode(url)
if err != nil {
log.Error(fmt.Sprintf("Bootstrap URL %s: %v\n", url, err))
log.Error("Bootstrap URL invalid", "enode", url, "err", err)
continue
}
bootnodes = append(bootnodes, node)
@ -512,7 +512,7 @@ func MakeBootstrapNodesV5(ctx *cli.Context) []*discv5.Node {
for _, url := range urls {
node, err := discv5.ParseNode(url)
if err != nil {
log.Error(fmt.Sprintf("Bootstrap URL %s: %v\n", url, err))
log.Error("Bootstrap URL invalid", "enode", url, "err", err)
continue
}
bootnodes = append(bootnodes, node)
@ -609,7 +609,7 @@ func MakeAddress(ks *keystore.KeyStore, account string) (accounts.Account, error
func MakeEtherbase(ks *keystore.KeyStore, ctx *cli.Context) common.Address {
accounts := ks.Accounts()
if !ctx.GlobalIsSet(EtherbaseFlag.Name) && len(accounts) == 0 {
log.Error(fmt.Sprint("WARNING: No etherbase set and no accounts found as default"))
log.Warn("No etherbase set and no accounts found as default")
return common.Address{}
}
etherbase := ctx.GlobalString(EtherbaseFlag.Name)

@ -37,7 +37,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
@ -103,17 +102,17 @@ func (s *PublicMinerAPI) SubmitWork(nonce types.BlockNonce, solution, digest com
// result[0], 32 bytes hex encoded current block header pow-hash
// result[1], 32 bytes hex encoded seed hash used for DAG
// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
func (s *PublicMinerAPI) GetWork() (work [3]string, err error) {
func (s *PublicMinerAPI) GetWork() ([3]string, error) {
if !s.e.IsMining() {
if err := s.e.StartMining(0); err != nil {
return work, err
return [3]string{}, err
}
}
if work, err = s.agent.GetWork(); err == nil {
return
work, err := s.agent.GetWork()
if err != nil {
return work, fmt.Errorf("mining not ready: %v", err)
}
log.Debug(fmt.Sprintf("%v", err))
return work, fmt.Errorf("mining not ready")
return work, nil
}
// SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined

@ -179,8 +179,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if err := addMipmapBloomBins(chainDb); err != nil {
return nil, err
}
log.Info(fmt.Sprintf("Protocol Versions: %v, Network Id: %v", ProtocolVersions, config.NetworkId))
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
bcVersion := core.GetBlockChainVersion(chainDb)
@ -198,7 +197,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
log.Info(fmt.Sprint("WARNING: Wrote default ethereum genesis block"))
log.Warn("Wrote default Ethereum genesis block")
}
if config.ChainConfig == nil {
@ -208,7 +207,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.chainConfig = config.ChainConfig
log.Info(fmt.Sprint("Chain config:", eth.chainConfig))
log.Info("Initialised chain configuration", "config", eth.chainConfig)
eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.pow, eth.EventMux(), vm.Config{EnablePreimageRecording: config.EnablePreimageRecording})
if err != nil {
@ -269,7 +268,7 @@ func SetupGenesisBlock(chainDb *ethdb.Database, config *Config) error {
if err != nil {
return err
}
log.Info(fmt.Sprintf("Successfully wrote custom genesis block: %x", block.Hash()))
log.Info("Successfully wrote custom genesis block", "hash", block.Hash())
}
// Load up a test setup if directly injected
if config.TestGenesisState != nil {
@ -288,13 +287,13 @@ func SetupGenesisBlock(chainDb *ethdb.Database, config *Config) error {
func CreatePoW(config *Config) (pow.PoW, error) {
switch {
case config.PowFake:
log.Info(fmt.Sprintf("ethash used in fake mode"))
log.Warn("Ethash used in fake mode")
return pow.PoW(core.FakePow{}), nil
case config.PowTest:
log.Info(fmt.Sprintf("ethash used in test mode"))
log.Warn("Ethash used in test mode")
return ethash.NewForTesting()
case config.PowShared:
log.Info(fmt.Sprintf("ethash used in shared mode"))
log.Warn("Ethash used in shared mode")
return ethash.NewShared(), nil
default:
return ethash.New(), nil
@ -377,9 +376,8 @@ func (self *Ethereum) SetEtherbase(etherbase common.Address) {
func (s *Ethereum) StartMining(threads int) error {
eb, err := s.Etherbase()
if err != nil {
err = fmt.Errorf("Cannot start mining without etherbase address: %v", err)
log.Error(fmt.Sprint(err))
return err
log.Error("Cannot start mining without etherbase", "err", err)
return fmt.Errorf("etherbase missing: %v", err)
}
go s.miner.Start(eb, threads)
return nil
@ -466,14 +464,14 @@ func (self *Ethereum) StartAutoDAG() {
return // already started
}
go func() {
log.Info(fmt.Sprintf("Automatic pregeneration of ethash DAG ON (ethash dir: %s)", ethash.DefaultDir))
log.Info("Pre-generation of ethash DAG on", "dir", ethash.DefaultDir)
var nextEpoch uint64
timer := time.After(0)
self.autodagquit = make(chan bool)
for {
select {
case <-timer:
log.Info(fmt.Sprintf("checking DAG (ethash dir: %s)", ethash.DefaultDir))
log.Info("Checking DAG availability", "dir", ethash.DefaultDir)
currentBlock := self.BlockChain().CurrentBlock().NumberU64()
thisEpoch := currentBlock / epochLength
if nextEpoch <= thisEpoch {
@ -482,19 +480,19 @@ func (self *Ethereum) StartAutoDAG() {
previousDag, previousDagFull := dagFiles(thisEpoch - 1)
os.Remove(filepath.Join(ethash.DefaultDir, previousDag))
os.Remove(filepath.Join(ethash.DefaultDir, previousDagFull))
log.Info(fmt.Sprintf("removed DAG for epoch %d (%s)", thisEpoch-1, previousDag))
log.Info("Removed previous DAG", "epoch", thisEpoch-1, "dag", previousDag)
}
nextEpoch = thisEpoch + 1
dag, _ := dagFiles(nextEpoch)
if _, err := os.Stat(dag); os.IsNotExist(err) {
log.Info(fmt.Sprintf("Pregenerating DAG for epoch %d (%s)", nextEpoch, dag))
log.Info("Pre-generating next DAG", "epoch", nextEpoch, "dag", dag)
err := ethash.MakeDAG(nextEpoch*epochLength, "") // "" -> ethash.DefaultDir
if err != nil {
log.Error(fmt.Sprintf("Error generating DAG for epoch %d (%s)", nextEpoch, dag))
log.Error("Error generating DAG", "epoch", nextEpoch, "dag", dag, "err", err)
return
}
} else {
log.Error(fmt.Sprintf("DAG for epoch %d (%s)", nextEpoch, dag))
log.Warn("DAG already exists", "epoch", nextEpoch, "dag", dag)
}
}
}
@ -512,7 +510,7 @@ func (self *Ethereum) StopAutoDAG() {
close(self.autodagquit)
self.autodagquit = nil
}
log.Info(fmt.Sprintf("Automatic pregeneration of ethash DAG OFF (ethash dir: %s)", ethash.DefaultDir))
log.Info("Pre-generation of ethash DAG off", "dir", ethash.DefaultDir)
}
// dagFiles(epoch) returns the two alternative DAG filenames (not a path)

@ -65,9 +65,9 @@ func sendBadBlockReport(block *types.Block, err error) {
client := http.Client{Timeout: 8 * time.Second}
resp, err := client.Post(badBlocksURL, "application/json", bytes.NewReader(jsonStr))
if err != nil {
log.Debug(fmt.Sprint(err))
log.Debug("Failed to report bad block", "err", err)
return
}
log.Debug(fmt.Sprintf("Bad Block Report posted (%d)", resp.StatusCode))
log.Debug("Bad block report posted", "status", resp.StatusCode)
resp.Body.Close()
}

@ -49,7 +49,7 @@ func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) {
return nil // empty database, nothing to do
}
log.Info(fmt.Sprintf("Upgrading chain database to use sequential keys"))
log.Warn("Upgrading chain database to use sequential keys")
stopChn := make(chan struct{})
stoppedChn := make(chan struct{})
@ -72,11 +72,11 @@ func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) {
err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn)
}
if err == nil && !stopped {
log.Info(fmt.Sprintf("Database conversion successful"))
log.Info("Database conversion successful")
db.Put(useSequentialKeys, []byte{42})
}
if err != nil {
log.Error(fmt.Sprintf("Database conversion failed: %v", err))
log.Error("Database conversion failed", "err", err)
}
close(stoppedChn)
}()
@ -105,7 +105,7 @@ func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (e
it.Release()
it = db.(*ethdb.LDBDatabase).NewIterator()
it.Seek(keyPtr)
log.Info(fmt.Sprintf("converting %d canonical numbers...", cnt))
log.Info("Converting canonical numbers", "count", cnt)
}
number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64()
newKey := []byte("h12345678n")
@ -124,7 +124,7 @@ func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (e
it.Next()
}
if cnt > 0 {
log.Info(fmt.Sprintf("converted %d canonical numbers...", cnt))
log.Info("converted canonical numbers", "count", cnt)
}
return nil, false
}
@ -148,7 +148,7 @@ func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool
it.Release()
it = db.(*ethdb.LDBDatabase).NewIterator()
it.Seek(keyPtr)
log.Info(fmt.Sprintf("converting %d blocks...", cnt))
log.Info("Converting blocks", "count", cnt)
}
// convert header, body, td and block receipts
var keyPrefix [38]byte
@ -176,7 +176,7 @@ func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool
}
}
if cnt > 0 {
log.Info(fmt.Sprintf("converted %d blocks...", cnt))
log.Info("Converted blocks", "count", cnt)
}
return nil, false
}
@ -203,7 +203,7 @@ func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (e
it.Next()
}
if cnt > 0 {
log.Info(fmt.Sprintf("removed %d orphaned block receipts...", cnt))
log.Info("Removed orphaned block receipts", "count", cnt)
}
return nil, false
}
@ -283,7 +283,7 @@ func addMipmapBloomBins(db ethdb.Database) (err error) {
}
tstart := time.Now()
log.Info(fmt.Sprint("upgrading db log bloom bins"))
log.Warn("Upgrading db log bloom bins")
for i := uint64(0); i <= latestBlock.NumberU64(); i++ {
hash := core.GetCanonicalHash(db, i)
if (hash == common.Hash{}) {
@ -291,6 +291,6 @@ func addMipmapBloomBins(db ethdb.Database) (err error) {
}
core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i))
}
log.Info(fmt.Sprint("upgrade completed in", time.Since(tstart)))
log.Info("Bloom-bin upgrade completed", "elapsed", common.PrettyDuration(time.Since(tstart)))
return nil
}

@ -518,7 +518,7 @@ func (d *Downloader) Terminate() {
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
p.logger.Debug("Retrieving remote chain height")
p.log.Debug("Retrieving remote chain height")
// Request the advertised remote head block and wait for the response
head, _ := p.currentHead()
@ -540,15 +540,15 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.logger.Debug("Multiple headers for single request", "headers", len(headers))
p.log.Debug("Multiple headers for single request", "headers", len(headers))
return nil, errBadPeer
}
head := headers[0]
p.logger.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
return head, nil
case <-timeout:
p.logger.Debug("Waiting for head header timed out", "elapsed", ttl)
p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
return nil, errTimeout
case <-d.bodyCh:
@ -568,7 +568,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
floor, ceil := int64(-1), d.headHeader().Number.Uint64()
p.logger.Debug("Looking for common ancestor", "local", ceil, "remote", height)
p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
if d.mode == FullSync {
ceil = d.headBlock().NumberU64()
} else if d.mode == FastSync {
@ -614,13 +614,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) == 0 {
p.logger.Warn("Empty head header set")
p.log.Warn("Empty head header set")
return 0, errEmptyHeaderSet
}
// Make sure the peer's reply conforms to the request
for i := 0; i < len(headers); i++ {
if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
p.logger.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
return 0, errInvalidChain
}
}
@ -637,7 +637,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// If every header is known, even future ones, the peer straight out lied about its head
if number > height && i == limit-1 {
p.logger.Warn("Lied about chain head", "reported", height, "found", number)
p.log.Warn("Lied about chain head", "reported", height, "found", number)
return 0, errStallingPeer
}
break
@ -645,7 +645,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
}
case <-timeout:
p.logger.Debug("Waiting for head header timed out", "elapsed", ttl)
p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
return 0, errTimeout
case <-d.bodyCh:
@ -657,10 +657,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// If the head fetch already found an ancestor, return
if !common.EmptyHash(hash) {
if int64(number) <= floor {
p.logger.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
return 0, errInvalidAncestor
}
p.logger.Debug("Found common ancestor", "number", number, "hash", hash)
p.log.Debug("Found common ancestor", "number", number, "hash", hash)
return number, nil
}
// Ancestor not found, we need to binary search over our chain
@ -692,7 +692,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Make sure the peer actually gave something valid
headers := packer.(*headerPack).headers
if len(headers) != 1 {
p.logger.Debug("Multiple headers for single request", "headers", len(headers))
p.log.Debug("Multiple headers for single request", "headers", len(headers))
return 0, errBadPeer
}
arrived = true
@ -704,13 +704,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
}
header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.logger.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, errBadPeer
}
start = check
case <-timeout:
p.logger.Debug("Waiting for search header timed out", "elapsed", ttl)
p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
return 0, errTimeout
case <-d.bodyCh:
@ -722,10 +722,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
}
// Ensure valid ancestry and return
if int64(start) <= floor {
p.logger.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
return 0, errInvalidAncestor
}
p.logger.Debug("Found common ancestor", "number", start, "hash", hash)
p.log.Debug("Found common ancestor", "number", start, "hash", hash)
return start, nil
}
@ -738,8 +738,8 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
p.logger.Debug("Directing header downloads", "origin", from)
defer p.logger.Debug("Header download terminated")
p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated")
// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
@ -756,10 +756,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
timeout.Reset(ttl)
if skeleton {
p.logger.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
} else {
p.logger.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
}
}
@ -788,7 +788,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
}
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
p.logger.Debug("No more headers available")
p.log.Debug("No more headers available")
select {
case d.headerProcCh <- nil:
return nil
@ -802,7 +802,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
p.logger.Debug("Skeleton chain invalid", "err", err)
p.log.Debug("Skeleton chain invalid", "err", err)
return errInvalidChain
}
headers = filled[proced:]
@ -810,7 +810,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
}
// Insert all the new headers and fetch the next batch
if len(headers) > 0 {
p.logger.Trace("Scheduling new headers", "count", len(headers), "from", from)
p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
@ -822,7 +822,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
case <-timeout.C:
// Header retrieval timed out, consider the peer bad and drop
p.logger.Debug("Header request timed out", "elapsed", ttl)
p.log.Debug("Header request timed out", "elapsed", ttl)
headerTimeoutMeter.Mark(1)
d.dropPeer(p.id)
@ -1050,11 +1050,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// Issue a log to the user to see what's going on
switch {
case err == nil && packet.Items() == 0:
peer.logger.Trace("Requested data not delivered", "type", kind)
peer.log.Trace("Requested data not delivered", "type", kind)
case err == nil:
peer.logger.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
default:
peer.logger.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
}
}
// Blocks assembled, try to update the progress
@ -1097,10 +1097,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
if fails > 2 {
peer.logger.Trace("Data delivery timed out", "type", kind)
peer.log.Trace("Data delivery timed out", "type", kind)
setIdle(peer, 0)
} else {
peer.logger.Debug("Stalling delivery, dropping", "type", kind)
peer.log.Debug("Stalling delivery, dropping", "type", kind)
d.dropPeer(pid)
}
}
@ -1137,11 +1137,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
continue
}
if request.From > 0 {
peer.logger.Trace("Requesting new batch of data", "type", kind, "from", request.From)
peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
} else if len(request.Headers) > 0 {
peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
} else {
peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {

@ -87,7 +87,7 @@ type peer struct {
getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies
logger log.Logger // Contextual logger to add extra infos to peer logs
log log.Logger // Contextual logger to add extra infos to peer logs
lock sync.RWMutex
}
@ -110,7 +110,7 @@ func newPeer(id string, version int, currentHead currentHeadRetrievalFn,
getNodeData: getNodeData,
version: version,
logger: logger,
log: logger,
}
}
@ -272,7 +272,7 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
p.logger.Trace("Peer throughput measurements updated",
p.log.Trace("Peer throughput measurements updated",
"hps", p.headerThroughput, "bps", p.blockThroughput,
"rps", p.receiptThroughput, "sps", p.stateThroughput,
"miss", len(p.lacking), "rtt", p.rtt)

@ -19,7 +19,6 @@ package fetcher
import (
"errors"
"fmt"
"math/rand"
"time"
@ -78,8 +77,8 @@ type announce struct {
origin string // Identifier of the peer originating the notification
fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block
fetchBodies bodyRequesterFn // [eth/62] Fetcher function to retrieve the body of an announced block
fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
}
// headerFilterTask represents a batch of headers needing fetcher filtering.
@ -220,7 +219,7 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
log.Trace(fmt.Sprintf("[eth/62] filtering %d headers", len(headers)))
log.Trace("Filtering headers", "headers", len(headers))
// Send the filter channel to the fetcher
filter := make(chan *headerFilterTask)
@ -248,7 +247,7 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
log.Trace(fmt.Sprintf("[eth/62] filtering %d:%d bodies", len(transactions), len(uncles)))
log.Trace("Filtering bodies", "txs", len(transactions), "uncles", len(uncles))
// Send the filter channel to the fetcher
filter := make(chan *bodyFilterTask)
@ -323,14 +322,14 @@ func (f *Fetcher) loop() {
count := f.announces[notification.origin] + 1
if count > hashLimit {
log.Debug(fmt.Sprintf("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit))
log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
propAnnounceDOSMeter.Mark(1)
break
}
// If we have a valid block number, check that it's potentially useful
if notification.number > 0 {
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug(fmt.Sprintf("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist))
log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
propAnnounceDropMeter.Mark(1)
break
}
@ -380,16 +379,8 @@ func (f *Fetcher) loop() {
}
// Send out all block header requests
for peer, hashes := range request {
if len(hashes) > 0 {
log.Trace("", "msg", log.Lazy{Fn: func() string {
list := "["
for _, hash := range hashes {
list += fmt.Sprintf("%x…, ", hash[:4])
}
list = list[:len(list)-2] + "]"
return fmt.Sprintf("[eth/62] Peer %s: fetching headers %s", peer, list)
}})
}
log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
// Create a closure of the fetch and schedule in on a new thread
fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
go func() {
@ -422,17 +413,8 @@ func (f *Fetcher) loop() {
}
// Send out all block body requests
for peer, hashes := range request {
if len(hashes) > 0 {
log.Trace("", "msg", log.Lazy{Fn: func() string {
list := "["
for _, hash := range hashes {
list += fmt.Sprintf("%x…, ", hash[:4])
}
list = list[:len(list)-2] + "]"
log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
return fmt.Sprintf("[eth/62] Peer %s: fetching bodies %s", peer, list)
}})
}
// Create a closure of the fetch and schedule in on a new thread
if f.completingHook != nil {
f.completingHook(hashes)
@ -465,7 +447,7 @@ func (f *Fetcher) loop() {
if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
// If the delivered header does not match the promised number, drop the announcer
if header.Number.Uint64() != announce.number {
log.Trace(fmt.Sprintf("[eth/62] Peer %s: invalid block number for [%x…]: announced %d, provided %d", announce.origin, header.Hash().Bytes()[:4], announce.number, header.Number.Uint64()))
log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
f.dropPeer(announce.origin)
f.forgetHash(hash)
continue
@ -477,7 +459,7 @@ func (f *Fetcher) loop() {
// If the block is empty (header only), short circuit into the final import queue
if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
log.Trace(fmt.Sprintf("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4]))
log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
block := types.NewBlockWithHeader(header)
block.ReceivedAt = task.time
@ -489,7 +471,7 @@ func (f *Fetcher) loop() {
// Otherwise add to the list of blocks needing completion
incomplete = append(incomplete, announce)
} else {
log.Trace(fmt.Sprintf("[eth/62] Peer %s: block #%d [%x…] already imported, discarding header", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4]))
log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
f.forgetHash(hash)
}
} else {
@ -620,14 +602,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
// Ensure the peer isn't DOSing us
count := f.queues[peer] + 1
if count > blockLimit {
log.Debug(fmt.Sprintf("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit))
log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
propBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug(fmt.Sprintf("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist))
log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
propBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
@ -644,9 +626,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}
log.Debug("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprintf("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size())
}})
log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
}
}
@ -657,14 +637,14 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
hash := block.Hash()
// Run the import on a new thread
log.Debug(fmt.Sprintf("Peer %s: importing block #%d [%x…]", peer, block.NumberU64(), hash[:4]))
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
parent := f.getBlock(block.ParentHash())
if parent == nil {
log.Debug(fmt.Sprintf("Peer %s: parent [%x…] of block #%d [%x…] unknown", peer, block.ParentHash().Bytes()[:4], block.NumberU64(), hash[:4]))
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
return
}
// Quickly validate the header and propagate the block if it passes
@ -679,13 +659,13 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
default:
// Something went very wrong, drop the peer
log.Debug(fmt.Sprintf("Peer %s: block #%d [%x…] verification failed: %v", peer, block.NumberU64(), hash[:4], err))
log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
f.dropPeer(peer)
return
}
// Run the actual import and log any issues
if _, err := f.insertChain(types.Blocks{block}); err != nil {
log.Warn(fmt.Sprintf("Peer %s: block #%d [%x…] import failed: %v", peer, block.NumberU64(), hash[:4], err))
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
return
}
// If import succeeded, broadcast the block

@ -17,7 +17,6 @@
package gasprice
import (
"fmt"
"math/big"
"math/rand"
"sync"
@ -176,7 +175,7 @@ func (self *GasPriceOracle) processBlock(block *types.Block) {
self.lastBase = newBase
self.lastBaseMutex.Unlock()
log.Trace(fmt.Sprintf("Processed block #%v, base price is %v\n", i, newBase.Int64()))
log.Trace("Processed block, base price updated", "number", i, "base", newBase)
}
// returns the lowers possible price with which a tx was or could have been included

@ -115,7 +115,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int
}
// Figure out whether to allow fast sync or not
if fastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Info(fmt.Sprintf("blockchain not empty, fast sync disabled"))
log.Warn("Blockchain not empty, fast sync disabled")
fastSync = false
}
if fastSync {
@ -178,7 +178,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
if blockchain.Genesis().Hash().Hex() == defaultGenesisHash && networkId == 1 {
log.Debug(fmt.Sprint("Bad Block Reporting is enabled"))
log.Debug("Bad block reporting is enabled")
manager.badBlockReportingEnabled = true
}
@ -199,12 +199,12 @@ func (pm *ProtocolManager) removePeer(id string) {
if peer == nil {
return
}
log.Debug(fmt.Sprint("Removing peer", id))
log.Debug("Removing Ethereum peer", "peer", id)
// Unregister the peer from the downloader and Ethereum peer set
pm.downloader.UnregisterPeer(id)
if err := pm.peers.Unregister(id); err != nil {
log.Error(fmt.Sprint("Removal failed:", err))
log.Error("Peer removal failed", "peer", id, "err", err)
}
// Hard disconnect at the networking layer
if peer != nil {
@ -226,7 +226,7 @@ func (pm *ProtocolManager) Start() {
}
func (pm *ProtocolManager) Stop() {
log.Info(fmt.Sprint("Stopping ethereum protocol handler..."))
log.Info("Stopping Ethereum protocol")
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
@ -247,7 +247,7 @@ func (pm *ProtocolManager) Stop() {
// Wait for all peer handler goroutines and the loops to come down.
pm.wg.Wait()
log.Info(fmt.Sprint("Ethereum protocol handler stopped"))
log.Info("Ethereum protocol stopped")
}
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
@ -260,22 +260,20 @@ func (pm *ProtocolManager) handle(p *peer) error {
if pm.peers.Len() >= pm.maxPeers {
return p2p.DiscTooManyPeers
}
log.Debug(fmt.Sprintf("%v: peer connected [%s]", p, p.Name()))
p.Log().Debug("Ethereum peer connected", "name", p.Name())
// Execute the Ethereum handshake
td, head, genesis := pm.blockchain.Status()
if err := p.Handshake(pm.networkId, td, head, genesis); err != nil {
log.Debug(fmt.Sprintf("%v: handshake failed: %v", p, err))
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
log.Trace(fmt.Sprintf("%v: adding peer", p))
if err := pm.peers.Register(p); err != nil {
log.Error(fmt.Sprintf("%v: addition failed: %v", p, err))
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
@ -296,7 +294,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
log.Debug(fmt.Sprintf("%v: timed out DAO fork-check, dropping", p))
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
@ -310,7 +308,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
log.Debug(fmt.Sprintf("%v: message handling failed: %v", p, err))
p.Log().Debug("Message handling failed", "err", err)
return err
}
}
@ -386,7 +384,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
)
if next <= current {
infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
log.Warn(fmt.Sprintf("%v: GetBlockHeaders skip overflow attack (current %v, skip %v, next %v)\nMalicious peer infos: %s", p, current, query.Skip, next, infos))
p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
@ -434,7 +432,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// If we're seemingly on the same chain, disable the drop timer
if verifyDAO {
log.Debug(fmt.Sprintf("%v: seems to be on the same side of the DAO fork", p))
p.Log().Debug("Seems to be on the same side of the DAO fork")
p.forkDrop.Stop()
p.forkDrop = nil
return nil
@ -451,10 +449,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Validate the header and either drop the peer or continue
if err := core.ValidateDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
log.Debug(fmt.Sprintf("%v: verified to be on the other side of the DAO fork, dropping", p))
p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
return err
}
log.Debug(fmt.Sprintf("%v: verified to be on the same side of the DAO fork", p))
p.Log().Debug("Verified to be on the same side of the DAO fork")
return nil
}
// Irrelevant of the fork checks, send the header to the fetcher just in case
@ -463,7 +461,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
if err != nil {
log.Debug(fmt.Sprint(err))
log.Debug("Failed to deliver headers", "err", err)
}
}
@ -516,7 +514,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if len(trasactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
if err != nil {
log.Debug(fmt.Sprint(err))
log.Debug("Failed to deliver bodies", "err", err)
}
}
@ -555,7 +553,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Deliver all to the downloader
if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
log.Debug(fmt.Sprintf("failed to deliver node state data: %v", err))
log.Debug("Failed to deliver node state data", "err", err)
}
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
@ -586,7 +584,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
log.Error(fmt.Sprintf("failed to encode receipt: %v", err))
log.Error("Failed to encode receipt", "err", err)
} else {
receipts = append(receipts, encoded)
bytes += len(encoded)
@ -602,7 +600,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Deliver all to the downloader
if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
log.Debug(fmt.Sprintf("failed to deliver receipts: %v", err))
log.Debug("Failed to deliver receipts", "err", err)
}
case msg.Code == NewBlockHashesMsg:
@ -695,7 +693,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
} else {
log.Error(fmt.Sprintf("propagating dangling block #%d [%x]", block.NumberU64(), hash[:4]))
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
// Send the block to a subset of our peers
@ -703,14 +701,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
for _, peer := range transfer {
peer.SendNewBlock(block, td)
}
log.Trace(fmt.Sprintf("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt)))
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
}
// Otherwise if the block is indeed in out own chain, announce it
if pm.blockchain.HasBlock(hash) {
for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
log.Trace(fmt.Sprintf("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt)))
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
}
}
@ -723,7 +721,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
for _, peer := range peers {
peer.SendTransactions(types.Transactions{tx})
}
log.Trace(fmt.Sprint("broadcast tx to", len(peers), "peers"))
log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}
// Mined broadcast loop

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0"
@ -191,41 +190,41 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
// RequestHeaders is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
log.Debug(fmt.Sprintf("%v fetching a single header: %x", p, hash))
p.Log().Debug("Fetching single header", "hash", hash)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
log.Debug(fmt.Sprintf("%v fetching %d headers from %x, skipping %d (reverse = %v)", p, amount, origin[:4], skip, reverse))
p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
log.Debug(fmt.Sprintf("%v fetching %d headers from #%d, skipping %d (reverse = %v)", p, amount, origin, skip, reverse))
p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
}
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified.
func (p *peer) RequestBodies(hashes []common.Hash) error {
log.Debug(fmt.Sprintf("%v fetching %d block bodies", p, len(hashes)))
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
}
// RequestNodeData fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
func (p *peer) RequestNodeData(hashes []common.Hash) error {
log.Debug(fmt.Sprintf("%v fetching %v state data", p, len(hashes)))
p.Log().Debug("Fetching batch of state data", "count", len(hashes))
return p2p.Send(p.rw, GetNodeDataMsg, hashes)
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *peer) RequestReceipts(hashes []common.Hash) error {
log.Debug(fmt.Sprintf("%v fetching %v receipts", p, len(hashes)))
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
return p2p.Send(p.rw, GetReceiptsMsg, hashes)
}

@ -17,7 +17,6 @@
package eth
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
@ -87,7 +86,7 @@ func (pm *ProtocolManager) txsyncLoop() {
delete(pending, s.p.ID())
}
// Send the pack in the background.
log.Trace(fmt.Sprintf("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size))
s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendTransactions(pack.txs) }()
}
@ -117,7 +116,7 @@ func (pm *ProtocolManager) txsyncLoop() {
sending = false
// Stop tracking peers that cause send failures.
if err != nil {
log.Debug(fmt.Sprintf("%v: tx send failed: %v", pack.p.Peer, err))
pack.p.Log().Debug("Transaction send failed", "err", err)
delete(pending, pack.p.ID())
}
// Schedule the next send.
@ -187,7 +186,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if atomic.LoadUint32(&pm.fastSync) == 1 {
// Disable fast sync if we indeed have something in our chain
if pm.blockchain.CurrentBlock().NumberU64() > 0 {
log.Info(fmt.Sprintf("fast sync complete, auto disabling"))
log.Info("Fast sync complete, auto disabling")
atomic.StoreUint32(&pm.fastSync, 0)
}
}

@ -17,8 +17,6 @@
package ethdb
import (
"fmt"
"path/filepath"
"strconv"
"strings"
"sync"
@ -37,20 +35,6 @@ import (
var OpenFileLimit = 64
// cacheRatio specifies how the total allotted cache is distributed between the
// various system databases.
var cacheRatio = map[string]float64{
"chaindata": 1.0,
"lightchaindata": 1.0,
}
// handleRatio specifies how the total allotted file descriptors is distributed
// between the various system databases.
var handleRatio = map[string]float64{
"chaindata": 1.0,
"lightchaindata": 1.0,
}
type LDBDatabase struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
@ -67,20 +51,22 @@ type LDBDatabase struct {
quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
log log.Logger // Contextual logger tracking the database path
}
// NewLDBDatabase returns a LevelDB wrapped object.
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
// Calculate the cache and file descriptor allowance for this particular database
cache = int(float64(cache) * cacheRatio[filepath.Base(file)])
logger := log.New("database", file)
// Ensure we have some minimal caching and file guarantees
if cache < 16 {
cache = 16
}
handles = int(float64(handles) * handleRatio[filepath.Base(file)])
if handles < 16 {
handles = 16
}
log.Info(fmt.Sprintf("Allotted %dMB cache and %d file handles to %s", cache, handles, file))
logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles)
// Open the db and recover any potential corruptions
db, err := leveldb.OpenFile(file, &opt.Options{
@ -97,8 +83,9 @@ func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
return nil, err
}
return &LDBDatabase{
fn: file,
db: db,
fn: file,
db: db,
log: logger,
}, nil
}
@ -108,103 +95,103 @@ func (db *LDBDatabase) Path() string {
}
// Put puts the given key / value to the queue
func (self *LDBDatabase) Put(key []byte, value []byte) error {
func (db *LDBDatabase) Put(key []byte, value []byte) error {
// Measure the database put latency, if requested
if self.putTimer != nil {
defer self.putTimer.UpdateSince(time.Now())
if db.putTimer != nil {
defer db.putTimer.UpdateSince(time.Now())
}
// Generate the data to write to disk, update the meter and write
//value = rle.Compress(value)
if self.writeMeter != nil {
self.writeMeter.Mark(int64(len(value)))
if db.writeMeter != nil {
db.writeMeter.Mark(int64(len(value)))
}
return self.db.Put(key, value, nil)
return db.db.Put(key, value, nil)
}
// Get returns the given key if it's present.
func (self *LDBDatabase) Get(key []byte) ([]byte, error) {
func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
// Measure the database get latency, if requested
if self.getTimer != nil {
defer self.getTimer.UpdateSince(time.Now())
if db.getTimer != nil {
defer db.getTimer.UpdateSince(time.Now())
}
// Retrieve the key and increment the miss counter if not found
dat, err := self.db.Get(key, nil)
dat, err := db.db.Get(key, nil)
if err != nil {
if self.missMeter != nil {
self.missMeter.Mark(1)
if db.missMeter != nil {
db.missMeter.Mark(1)
}
return nil, err
}
// Otherwise update the actually retrieved amount of data
if self.readMeter != nil {
self.readMeter.Mark(int64(len(dat)))
if db.readMeter != nil {
db.readMeter.Mark(int64(len(dat)))
}
return dat, nil
//return rle.Decompress(dat)
}
// Delete deletes the key from the queue and database
func (self *LDBDatabase) Delete(key []byte) error {
func (db *LDBDatabase) Delete(key []byte) error {
// Measure the database delete latency, if requested
if self.delTimer != nil {
defer self.delTimer.UpdateSince(time.Now())
if db.delTimer != nil {
defer db.delTimer.UpdateSince(time.Now())
}
// Execute the actual operation
return self.db.Delete(key, nil)
return db.db.Delete(key, nil)
}
func (self *LDBDatabase) NewIterator() iterator.Iterator {
return self.db.NewIterator(nil, nil)
func (db *LDBDatabase) NewIterator() iterator.Iterator {
return db.db.NewIterator(nil, nil)
}
func (self *LDBDatabase) Close() {
func (db *LDBDatabase) Close() {
// Stop the metrics collection to avoid internal database races
self.quitLock.Lock()
defer self.quitLock.Unlock()
db.quitLock.Lock()
defer db.quitLock.Unlock()
if self.quitChan != nil {
if db.quitChan != nil {
errc := make(chan error)
self.quitChan <- errc
db.quitChan <- errc
if err := <-errc; err != nil {
log.Error(fmt.Sprintf("metrics failure in '%s': %v\n", self.fn, err))
db.log.Error("Metrics collection failed", "err", err)
}
}
err := self.db.Close()
err := db.db.Close()
if err == nil {
log.Info(fmt.Sprint("closed db:", self.fn))
db.log.Info("Database closed")
} else {
log.Error(fmt.Sprintf("error closing db %s: %v", self.fn, err))
db.log.Error("Failed to close database", "err", err)
}
}
func (self *LDBDatabase) LDB() *leveldb.DB {
return self.db
func (db *LDBDatabase) LDB() *leveldb.DB {
return db.db
}
// Meter configures the database metrics collectors and
func (self *LDBDatabase) Meter(prefix string) {
func (db *LDBDatabase) Meter(prefix string) {
// Short circuit metering if the metrics system is disabled
if !metrics.Enabled {
return
}
// Initialize all the metrics collector at the requested prefix
self.getTimer = metrics.NewTimer(prefix + "user/gets")
self.putTimer = metrics.NewTimer(prefix + "user/puts")
self.delTimer = metrics.NewTimer(prefix + "user/dels")
self.missMeter = metrics.NewMeter(prefix + "user/misses")
self.readMeter = metrics.NewMeter(prefix + "user/reads")
self.writeMeter = metrics.NewMeter(prefix + "user/writes")
self.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
self.compReadMeter = metrics.NewMeter(prefix + "compact/input")
self.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
db.getTimer = metrics.NewTimer(prefix + "user/gets")
db.putTimer = metrics.NewTimer(prefix + "user/puts")
db.delTimer = metrics.NewTimer(prefix + "user/dels")
db.missMeter = metrics.NewMeter(prefix + "user/misses")
db.readMeter = metrics.NewMeter(prefix + "user/reads")
db.writeMeter = metrics.NewMeter(prefix + "user/writes")
db.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
db.compReadMeter = metrics.NewMeter(prefix + "compact/input")
db.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
// Create a quit channel for the periodic collector and run it
self.quitLock.Lock()
self.quitChan = make(chan chan error)
self.quitLock.Unlock()
db.quitLock.Lock()
db.quitChan = make(chan chan error)
db.quitLock.Unlock()
go self.meter(3 * time.Second)
go db.meter(3 * time.Second)
}
// meter periodically retrieves internal leveldb counters and reports them to
@ -218,7 +205,7 @@ func (self *LDBDatabase) Meter(prefix string) {
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
func (self *LDBDatabase) meter(refresh time.Duration) {
func (db *LDBDatabase) meter(refresh time.Duration) {
// Create the counters to store current and previous values
counters := make([][]float64, 2)
for i := 0; i < 2; i++ {
@ -227,9 +214,9 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
// Iterate ad infinitum and collect the stats
for i := 1; ; i++ {
// Retrieve the database stats
stats, err := self.db.GetProperty("leveldb.stats")
stats, err := db.db.GetProperty("leveldb.stats")
if err != nil {
log.Error(fmt.Sprintf("failed to read database stats: %v", err))
db.log.Error("Failed to read database stats", "err", err)
return
}
// Find the compaction table, skip the header
@ -238,7 +225,7 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
lines = lines[1:]
}
if len(lines) <= 3 {
log.Error(fmt.Sprintf("compaction table not found"))
db.log.Error("Compaction table not found")
return
}
lines = lines[3:]
@ -253,27 +240,27 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
break
}
for idx, counter := range parts[3:] {
if value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64); err != nil {
log.Error(fmt.Sprintf("compaction entry parsing failed: %v", err))
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
if err != nil {
db.log.Error("Compaction entry parsing failed", "err", err)
return
} else {
counters[i%2][idx] += value
}
counters[i%2][idx] += value
}
}
// Update all the requested meters
if self.compTimeMeter != nil {
self.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if self.compReadMeter != nil {
self.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
}
if self.compWriteMeter != nil {
self.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))
}
// Sleep a bit, then repeat the stats collection
select {
case errc := <-self.quitChan:
case errc := <-db.quitChan:
// Quit requesting, stop hammering the database
errc <- nil
return

Loading…
Cancel
Save