From 794c6133efa2c7e8376d9d141c900ea541790bce Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 7 Sep 2021 12:31:17 +0200 Subject: [PATCH] core/rawdb: freezer batch write (#23462) This change is a rewrite of the freezer code. When writing ancient chain data to the freezer, the previous version first encoded each individual item to a temporary buffer, then wrote the buffer. For small item sizes (for example, in the block hash freezer table), this strategy causes a lot of system calls for writing tiny chunks of data. It also allocated a lot of temporary []byte buffers. In the new version, we instead encode multiple items into a re-useable batch buffer, which is then written to the file all at once. This avoids performing a system call for every inserted item. To make the internal batching work, the ancient database API had to be changed. While integrating this new API in BlockChain.InsertReceiptChain, additional optimizations were also added there. Co-authored-by: Felix Lange --- core/blockchain.go | 199 ++++++--------- core/blockchain_test.go | 121 ++++++--- core/error.go | 2 + core/rawdb/accessors_chain.go | 92 +++++-- core/rawdb/accessors_chain_test.go | 146 ++++++++++- core/rawdb/database.go | 12 +- core/rawdb/freezer.go | 232 ++++++++++------- core/rawdb/freezer_batch.go | 248 ++++++++++++++++++ core/rawdb/freezer_table.go | 169 +++++------- core/rawdb/freezer_table_test.go | 396 +++++++++++++++-------------- core/rawdb/freezer_test.go | 301 ++++++++++++++++++++++ core/rawdb/table.go | 7 +- ethdb/database.go | 16 +- 13 files changed, 1350 insertions(+), 591 deletions(-) create mode 100644 core/rawdb/freezer_batch.go create mode 100644 core/rawdb/freezer_test.go diff --git a/core/blockchain.go b/core/blockchain.go index e14291aa54..66bc395c9d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -207,8 +207,7 @@ type BlockChain struct { processor Processor // Block transaction processor interface vmConfig vm.Config - shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. - terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. + shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. } // NewBlockChain returns a fully initialised block chain using information @@ -1085,38 +1084,6 @@ const ( SideStatTy ) -// truncateAncient rewinds the blockchain to the specified header and deletes all -// data in the ancient store that exceeds the specified header. -func (bc *BlockChain) truncateAncient(head uint64) error { - frozen, err := bc.db.Ancients() - if err != nil { - return err - } - // Short circuit if there is no data to truncate in ancient store. - if frozen <= head+1 { - return nil - } - // Truncate all the data in the freezer beyond the specified head - if err := bc.db.TruncateAncients(head + 1); err != nil { - return err - } - // Clear out any stale content from the caches - bc.hc.headerCache.Purge() - bc.hc.tdCache.Purge() - bc.hc.numberCache.Purge() - - // Clear out any stale content from the caches - bc.bodyCache.Purge() - bc.bodyRLPCache.Purge() - bc.receiptsCache.Purge() - bc.blockCache.Purge() - bc.txLookupCache.Purge() - bc.futureBlocks.Purge() - - log.Info("Rewind ancient data", "number", head) - return nil -} - // numberHash is just a container for a number and a hash, to represent a block type numberHash struct { number uint64 @@ -1155,12 +1122,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var ( stats = struct{ processed, ignored int32 }{} start = time.Now() - size = 0 + size = int64(0) ) + // updateHead updates the head fast sync block if the inserted blocks are better // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { bc.chainmu.Lock() + defer bc.chainmu.Unlock() // Rewind may have occurred, skip in that case. if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 { @@ -1169,68 +1138,63 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) bc.currentFastBlock.Store(head) headFastBlockGauge.Update(int64(head.NumberU64())) - bc.chainmu.Unlock() return true } } - bc.chainmu.Unlock() return false } + // writeAncient writes blockchain and corresponding receipt chain into ancient store. // // this function only accepts canonical chain data. All side chain will be reverted // eventually. writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { - var ( - previous = bc.CurrentFastBlock() - batch = bc.db.NewBatch() - ) - // If any error occurs before updating the head or we are inserting a side chain, - // all the data written this time wll be rolled back. - defer func() { - if previous != nil { - if err := bc.truncateAncient(previous.NumberU64()); err != nil { - log.Crit("Truncate ancient store failed", "err", err) - } - } - }() - var deleted []*numberHash - for i, block := range blockChain { - // Short circuit insertion if shutting down or processing failed - if bc.insertStopped() { - return 0, errInsertionInterrupted - } - // Short circuit insertion if it is required(used in testing only) - if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) { - return i, errors.New("insertion is terminated for testing purpose") - } - // Short circuit if the owner header is unknown - if !bc.HasHeader(block.Hash(), block.NumberU64()) { - return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4]) - } - if block.NumberU64() == 1 { - // Make sure to write the genesis into the freezer - if frozen, _ := bc.db.Ancients(); frozen == 0 { - h := rawdb.ReadCanonicalHash(bc.db, 0) - b := rawdb.ReadBlock(bc.db, h, 0) - size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0)) - log.Info("Wrote genesis to ancients") + first := blockChain[0] + last := blockChain[len(blockChain)-1] + + // Ensure genesis is in ancients. + if first.NumberU64() == 1 { + if frozen, _ := bc.db.Ancients(); frozen == 0 { + b := bc.genesisBlock + td := bc.genesisBlock.Difficulty() + writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td) + size += writeSize + if err != nil { + log.Error("Error writing genesis to ancients", "err", err) + return 0, err } + log.Info("Wrote genesis to ancients") } - // Flush data into ancient database. - size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) - - // Write tx indices if any condition is satisfied: - // * If user requires to reserve all tx indices(txlookuplimit=0) - // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) - // * If block number is large enough to be regarded as a recent block - // It means blocks below the ancientLimit-txlookupLimit won't be indexed. - // - // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with - // an external ancient database, during the setup, blockchain will start - // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) - // range. In this case, all tx indices of newly imported blocks should be - // generated. + } + // Before writing the blocks to the ancients, we need to ensure that + // they correspond to the what the headerchain 'expects'. + // We only check the last block/header, since it's a contiguous chain. + if !bc.HasHeader(last.Hash(), last.NumberU64()) { + return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4]) + } + + // Write all chain data to ancients. + td := bc.GetTd(first.Hash(), first.NumberU64()) + writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td) + size += writeSize + if err != nil { + log.Error("Error importing chain data to ancients", "err", err) + return 0, err + } + + // Write tx indices if any condition is satisfied: + // * If user requires to reserve all tx indices(txlookuplimit=0) + // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) + // * If block number is large enough to be regarded as a recent block + // It means blocks below the ancientLimit-txlookupLimit won't be indexed. + // + // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with + // an external ancient database, during the setup, blockchain will start + // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) + // range. In this case, all tx indices of newly imported blocks should be + // generated. + var batch = bc.db.NewBatch() + for _, block := range blockChain { if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { rawdb.WriteTxLookupEntriesByBlock(batch, block) } else if rawdb.ReadTxIndexTail(bc.db) != nil { @@ -1238,51 +1202,50 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } stats.processed++ } + // Flush all tx-lookup index data. - size += batch.ValueSize() + size += int64(batch.ValueSize()) if err := batch.Write(); err != nil { + // The tx index data could not be written. + // Roll back the ancient store update. + fastBlock := bc.CurrentFastBlock().NumberU64() + if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { + log.Error("Can't truncate ancient store after failed insert", "err", err) + } return 0, err } - batch.Reset() // Sync the ancient store explicitly to ensure all data has been flushed to disk. if err := bc.db.Sync(); err != nil { return 0, err } - if !updateHead(blockChain[len(blockChain)-1]) { - return 0, errors.New("side blocks can't be accepted as the ancient chain data") - } - previous = nil // disable rollback explicitly - // Wipe out canonical block data. - for _, nh := range deleted { - rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number) - rawdb.DeleteCanonicalHash(batch, nh.number) - } - for _, block := range blockChain { - // Always keep genesis block in active database. - if block.NumberU64() != 0 { - rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) - rawdb.DeleteCanonicalHash(batch, block.NumberU64()) + // Update the current fast block because all block data is now present in DB. + previousFastBlock := bc.CurrentFastBlock().NumberU64() + if !updateHead(blockChain[len(blockChain)-1]) { + // We end up here if the header chain has reorg'ed, and the blocks/receipts + // don't match the canonical chain. + if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil { + log.Error("Can't truncate ancient store after failed insert", "err", err) } + return 0, errSideChainReceipts } - if err := batch.Write(); err != nil { - return 0, err - } - batch.Reset() - // Wipe out side chain too. - for _, nh := range deleted { - for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) { - rawdb.DeleteBlock(batch, hash, nh.number) + // Delete block data from the main database. + batch.Reset() + canonHashes := make(map[common.Hash]struct{}) + for _, block := range blockChain { + canonHashes[block.Hash()] = struct{}{} + if block.NumberU64() == 0 { + continue } + rawdb.DeleteCanonicalHash(batch, block.NumberU64()) + rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) } - for _, block := range blockChain { - // Always keep genesis block in active database. - if block.NumberU64() != 0 { - for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) { - rawdb.DeleteBlock(batch, hash, block.NumberU64()) - } + // Delete side chain hash-to-number mappings. + for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) { + if _, canon := canonHashes[nh.Hash]; !canon { + rawdb.DeleteHeader(batch, nh.Hash, nh.Number) } } if err := batch.Write(); err != nil { @@ -1290,6 +1253,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } return 0, nil } + // writeLive writes blockchain and corresponding receipt chain into active store. writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { skipPresenceCheck := false @@ -1327,7 +1291,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if err := batch.Write(); err != nil { return 0, err } - size += batch.ValueSize() + size += int64(batch.ValueSize()) batch.Reset() } stats.processed++ @@ -1336,7 +1300,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // we can ensure all components of body is completed(body, receipts, // tx indexes) if batch.ValueSize() > 0 { - size += batch.ValueSize() + size += int64(batch.ValueSize()) if err := batch.Write(); err != nil { return 0, err } @@ -1344,6 +1308,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ updateHead(blockChain[len(blockChain)-1]) return 0, nil } + // Write downloaded chain data and corresponding receipt chain data if len(ancientBlocks) > 0 { if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4e5df633b7..8d94f17aab 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -670,6 +670,7 @@ func TestFastVsFullChains(t *testing.T) { if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } + // Iterate over all chain data components, and cross reference for i := 0; i < len(blocks); i++ { num, hash := blocks[i].NumberU64(), blocks[i].Hash() @@ -693,10 +694,27 @@ func TestFastVsFullChains(t *testing.T) { } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) { t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles()) } - if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) { + + // Check receipts. + freceipts := rawdb.ReadReceipts(fastDb, hash, num, fast.Config()) + anreceipts := rawdb.ReadReceipts(ancientDb, hash, num, fast.Config()) + areceipts := rawdb.ReadReceipts(archiveDb, hash, num, fast.Config()) + if types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) { t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts) } + + // Check that hash-to-number mappings are present in all databases. + if m := rawdb.ReadHeaderNumber(fastDb, hash); m == nil || *m != num { + t.Errorf("block #%d [%x]: wrong hash-to-number mapping in fastdb: %v", num, hash, m) + } + if m := rawdb.ReadHeaderNumber(ancientDb, hash); m == nil || *m != num { + t.Errorf("block #%d [%x]: wrong hash-to-number mapping in ancientdb: %v", num, hash, m) + } + if m := rawdb.ReadHeaderNumber(archiveDb, hash); m == nil || *m != num { + t.Errorf("block #%d [%x]: wrong hash-to-number mapping in archivedb: %v", num, hash, m) + } } + // Check that the canonical chains are the same between the databases for i := 0; i < len(blocks)+1; i++ { if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash { @@ -1639,20 +1657,34 @@ func TestBlockchainRecovery(t *testing.T) { } } -func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { - // Configure and generate a sample block chain - var ( - gendb = rawdb.NewMemoryDatabase() - key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - address = crypto.PubkeyToAddress(key.PublicKey) - funds = big.NewInt(1000000000) - gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} - genesis = gspec.MustCommit(gendb) - ) - height := uint64(1024) - blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil) +// This test checks that InsertReceiptChain will roll back correctly when attempting to insert a side chain. +func TestInsertReceiptChainRollback(t *testing.T) { + // Generate forked chain. The returned BlockChain object is used to process the side chain blocks. + tmpChain, sideblocks, canonblocks, err := getLongAndShortChains() + if err != nil { + t.Fatal(err) + } + defer tmpChain.Stop() + // Get the side chain receipts. + if _, err := tmpChain.InsertChain(sideblocks); err != nil { + t.Fatal("processing side chain failed:", err) + } + t.Log("sidechain head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash()) + sidechainReceipts := make([]types.Receipts, len(sideblocks)) + for i, block := range sideblocks { + sidechainReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash()) + } + // Get the canon chain receipts. + if _, err := tmpChain.InsertChain(canonblocks); err != nil { + t.Fatal("processing canon chain failed:", err) + } + t.Log("canon head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash()) + canonReceipts := make([]types.Receipts, len(canonblocks)) + for i, block := range canonblocks { + canonReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash()) + } - // Import the chain as a ancient-first node and ensure all pointers are updated + // Set up a BlockChain that uses the ancient store. frdir, err := ioutil.TempDir("", "") if err != nil { t.Fatalf("failed to create temp freezer dir: %v", err) @@ -1662,38 +1694,43 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } + gspec := Genesis{Config: params.AllEthashProtocolChanges} gspec.MustCommit(ancientDb) - ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) - defer ancient.Stop() + ancientChain, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) + defer ancientChain.Stop() - headers := make([]*types.Header, len(blocks)) - for i, block := range blocks { - headers[i] = block.Header() - } - if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) + // Import the canonical header chain. + canonHeaders := make([]*types.Header, len(canonblocks)) + for i, block := range canonblocks { + canonHeaders[i] = block.Header() } - // Abort ancient receipt chain insertion deliberately - ancient.terminateInsert = func(hash common.Hash, number uint64) bool { - return number == blocks[len(blocks)/2].NumberU64() + if _, err = ancientChain.InsertHeaderChain(canonHeaders, 1); err != nil { + t.Fatal("can't import canon headers:", err) } - previousFastBlock := ancient.CurrentFastBlock() - if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil { - t.Fatalf("failed to insert receipt %d: %v", n, err) + + // Try to insert blocks/receipts of the side chain. + _, err = ancientChain.InsertReceiptChain(sideblocks, sidechainReceipts, uint64(len(sideblocks))) + if err == nil { + t.Fatal("expected error from InsertReceiptChain.") } - if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() { - t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64()) + if ancientChain.CurrentFastBlock().NumberU64() != 0 { + t.Fatalf("failed to rollback ancient data, want %d, have %d", 0, ancientChain.CurrentFastBlock().NumberU64()) } - if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 { - t.Fatalf("failed to truncate ancient data") + if frozen, err := ancientChain.db.Ancients(); err != nil || frozen != 1 { + t.Fatalf("failed to truncate ancient data, frozen index is %d", frozen) } - ancient.terminateInsert = nil - if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { - t.Fatalf("failed to insert receipt %d: %v", n, err) + + // Insert blocks/receipts of the canonical chain. + _, err = ancientChain.InsertReceiptChain(canonblocks, canonReceipts, uint64(len(canonblocks))) + if err != nil { + t.Fatalf("can't import canon chain receipts: %v", err) } - if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() { + if ancientChain.CurrentFastBlock().NumberU64() != canonblocks[len(canonblocks)-1].NumberU64() { t.Fatalf("failed to insert ancient recept chain after rollback") } + if frozen, _ := ancientChain.db.Ancients(); frozen != uint64(len(canonblocks))+1 { + t.Fatalf("wrong ancients count %d", frozen) + } } // Tests that importing a very large side fork, which is larger than the canon chain, @@ -1958,9 +1995,8 @@ func testInsertKnownChainData(t *testing.T, typ string) { asserter(t, blocks2[len(blocks2)-1]) } -// getLongAndShortChains returns two chains, -// A is longer, B is heavier -func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error) { +// getLongAndShortChains returns two chains: A is longer, B is heavier. +func getLongAndShortChains() (bc *BlockChain, longChain []*types.Block, heavyChain []*types.Block, err error) { // Generate a canonical chain to act as the main dataset engine := ethash.NewFaker() db := rawdb.NewMemoryDatabase() @@ -1968,7 +2004,7 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error // Generate and import the canonical chain, // Offset the time, to keep the difficulty low - longChain, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) { + longChain, _ = GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) diskdb := rawdb.NewMemoryDatabase() @@ -1982,10 +2018,13 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error // Generate fork chain, make it shorter than canon, with common ancestor pretty early parentIndex := 3 parent := longChain[parentIndex] - heavyChain, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) { + heavyChainExt, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) b.OffsetTime(-9) }) + heavyChain = append(heavyChain, longChain[:parentIndex+1]...) + heavyChain = append(heavyChain, heavyChainExt...) + // Verify that the test is sane var ( longerTd = new(big.Int) diff --git a/core/error.go b/core/error.go index 33cf874d15..594f3a26e5 100644 --- a/core/error.go +++ b/core/error.go @@ -31,6 +31,8 @@ var ( // ErrNoGenesis is returned when there is no Genesis Block. ErrNoGenesis = errors.New("genesis not found in chain") + + errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data") ) // List of evm-call-message pre-checking errors. All state transition messages will diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 76132bf37e..58226fb046 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -19,6 +19,7 @@ package rawdb import ( "bytes" "encoding/binary" + "fmt" "math/big" "sort" @@ -81,6 +82,37 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { return hashes } +type NumberHash struct { + Number uint64 + Hash common.Hash +} + +// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, +// both canonical and reorged forks included. +// This method considers both limits to be _inclusive_. +func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash { + var ( + start = encodeBlockNumber(first) + keyLength = len(headerPrefix) + 8 + 32 + hashes = make([]*NumberHash, 0, 1+last-first) + it = db.NewIterator(headerPrefix, start) + ) + defer it.Release() + for it.Next() { + key := it.Key() + if len(key) != keyLength { + continue + } + num := binary.BigEndian.Uint64(key[len(headerPrefix) : len(headerPrefix)+8]) + if num > last { + break + } + hash := common.BytesToHash(key[len(key)-32:]) + hashes = append(hashes, &NumberHash{num, hash}) + } + return hashes +} + // ReadAllCanonicalHashes retrieves all canonical number and hash mappings at the // certain chain range. If the accumulated entries reaches the given threshold, // abort the iteration and return the semi-finish result. @@ -656,34 +688,48 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { } // WriteAncientBlock writes entire block data into ancient store and returns the total written size. -func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts types.Receipts, td *big.Int) int { - // Encode all block components to RLP format. - headerBlob, err := rlp.EncodeToBytes(block.Header()) - if err != nil { - log.Crit("Failed to RLP encode block header", "err", err) - } - bodyBlob, err := rlp.EncodeToBytes(block.Body()) - if err != nil { - log.Crit("Failed to RLP encode body", "err", err) +func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) { + var ( + tdSum = new(big.Int).Set(td) + stReceipts []*types.ReceiptForStorage + ) + return db.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i, block := range blocks { + // Convert receipts to storage format and sum up total difficulty. + stReceipts = stReceipts[:0] + for _, receipt := range receipts[i] { + stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt)) + } + header := block.Header() + if i > 0 { + tdSum.Add(tdSum, header.Difficulty) + } + if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil { + return err + } + } + return nil + }) +} + +func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error { + num := block.NumberU64() + if err := op.AppendRaw(freezerHashTable, num, block.Hash().Bytes()); err != nil { + return fmt.Errorf("can't add block %d hash: %v", num, err) } - storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) - for i, receipt := range receipts { - storageReceipts[i] = (*types.ReceiptForStorage)(receipt) + if err := op.Append(freezerHeaderTable, num, header); err != nil { + return fmt.Errorf("can't append block header %d: %v", num, err) } - receiptBlob, err := rlp.EncodeToBytes(storageReceipts) - if err != nil { - log.Crit("Failed to RLP encode block receipts", "err", err) + if err := op.Append(freezerBodiesTable, num, block.Body()); err != nil { + return fmt.Errorf("can't append block body %d: %v", num, err) } - tdBlob, err := rlp.EncodeToBytes(td) - if err != nil { - log.Crit("Failed to RLP encode block total difficulty", "err", err) + if err := op.Append(freezerReceiptTable, num, receipts); err != nil { + return fmt.Errorf("can't append block %d receipts: %v", num, err) } - // Write all blob to flatten files. - err = db.AppendAncient(block.NumberU64(), block.Hash().Bytes(), headerBlob, bodyBlob, receiptBlob, tdBlob) - if err != nil { - log.Crit("Failed to write block data to ancient store", "err", err) + if err := op.Append(freezerDifficultyTable, num, td); err != nil { + return fmt.Errorf("can't append block %d total difficulty: %v", num, err) } - return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength + return nil } // DeleteBlock removes all block data associated with a hash. diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index f20e8b1fff..58d7645e50 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "golang.org/x/crypto/sha3" @@ -438,7 +439,7 @@ func TestAncientStorage(t *testing.T) { if err != nil { t.Fatalf("failed to create temp freezer dir: %v", err) } - defer os.Remove(frdir) + defer os.RemoveAll(frdir) db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) if err != nil { @@ -467,8 +468,10 @@ func TestAncientStorage(t *testing.T) { if blob := ReadTdRLP(db, hash, number); len(blob) > 0 { t.Fatalf("non existent td returned") } + // Write and verify the header in the database - WriteAncientBlock(db, block, nil, big.NewInt(100)) + WriteAncientBlocks(db, []*types.Block{block}, []types.Receipts{nil}, big.NewInt(100)) + if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 { t.Fatalf("no header returned") } @@ -481,6 +484,7 @@ func TestAncientStorage(t *testing.T) { if blob := ReadTdRLP(db, hash, number); len(blob) == 0 { t.Fatalf("no td returned") } + // Use a fake hash for data retrieval, nothing should be returned. fakeHash := common.BytesToHash([]byte{0x01, 0x02, 0x03}) if blob := ReadHeaderRLP(db, fakeHash, number); len(blob) != 0 { @@ -528,3 +532,141 @@ func TestCanonicalHashIteration(t *testing.T) { } } } + +func TestHashesInRange(t *testing.T) { + mkHeader := func(number, seq int) *types.Header { + h := types.Header{ + Difficulty: new(big.Int), + Number: big.NewInt(int64(number)), + GasLimit: uint64(seq), + } + return &h + } + db := NewMemoryDatabase() + // For each number, write N versions of that particular number + total := 0 + for i := 0; i < 15; i++ { + for ii := 0; ii < i; ii++ { + WriteHeader(db, mkHeader(i, ii)) + total++ + } + } + if have, want := len(ReadAllHashesInRange(db, 10, 10)), 10; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashesInRange(db, 10, 9)), 0; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashesInRange(db, 0, 100)), total; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashesInRange(db, 9, 10)), 9+10; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashes(db, 10)), 10; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashes(db, 16)), 0; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashes(db, 1)), 1; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } +} + +// This measures the write speed of the WriteAncientBlocks operation. +func BenchmarkWriteAncientBlocks(b *testing.B) { + // Open freezer database. + frdir, err := ioutil.TempDir("", "") + if err != nil { + b.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.RemoveAll(frdir) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + if err != nil { + b.Fatalf("failed to create database with ancient backend") + } + + // Create the data to insert. The blocks must have consecutive numbers, so we create + // all of them ahead of time. However, there is no need to create receipts + // individually for each block, just make one batch here and reuse it for all writes. + const batchSize = 128 + const blockTxs = 20 + allBlocks := makeTestBlocks(b.N, blockTxs) + batchReceipts := makeTestReceipts(batchSize, blockTxs) + b.ResetTimer() + + // The benchmark loop writes batches of blocks, but note that the total block count is + // b.N. This means the resulting ns/op measurement is the time it takes to write a + // single block and its associated data. + var td = big.NewInt(55) + var totalSize int64 + for i := 0; i < b.N; i += batchSize { + length := batchSize + if i+batchSize > b.N { + length = b.N - i + } + + blocks := allBlocks[i : i+length] + receipts := batchReceipts[:length] + writeSize, err := WriteAncientBlocks(db, blocks, receipts, td) + if err != nil { + b.Fatal(err) + } + totalSize += writeSize + } + + // Enable MB/s reporting. + b.SetBytes(totalSize / int64(b.N)) +} + +// makeTestBlocks creates fake blocks for the ancient write benchmark. +func makeTestBlocks(nblock int, txsPerBlock int) []*types.Block { + key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + signer := types.LatestSignerForChainID(big.NewInt(8)) + + // Create transactions. + txs := make([]*types.Transaction, txsPerBlock) + for i := 0; i < len(txs); i++ { + var err error + to := common.Address{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} + txs[i], err = types.SignNewTx(key, signer, &types.LegacyTx{ + Nonce: 2, + GasPrice: big.NewInt(30000), + Gas: 0x45454545, + To: &to, + }) + if err != nil { + panic(err) + } + } + + // Create the blocks. + blocks := make([]*types.Block, nblock) + for i := 0; i < nblock; i++ { + header := &types.Header{ + Number: big.NewInt(int64(i)), + Extra: []byte("test block"), + } + blocks[i] = types.NewBlockWithHeader(header).WithBody(txs, nil) + blocks[i].Hash() // pre-cache the block hash + } + return blocks +} + +// makeTestReceipts creates fake receipts for the ancient write benchmark. +func makeTestReceipts(n int, nPerBlock int) []types.Receipts { + receipts := make([]*types.Receipt, nPerBlock) + for i := 0; i < len(receipts); i++ { + receipts[i] = &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + CumulativeGasUsed: 0x888888888, + Logs: make([]*types.Log, 5), + } + } + allReceipts := make([]types.Receipts, n) + for i := 0; i < n; i++ { + allReceipts[i] = receipts + } + return allReceipts +} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 90619169a0..0e116ef999 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -104,9 +104,9 @@ func (db *nofreezedb) AncientSize(kind string) (uint64, error) { return 0, errNotSupported } -// AppendAncient returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { - return errNotSupported +// ModifyAncients is not supported. +func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) { + return 0, errNotSupported } // TruncateAncients returns an error as we don't have a backing chain freezer. @@ -122,9 +122,7 @@ func (db *nofreezedb) Sync() error { // NewDatabase creates a high level database on top of a given key-value data // store without a freezer moving immutable chain segments into cold storage. func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { - return &nofreezedb{ - KeyValueStore: db, - } + return &nofreezedb{KeyValueStore: db} } // NewDatabaseWithFreezer creates a high level database on top of a given key- @@ -132,7 +130,7 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { // storage. func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) { // Create the idle freezer instance - frdb, err := newFreezer(freezer, namespace, readonly) + frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy) if err != nil { return nil, err } diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 4262a615a7..4cecbc50f4 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -61,6 +61,9 @@ const ( // freezerBatchLimit is the maximum number of blocks to freeze in one batch // before doing an fsync and deleting it from the key-value store. freezerBatchLimit = 30000 + + // freezerTableSize defines the maximum size of freezer data files. + freezerTableSize = 2 * 1000 * 1000 * 1000 ) // freezer is an memory mapped append-only database to store immutable chain data @@ -77,6 +80,10 @@ type freezer struct { frozen uint64 // Number of blocks already frozen threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) + // This lock synchronizes writers and the truncate operation. + writeLock sync.Mutex + writeBatch *freezerBatch + readonly bool tables map[string]*freezerTable // Data tables for storing everything instanceLock fileutil.Releaser // File-system lock to prevent double opens @@ -90,7 +97,10 @@ type freezer struct { // newFreezer creates a chain freezer that moves ancient chain data into // append-only flat file containers. -func newFreezer(datadir string, namespace string, readonly bool) (*freezer, error) { +// +// The 'tables' argument defines the data tables. If the value of a map +// entry is true, snappy compression is disabled for the table. +func newFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*freezer, error) { // Create the initial freezer object var ( readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) @@ -119,8 +129,10 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro trigger: make(chan chan struct{}), quit: make(chan struct{}), } - for name, disableSnappy := range FreezerNoSnappy { - table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy) + + // Create the tables. + for name, disableSnappy := range tables { + table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy) if err != nil { for _, table := range freezer.tables { table.Close() @@ -130,6 +142,8 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro } freezer.tables[name] = table } + + // Truncate all tables to common length. if err := freezer.repair(); err != nil { for _, table := range freezer.tables { table.Close() @@ -137,12 +151,19 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro lock.Release() return nil, err } + + // Create the write batch. + freezer.writeBatch = newFreezerBatch(freezer) + log.Info("Opened ancient database", "database", datadir, "readonly", readonly) return freezer, nil } // Close terminates the chain freezer, unmapping all the data files. func (f *freezer) Close() error { + f.writeLock.Lock() + defer f.writeLock.Unlock() + var errs []error f.closeOnce.Do(func() { close(f.quit) @@ -199,60 +220,49 @@ func (f *freezer) Ancients() (uint64, error) { // AncientSize returns the ancient size of the specified category. func (f *freezer) AncientSize(kind string) (uint64, error) { + // This needs the write lock to avoid data races on table fields. + // Speed doesn't matter here, AncientSize is for debugging. + f.writeLock.Lock() + defer f.writeLock.Unlock() + if table := f.tables[kind]; table != nil { return table.size() } return 0, errUnknownTable } -// AppendAncient injects all binary blobs belong to block at the end of the -// append-only immutable table files. -// -// Notably, this function is lock free but kind of thread-safe. All out-of-order -// injection will be rejected. But if two injections with same number happen at -// the same time, we can get into the trouble. -func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) { +// ModifyAncients runs the given write operation. +func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) { if f.readonly { - return errReadOnly - } - // Ensure the binary blobs we are appending is continuous with freezer. - if atomic.LoadUint64(&f.frozen) != number { - return errOutOrderInsertion + return 0, errReadOnly } - // Rollback all inserted data if any insertion below failed to ensure - // the tables won't out of sync. + f.writeLock.Lock() + defer f.writeLock.Unlock() + + // Roll back all tables to the starting position in case of error. + prevItem := f.frozen defer func() { if err != nil { - rerr := f.repair() - if rerr != nil { - log.Crit("Failed to repair freezer", "err", rerr) + // The write operation has failed. Go back to the previous item position. + for name, table := range f.tables { + err := table.truncate(prevItem) + if err != nil { + log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err) + } } - log.Info("Append ancient failed", "number", number, "err", err) } }() - // Inject all the components into the relevant data tables - if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil { - log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err) - return err - } - if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil { - log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err) - return err - } - if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil { - log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err) - return err - } - if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil { - log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err) - return err + + f.writeBatch.reset() + if err := fn(f.writeBatch); err != nil { + return 0, err } - if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil { - log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err) - return err + item, writeSize, err := f.writeBatch.commit() + if err != nil { + return 0, err } - atomic.AddUint64(&f.frozen, 1) // Only modify atomically - return nil + atomic.StoreUint64(&f.frozen, item) + return writeSize, nil } // TruncateAncients discards any recent data above the provided threshold number. @@ -260,6 +270,9 @@ func (f *freezer) TruncateAncients(items uint64) error { if f.readonly { return errReadOnly } + f.writeLock.Lock() + defer f.writeLock.Unlock() + if atomic.LoadUint64(&f.frozen) <= items { return nil } @@ -286,6 +299,24 @@ func (f *freezer) Sync() error { return nil } +// repair truncates all data tables to the same length. +func (f *freezer) repair() error { + min := uint64(math.MaxUint64) + for _, table := range f.tables { + items := atomic.LoadUint64(&table.items) + if min > items { + min = items + } + } + for _, table := range f.tables { + if err := table.truncate(min); err != nil { + return err + } + } + atomic.StoreUint64(&f.frozen, min) + return nil +} + // freeze is a background thread that periodically checks the blockchain for any // import progress and moves ancient data from the fast database into the freezer. // @@ -352,54 +383,28 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { backoff = true continue } + // Seems we have data ready to be frozen, process in usable batches - limit := *number - threshold - if limit-f.frozen > freezerBatchLimit { - limit = f.frozen + freezerBatchLimit - } var ( start = time.Now() - first = f.frozen - ancients = make([]common.Hash, 0, limit-f.frozen) + first, _ = f.Ancients() + limit = *number - threshold ) - for f.frozen <= limit { - // Retrieves all the components of the canonical block - hash := ReadCanonicalHash(nfdb, f.frozen) - if hash == (common.Hash{}) { - log.Error("Canonical hash missing, can't freeze", "number", f.frozen) - break - } - header := ReadHeaderRLP(nfdb, hash, f.frozen) - if len(header) == 0 { - log.Error("Block header missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - body := ReadBodyRLP(nfdb, hash, f.frozen) - if len(body) == 0 { - log.Error("Block body missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - receipts := ReadReceiptsRLP(nfdb, hash, f.frozen) - if len(receipts) == 0 { - log.Error("Block receipts missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - td := ReadTdRLP(nfdb, hash, f.frozen) - if len(td) == 0 { - log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash) - // Inject all the components into the relevant data tables - if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil { - break - } - ancients = append(ancients, hash) + if limit-first > freezerBatchLimit { + limit = first + freezerBatchLimit + } + ancients, err := f.freezeRange(nfdb, first, limit) + if err != nil { + log.Error("Error in block freeze operation", "err", err) + backoff = true + continue } + // Batch of blocks have been frozen, flush them before wiping from leveldb if err := f.Sync(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } + // Wipe out all data from the active database batch := db.NewBatch() for i := 0; i < len(ancients); i++ { @@ -464,6 +469,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { log.Crit("Failed to delete dangling side blocks", "err", err) } } + // Log something friendly for the user context := []interface{}{ "blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1, @@ -480,20 +486,54 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { } } -// repair truncates all data tables to the same length. -func (f *freezer) repair() error { - min := uint64(math.MaxUint64) - for _, table := range f.tables { - items := atomic.LoadUint64(&table.items) - if min > items { - min = items - } - } - for _, table := range f.tables { - if err := table.truncate(min); err != nil { - return err +func (f *freezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) { + hashes = make([]common.Hash, 0, limit-number) + + _, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for ; number <= limit; number++ { + // Retrieve all the components of the canonical block. + hash := ReadCanonicalHash(nfdb, number) + if hash == (common.Hash{}) { + return fmt.Errorf("canonical hash missing, can't freeze block %d", number) + } + header := ReadHeaderRLP(nfdb, hash, number) + if len(header) == 0 { + return fmt.Errorf("block header missing, can't freeze block %d", number) + } + body := ReadBodyRLP(nfdb, hash, number) + if len(body) == 0 { + return fmt.Errorf("block body missing, can't freeze block %d", number) + } + receipts := ReadReceiptsRLP(nfdb, hash, number) + if len(receipts) == 0 { + return fmt.Errorf("block receipts missing, can't freeze block %d", number) + } + td := ReadTdRLP(nfdb, hash, number) + if len(td) == 0 { + return fmt.Errorf("total difficulty missing, can't freeze block %d", number) + } + + // Write to the batch. + if err := op.AppendRaw(freezerHashTable, number, hash[:]); err != nil { + return fmt.Errorf("can't write hash to freezer: %v", err) + } + if err := op.AppendRaw(freezerHeaderTable, number, header); err != nil { + return fmt.Errorf("can't write header to freezer: %v", err) + } + if err := op.AppendRaw(freezerBodiesTable, number, body); err != nil { + return fmt.Errorf("can't write body to freezer: %v", err) + } + if err := op.AppendRaw(freezerReceiptTable, number, receipts); err != nil { + return fmt.Errorf("can't write receipts to freezer: %v", err) + } + if err := op.AppendRaw(freezerDifficultyTable, number, td); err != nil { + return fmt.Errorf("can't write td to freezer: %v", err) + } + + hashes = append(hashes, hash) } - } - atomic.StoreUint64(&f.frozen, min) - return nil + return nil + }) + + return hashes, err } diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go new file mode 100644 index 0000000000..8297c0ac1d --- /dev/null +++ b/core/rawdb/freezer_batch.go @@ -0,0 +1,248 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "fmt" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/rlp" + "github.com/golang/snappy" +) + +// This is the maximum amount of data that will be buffered in memory +// for a single freezer table batch. +const freezerBatchBufferLimit = 2 * 1024 * 1024 + +// freezerBatch is a write operation of multiple items on a freezer. +type freezerBatch struct { + tables map[string]*freezerTableBatch +} + +func newFreezerBatch(f *freezer) *freezerBatch { + batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))} + for kind, table := range f.tables { + batch.tables[kind] = table.newBatch() + } + return batch +} + +// Append adds an RLP-encoded item of the given kind. +func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error { + return batch.tables[kind].Append(num, item) +} + +// AppendRaw adds an item of the given kind. +func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error { + return batch.tables[kind].AppendRaw(num, item) +} + +// reset initializes the batch. +func (batch *freezerBatch) reset() { + for _, tb := range batch.tables { + tb.reset() + } +} + +// commit is called at the end of a write operation and +// writes all remaining data to tables. +func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) { + // Check that count agrees on all batches. + item = uint64(math.MaxUint64) + for name, tb := range batch.tables { + if item < math.MaxUint64 && tb.curItem != item { + return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item) + } + item = tb.curItem + } + + // Commit all table batches. + for _, tb := range batch.tables { + if err := tb.commit(); err != nil { + return 0, 0, err + } + writeSize += tb.totalBytes + } + return item, writeSize, nil +} + +// freezerTableBatch is a batch for a freezer table. +type freezerTableBatch struct { + t *freezerTable + + sb *snappyBuffer + encBuffer writeBuffer + dataBuffer []byte + indexBuffer []byte + curItem uint64 // expected index of next append + totalBytes int64 // counts written bytes since reset +} + +// newBatch creates a new batch for the freezer table. +func (t *freezerTable) newBatch() *freezerTableBatch { + batch := &freezerTableBatch{t: t} + if !t.noCompression { + batch.sb = new(snappyBuffer) + } + batch.reset() + return batch +} + +// reset clears the batch for reuse. +func (batch *freezerTableBatch) reset() { + batch.dataBuffer = batch.dataBuffer[:0] + batch.indexBuffer = batch.indexBuffer[:0] + batch.curItem = atomic.LoadUint64(&batch.t.items) + batch.totalBytes = 0 +} + +// Append rlp-encodes and adds data at the end of the freezer table. The item number is a +// precautionary parameter to ensure data correctness, but the table will reject already +// existing data. +func (batch *freezerTableBatch) Append(item uint64, data interface{}) error { + if item != batch.curItem { + return errOutOrderInsertion + } + + // Encode the item. + batch.encBuffer.Reset() + if err := rlp.Encode(&batch.encBuffer, data); err != nil { + return err + } + encItem := batch.encBuffer.data + if batch.sb != nil { + encItem = batch.sb.compress(encItem) + } + return batch.appendItem(encItem) +} + +// AppendRaw injects a binary blob at the end of the freezer table. The item number is a +// precautionary parameter to ensure data correctness, but the table will reject already +// existing data. +func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error { + if item != batch.curItem { + return errOutOrderInsertion + } + + encItem := blob + if batch.sb != nil { + encItem = batch.sb.compress(blob) + } + return batch.appendItem(encItem) +} + +func (batch *freezerTableBatch) appendItem(data []byte) error { + // Check if item fits into current data file. + itemSize := int64(len(data)) + itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer)) + if itemOffset+itemSize > int64(batch.t.maxFileSize) { + // It doesn't fit, go to next file first. + if err := batch.commit(); err != nil { + return err + } + if err := batch.t.advanceHead(); err != nil { + return err + } + itemOffset = 0 + } + + // Put data to buffer. + batch.dataBuffer = append(batch.dataBuffer, data...) + batch.totalBytes += itemSize + + // Put index entry to buffer. + entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)} + batch.indexBuffer = entry.append(batch.indexBuffer) + batch.curItem++ + + return batch.maybeCommit() +} + +// maybeCommit writes the buffered data if the buffer is full enough. +func (batch *freezerTableBatch) maybeCommit() error { + if len(batch.dataBuffer) > freezerBatchBufferLimit { + return batch.commit() + } + return nil +} + +// commit writes the batched items to the backing freezerTable. +func (batch *freezerTableBatch) commit() error { + // Write data. + _, err := batch.t.head.Write(batch.dataBuffer) + if err != nil { + return err + } + dataSize := int64(len(batch.dataBuffer)) + batch.dataBuffer = batch.dataBuffer[:0] + + // Write index. + _, err = batch.t.index.Write(batch.indexBuffer) + if err != nil { + return err + } + indexSize := int64(len(batch.indexBuffer)) + batch.indexBuffer = batch.indexBuffer[:0] + + // Update headBytes of table. + batch.t.headBytes += dataSize + atomic.StoreUint64(&batch.t.items, batch.curItem) + + // Update metrics. + batch.t.sizeGauge.Inc(dataSize + indexSize) + batch.t.writeMeter.Mark(dataSize + indexSize) + return nil +} + +// snappyBuffer writes snappy in block format, and can be reused. It is +// reset when WriteTo is called. +type snappyBuffer struct { + dst []byte +} + +// compress snappy-compresses the data. +func (s *snappyBuffer) compress(data []byte) []byte { + // The snappy library does not care what the capacity of the buffer is, + // but only checks the length. If the length is too small, it will + // allocate a brand new buffer. + // To avoid that, we check the required size here, and grow the size of the + // buffer to utilize the full capacity. + if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n { + if cap(s.dst) < n { + s.dst = make([]byte, n) + } + s.dst = s.dst[:n] + } + + s.dst = snappy.Encode(s.dst, data) + return s.dst +} + +// writeBuffer implements io.Writer for a byte slice. +type writeBuffer struct { + data []byte +} + +func (wb *writeBuffer) Write(data []byte) (int, error) { + wb.data = append(wb.data, data...) + return len(data), nil +} + +func (wb *writeBuffer) Reset() { + wb.data = wb.data[:0] +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 9d052f7cd8..22405cf9b4 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -17,6 +17,7 @@ package rawdb import ( + "bytes" "encoding/binary" "errors" "fmt" @@ -55,19 +56,20 @@ type indexEntry struct { const indexEntrySize = 6 -// unmarshallBinary deserializes binary b into the rawIndex entry. +// unmarshalBinary deserializes binary b into the rawIndex entry. func (i *indexEntry) unmarshalBinary(b []byte) error { i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) i.offset = binary.BigEndian.Uint32(b[2:6]) return nil } -// marshallBinary serializes the rawIndex entry into binary. -func (i *indexEntry) marshallBinary() []byte { - b := make([]byte, indexEntrySize) - binary.BigEndian.PutUint16(b[:2], uint16(i.filenum)) - binary.BigEndian.PutUint32(b[2:6], i.offset) - return b +// append adds the encoded entry to the end of b. +func (i *indexEntry) append(b []byte) []byte { + offset := len(b) + out := append(b, make([]byte, indexEntrySize)...) + binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum)) + binary.BigEndian.PutUint32(out[offset+2:], i.offset) + return out } // bounds returns the start- and end- offsets, and the file number of where to @@ -107,7 +109,7 @@ type freezerTable struct { // to count how many historic items have gone missing. itemOffset uint32 // Offset (number of discarded items) - headBytes uint32 // Number of bytes written to the head file + headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables @@ -118,12 +120,7 @@ type freezerTable struct { // NewFreezerTable opens the given path as a freezer table. func NewFreezerTable(path, name string, disableSnappy bool) (*freezerTable, error) { - return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, disableSnappy) -} - -// newTable opens a freezer table with default settings - 2G files -func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, disableSnappy bool) (*freezerTable, error) { - return newCustomTable(path, name, readMeter, writeMeter, sizeGauge, 2*1000*1000*1000, disableSnappy) + return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy) } // openFreezerFileForAppend opens a freezer table file and seeks to the end @@ -164,10 +161,10 @@ func truncateFreezerFile(file *os.File, size int64) error { return nil } -// newCustomTable opens a freezer table, creating the data and index files if they are +// newTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. -func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err @@ -313,7 +310,7 @@ func (t *freezerTable) repair() error { } // Update the item and byte counters and return t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file - t.headBytes = uint32(contentSize) + t.headBytes = contentSize t.headId = lastIndex.filenum // Close opened files and preopen all files @@ -387,14 +384,14 @@ func (t *freezerTable) truncate(items uint64) error { t.releaseFilesAfter(expected.filenum, true) // Set back the historic head t.head = newHead - atomic.StoreUint32(&t.headId, expected.filenum) + t.headId = expected.filenum } if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil { return err } // All data files truncated, set internal counters and return + t.headBytes = int64(expected.offset) atomic.StoreUint64(&t.items, items) - atomic.StoreUint32(&t.headBytes, expected.offset) // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() @@ -471,94 +468,6 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { } } -// Append injects a binary blob at the end of the freezer table. The item number -// is a precautionary parameter to ensure data correctness, but the table will -// reject already existing data. -// -// Note, this method will *not* flush any data to disk so be sure to explicitly -// fsync before irreversibly deleting data from the database. -func (t *freezerTable) Append(item uint64, blob []byte) error { - // Encode the blob before the lock portion - if !t.noCompression { - blob = snappy.Encode(nil, blob) - } - // Read lock prevents competition with truncate - retry, err := t.append(item, blob, false) - if err != nil { - return err - } - if retry { - // Read lock was insufficient, retry with a writelock - _, err = t.append(item, blob, true) - } - return err -} - -// append injects a binary blob at the end of the freezer table. -// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to -// false. -// However, if the data will grown the current file out of bounds, then this -// method will return 'true, nil', indicating that the caller should retry, this time -// with 'wlock' set to true. -func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) { - if wlock { - t.lock.Lock() - defer t.lock.Unlock() - } else { - t.lock.RLock() - defer t.lock.RUnlock() - } - // Ensure the table is still accessible - if t.index == nil || t.head == nil { - return false, errClosed - } - // Ensure only the next item can be written, nothing else - if atomic.LoadUint64(&t.items) != item { - return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) - } - bLen := uint32(len(encodedBlob)) - if t.headBytes+bLen < bLen || - t.headBytes+bLen > t.maxFileSize { - // Writing would overflow, so we need to open a new data file. - // If we don't already hold the writelock, abort and let the caller - // invoke this method a second time. - if !wlock { - return true, nil - } - nextID := atomic.LoadUint32(&t.headId) + 1 - // We open the next file in truncated mode -- if this file already - // exists, we need to start over from scratch on it - newHead, err := t.openFile(nextID, openFreezerFileTruncated) - if err != nil { - return false, err - } - // Close old file, and reopen in RDONLY mode - t.releaseFile(t.headId) - t.openFile(t.headId, openFreezerFileForReadOnly) - - // Swap out the current head - t.head = newHead - atomic.StoreUint32(&t.headBytes, 0) - atomic.StoreUint32(&t.headId, nextID) - } - if _, err := t.head.Write(encodedBlob); err != nil { - return false, err - } - newOffset := atomic.AddUint32(&t.headBytes, bLen) - idx := indexEntry{ - filenum: atomic.LoadUint32(&t.headId), - offset: newOffset, - } - // Write indexEntry - t.index.Write(idx.marshallBinary()) - - t.writeMeter.Mark(int64(bLen + indexEntrySize)) - t.sizeGauge.Inc(int64(bLen + indexEntrySize)) - - atomic.AddUint64(&t.items, 1) - return false, nil -} - // getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). @@ -651,6 +560,7 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, e func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { t.lock.RLock() defer t.lock.RUnlock() + // Ensure the table and the item is accessible if t.index == nil || t.head == nil { return nil, nil, errClosed @@ -763,6 +673,32 @@ func (t *freezerTable) sizeNolock() (uint64, error) { return total, nil } +// advanceHead should be called when the current head file would outgrow the file limits, +// and a new file must be opened. The caller of this method must hold the write-lock +// before calling this method. +func (t *freezerTable) advanceHead() error { + t.lock.Lock() + defer t.lock.Unlock() + + // We open the next file in truncated mode -- if this file already + // exists, we need to start over from scratch on it. + nextID := t.headId + 1 + newHead, err := t.openFile(nextID, openFreezerFileTruncated) + if err != nil { + return err + } + + // Close old file, and reopen in RDONLY mode. + t.releaseFile(t.headId) + t.openFile(t.headId, openFreezerFileForReadOnly) + + // Swap out the current head. + t.head = newHead + t.headBytes = 0 + t.headId = nextID + return nil +} + // Sync pushes any pending data from memory out to disk. This is an expensive // operation, so use it with care. func (t *freezerTable) Sync() error { @@ -775,10 +711,21 @@ func (t *freezerTable) Sync() error { // DumpIndex is a debug print utility function, mainly for testing. It can also // be used to analyse a live freezer table index. func (t *freezerTable) DumpIndex(start, stop int64) { + t.dumpIndex(os.Stdout, start, stop) +} + +func (t *freezerTable) dumpIndexString(start, stop int64) string { + var out bytes.Buffer + out.WriteString("\n") + t.dumpIndex(&out, start, stop) + return out.String() +} + +func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { buf := make([]byte, indexEntrySize) - fmt.Printf("| number | fileno | offset |\n") - fmt.Printf("|--------|--------|--------|\n") + fmt.Fprintf(w, "| number | fileno | offset |\n") + fmt.Fprintf(w, "|--------|--------|--------|\n") for i := uint64(start); ; i++ { if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil { @@ -786,10 +733,10 @@ func (t *freezerTable) DumpIndex(start, stop int64) { } var entry indexEntry entry.unmarshalBinary(buf) - fmt.Printf("| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset) + fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset) if stop > 0 && i >= uint64(stop) { break } } - fmt.Printf("|--------------------------|\n") + fmt.Fprintf(w, "|--------------------------|\n") } diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index e8a8b5c463..803809b520 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -18,49 +18,36 @@ package rawdb import ( "bytes" - "encoding/binary" "fmt" - "io/ioutil" "math/rand" "os" "path/filepath" - "sync" "testing" "time" "github.com/ethereum/go-ethereum/metrics" + "github.com/stretchr/testify/require" ) func init() { rand.Seed(time.Now().Unix()) } -// Gets a chunk of data, filled with 'b' -func getChunk(size int, b int) []byte { - data := make([]byte, size) - for i := range data { - data[i] = byte(b) - } - return data -} - // TestFreezerBasics test initializing a freezertable from scratch, writing to the table, // and reading it back. func TestFreezerBasics(t *testing.T) { t.Parallel() // set cutoff at 50 bytes - f, err := newCustomTable(os.TempDir(), + f, err := newTable(os.TempDir(), fmt.Sprintf("unittest-%d", rand.Uint64()), metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true) if err != nil { t.Fatal(err) } defer f.Close() + // Write 15 bytes 255 times, results in 85 files - for x := 0; x < 255; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) //print(t, f, 0) //print(t, f, 1) @@ -98,16 +85,21 @@ func TestFreezerBasicsClosing(t *testing.T) { f *freezerTable err error ) - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } - // Write 15 bytes 255 times, results in 85 files + + // Write 15 bytes 255 times, results in 85 files. + // In-between writes, the table is closed and re-opened. for x := 0; x < 255; x++ { data := getChunk(15, x) - f.Append(uint64(x), data) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(uint64(x), data)) + require.NoError(t, batch.commit()) f.Close() - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -124,7 +116,7 @@ func TestFreezerBasicsClosing(t *testing.T) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) } f.Close() - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -137,22 +129,22 @@ func TestFreezerRepairDanglingHead(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 255 times - for x := 0; x < 255; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) + // The last item should be there if _, err = f.Retrieve(0xfe); err != nil { t.Fatal(err) } f.Close() } + // open the index idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) if err != nil { @@ -165,9 +157,10 @@ func TestFreezerRepairDanglingHead(t *testing.T) { } idxFile.Truncate(stat.Size() - 4) idxFile.Close() + // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -188,22 +181,22 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) - { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill a table and close it + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 255 times - for x := 0; x < 0xff; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { t.Fatal(err) } f.Close() } + // open the index idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) if err != nil { @@ -213,9 +206,10 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { // 0-indexEntry, 1-indexEntry, corrupt-indexEntry idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2) idxFile.Close() + // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -228,15 +222,17 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Errorf("Expected error for missing index entry") } // We should now be able to store items again, from item = 1 + batch := f.newBatch() for x := 1; x < 0xff; x++ { - data := getChunk(15, ^x) - f.Append(uint64(x), data) + require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } + require.NoError(t, batch.commit()) f.Close() } + // And if we open it, we should now be able to read all of them (new values) { - f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, _ := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) for y := 1; y < 255; y++ { exp := getChunk(15, ^y) got, err := f.Retrieve(uint64(y)) @@ -255,22 +251,21 @@ func TestSnappyDetection(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("snappytest-%d", rand.Uint64()) + // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 255 times - for x := 0; x < 0xff; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) f.Close() } + // Open without snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, false) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false) if err != nil { t.Fatal(err) } @@ -282,7 +277,7 @@ func TestSnappyDetection(t *testing.T) { // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -292,8 +287,8 @@ func TestSnappyDetection(t *testing.T) { t.Fatalf("expected no error, got %v", err) } } - } + func assertFileSize(f string, size int64) error { stat, err := os.Stat(f) if err != nil { @@ -303,7 +298,6 @@ func assertFileSize(f string, size int64) error { return fmt.Errorf("error, expected size %d, got %d", size, stat.Size()) } return nil - } // TestFreezerRepairDanglingIndex checks that if the index has more entries than there are data, @@ -313,16 +307,15 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64()) - { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill a table and close it + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 9 times : 150 bytes - for x := 0; x < 9; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 9, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { f.Close() @@ -331,6 +324,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { f.Close() // File sizes should be 45, 45, 45 : items[3, 3, 3) } + // Crop third file fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname)) // Truncate third file: 45 ,45, 20 @@ -345,17 +339,18 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { file.Truncate(20) file.Close() } + // Open db it again // It should restore the file(s) to // 45, 45, 15 // with 3+3+1 items { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } + defer f.Close() if f.items != 7 { - f.Close() t.Fatalf("expected %d items, got %d", 7, f.items) } if err := assertFileSize(fileToCrop, 15); err != nil { @@ -365,30 +360,29 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { } func TestFreezerTruncate(t *testing.T) { - t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("truncation-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 30 times - for x := 0; x < 30; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { t.Fatal(err) } f.Close() } + // Reopen, truncate { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -401,9 +395,7 @@ func TestFreezerTruncate(t *testing.T) { if f.headBytes != 15 { t.Fatalf("expected %d bytes, got %d", 15, f.headBytes) } - } - } // TestFreezerRepairFirstFile tests a head file with the very first item only half-written. @@ -412,20 +404,26 @@ func TestFreezerRepairFirstFile(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 80 bytes, splitting out into two files - f.Append(0, getChunk(40, 0xFF)) - f.Append(1, getChunk(40, 0xEE)) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE))) + require.NoError(t, batch.commit()) + // The last item should be there - if _, err = f.Retrieve(f.items - 1); err != nil { + if _, err = f.Retrieve(1); err != nil { t.Fatal(err) } f.Close() } + // Truncate the file in half fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname)) { @@ -439,9 +437,10 @@ func TestFreezerRepairFirstFile(t *testing.T) { file.Truncate(20) file.Close() } + // Reopen { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -449,9 +448,14 @@ func TestFreezerRepairFirstFile(t *testing.T) { f.Close() t.Fatalf("expected %d items, got %d", 0, f.items) } + // Write 40 bytes - f.Append(1, getChunk(40, 0xDD)) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD))) + require.NoError(t, batch.commit()) + f.Close() + // Should have been truncated down to zero and then 40 written if err := assertFileSize(fileToCrop, 40); err != nil { t.Fatal(err) @@ -468,25 +472,26 @@ func TestFreezerReadAndTruncate(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("read_truncate-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 30 times - for x := 0; x < 30; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { t.Fatal(err) } f.Close() } + // Reopen and read all files { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -497,40 +502,48 @@ func TestFreezerReadAndTruncate(t *testing.T) { for y := byte(0); y < 30; y++ { f.Retrieve(uint64(y)) } + // Now, truncate back to zero f.truncate(0) + // Write the data again + batch := f.newBatch() for x := 0; x < 30; x++ { - data := getChunk(15, ^x) - if err := f.Append(uint64(x), data); err != nil { - t.Fatalf("error %v", err) - } + require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } + require.NoError(t, batch.commit()) f.Close() } } -func TestOffset(t *testing.T) { +func TestFreezerOffset(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("offset-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } + // Write 6 x 20 bytes, splitting out into three files - f.Append(0, getChunk(20, 0xFF)) - f.Append(1, getChunk(20, 0xEE)) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) - f.Append(2, getChunk(20, 0xdd)) - f.Append(3, getChunk(20, 0xcc)) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) - f.Append(4, getChunk(20, 0xbb)) - f.Append(5, getChunk(20, 0xaa)) - f.DumpIndex(0, 100) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.commit()) + + t.Log(f.dumpIndexString(0, 100)) f.Close() } + // Now crop it. { // delete files 0 and 1 @@ -558,7 +571,7 @@ func TestOffset(t *testing.T) { filenum: tailId, offset: itemOffset, } - buf := zeroIndex.marshallBinary() + buf := zeroIndex.append(nil) // Overwrite index zero copy(indexBuf, buf) // Remove the four next indices by overwriting @@ -567,44 +580,36 @@ func TestOffset(t *testing.T) { // Need to truncate the moved index items indexFile.Truncate(indexEntrySize * (1 + 2)) indexFile.Close() - } + // Now open again - checkPresent := func(numDeleted uint64) { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } - f.DumpIndex(0, 100) - // It should allow writing item 6 - f.Append(numDeleted+2, getChunk(20, 0x99)) - - // It should be fine to fetch 4,5,6 - if got, err := f.Retrieve(numDeleted); err != nil { - t.Fatal(err) - } else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) { - t.Fatalf("expected %x got %x", exp, got) - } - if got, err := f.Retrieve(numDeleted + 1); err != nil { - t.Fatal(err) - } else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) { - t.Fatalf("expected %x got %x", exp, got) - } - if got, err := f.Retrieve(numDeleted + 2); err != nil { - t.Fatal(err) - } else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) { - t.Fatalf("expected %x got %x", exp, got) - } - - // It should error at 0, 1,2,3 - for i := numDeleted - 1; i > numDeleted-10; i-- { - if _, err := f.Retrieve(i); err == nil { - t.Fatal("expected err") - } - } - } - checkPresent(4) - // Now, let's pretend we have deleted 1M items + defer f.Close() + t.Log(f.dumpIndexString(0, 100)) + + // It should allow writing item 6. + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) + require.NoError(t, batch.commit()) + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x99), + }) + } + + // Edit the index again, with a much larger initial offset of 1M. { // Read the index file p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) @@ -624,13 +629,71 @@ func TestOffset(t *testing.T) { offset: itemOffset, filenum: tailId, } - buf := zeroIndex.marshallBinary() + buf := zeroIndex.append(nil) // Overwrite index zero copy(indexBuf, buf) indexFile.WriteAt(indexBuf, 0) indexFile.Close() } - checkPresent(1000000) + + // Check that existing items have been moved to index 1M. + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + defer f.Close() + t.Log(f.dumpIndexString(0, 100)) + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 999999: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1000000: getChunk(20, 0xbb), + 1000001: getChunk(20, 0xaa), + }) + } +} + +func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) { + t.Helper() + + for item, wantBytes := range items { + value, err := f.Retrieve(item) + if err != nil { + t.Fatalf("can't get expected item %d: %v", item, err) + } + if !bytes.Equal(value, wantBytes) { + t.Fatalf("item %d has wrong value %x (want %x)", item, value, wantBytes) + } + } +} + +func checkRetrieveError(t *testing.T, f *freezerTable, items map[uint64]error) { + t.Helper() + + for item, wantError := range items { + value, err := f.Retrieve(item) + if err == nil { + t.Fatalf("unexpected value %x for item %d, want error %v", item, value, wantError) + } + if err != wantError { + t.Fatalf("wrong error for item %d: %v", item, err) + } + } +} + +// Gets a chunk of data, filled with 'b' +func getChunk(size int, b int) []byte { + data := make([]byte, size) + for i := range data { + data[i] = byte(b) + } + return data } // TODO (?) @@ -644,53 +707,18 @@ func TestOffset(t *testing.T) { // should be handled already, and the case described above can only (?) happen if an // external process/user deletes files from the filesystem. -// TestAppendTruncateParallel is a test to check if the Append/truncate operations are -// racy. -// -// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is dependent -// on timing rather than 'clever' input -- there's no determinism. -func TestAppendTruncateParallel(t *testing.T) { - dir, err := ioutil.TempDir("", "freezer") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(dir) +func writeChunks(t *testing.T, ft *freezerTable, n int, length int) { + t.Helper() - f, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true) - if err != nil { - t.Fatal(err) - } - - fill := func(mark uint64) []byte { - data := make([]byte, 8) - binary.LittleEndian.PutUint64(data, mark) - return data - } - - for i := 0; i < 5000; i++ { - f.truncate(0) - data0 := fill(0) - f.Append(0, data0) - data1 := fill(1) - - var wg sync.WaitGroup - wg.Add(2) - go func() { - f.truncate(0) - wg.Done() - }() - go func() { - f.Append(1, data1) - wg.Done() - }() - wg.Wait() - - if have, err := f.Retrieve(0); err == nil { - if !bytes.Equal(have, data0) { - t.Fatalf("have %x want %x", have, data0) - } + batch := ft.newBatch() + for i := 0; i < n; i++ { + if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil { + t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err) } } + if err := batch.commit(); err != nil { + t.Fatalf("Commit returned error: %v", err) + } } // TestSequentialRead does some basic tests on the RetrieveItems. @@ -698,20 +726,17 @@ func TestSequentialRead(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("batchread-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 30 times - for x := 0; x < 30; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 15) f.DumpIndex(0, 30) f.Close() } { // Open it, iterate, verify iteration - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -732,7 +757,7 @@ func TestSequentialRead(t *testing.T) { } { // Open it, iterate, verify byte limit. The byte limit is less than item // size, so each lookup should only return one item - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } @@ -761,16 +786,13 @@ func TestSequentialReadByteLimit(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("batchread-2-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true) if err != nil { t.Fatal(err) } // Write 10 bytes 30 times, // Splitting it at every 100 bytes (10 items) - for x := 0; x < 30; x++ { - data := getChunk(10, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 10) f.Close() } for i, tc := range []struct { @@ -786,7 +808,7 @@ func TestSequentialReadByteLimit(t *testing.T) { {100, 109, 10}, } { { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true) if err != nil { t.Fatal(err) } diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go new file mode 100644 index 0000000000..7359131c89 --- /dev/null +++ b/core/rawdb/freezer_test.go @@ -0,0 +1,301 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "math/big" + "math/rand" + "os" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/stretchr/testify/require" +) + +var freezerTestTableDef = map[string]bool{"test": true} + +func TestFreezerModify(t *testing.T) { + t.Parallel() + + // Create test data. + var valuesRaw [][]byte + var valuesRLP []*big.Int + for x := 0; x < 100; x++ { + v := getChunk(256, x) + valuesRaw = append(valuesRaw, v) + iv := big.NewInt(int64(x)) + iv = iv.Exp(iv, iv, nil) + valuesRLP = append(valuesRLP, iv) + } + + tables := map[string]bool{"raw": true, "rlp": false} + f, dir := newFreezerForTesting(t, tables) + defer os.RemoveAll(dir) + defer f.Close() + + // Commit test data. + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := range valuesRaw { + if err := op.AppendRaw("raw", uint64(i), valuesRaw[i]); err != nil { + return err + } + if err := op.Append("rlp", uint64(i), valuesRLP[i]); err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatal("ModifyAncients failed:", err) + } + + // Dump indexes. + for _, table := range f.tables { + t.Log(table.name, "index:", table.dumpIndexString(0, int64(len(valuesRaw)))) + } + + // Read back test data. + checkAncientCount(t, f, "raw", uint64(len(valuesRaw))) + checkAncientCount(t, f, "rlp", uint64(len(valuesRLP))) + for i := range valuesRaw { + v, _ := f.Ancient("raw", uint64(i)) + if !bytes.Equal(v, valuesRaw[i]) { + t.Fatalf("wrong raw value at %d: %x", i, v) + } + ivEnc, _ := f.Ancient("rlp", uint64(i)) + want, _ := rlp.EncodeToBytes(valuesRLP[i]) + if !bytes.Equal(ivEnc, want) { + t.Fatalf("wrong RLP value at %d: %x", i, ivEnc) + } + } +} + +// This checks that ModifyAncients rolls back freezer updates +// when the function passed to it returns an error. +func TestFreezerModifyRollback(t *testing.T) { + t.Parallel() + + f, dir := newFreezerForTesting(t, freezerTestTableDef) + defer os.RemoveAll(dir) + + theError := errors.New("oops") + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + // Append three items. This creates two files immediately, + // because the table size limit of the test freezer is 2048. + require.NoError(t, op.AppendRaw("test", 0, make([]byte, 2048))) + require.NoError(t, op.AppendRaw("test", 1, make([]byte, 2048))) + require.NoError(t, op.AppendRaw("test", 2, make([]byte, 2048))) + return theError + }) + if err != theError { + t.Errorf("ModifyAncients returned wrong error %q", err) + } + checkAncientCount(t, f, "test", 0) + f.Close() + + // Reopen and check that the rolled-back data doesn't reappear. + tables := map[string]bool{"test": true} + f2, err := newFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err) + } + defer f2.Close() + checkAncientCount(t, f2, "test", 0) +} + +// This test runs ModifyAncients and Ancient concurrently with each other. +func TestFreezerConcurrentModifyRetrieve(t *testing.T) { + t.Parallel() + + f, dir := newFreezerForTesting(t, freezerTestTableDef) + defer os.RemoveAll(dir) + defer f.Close() + + var ( + numReaders = 5 + writeBatchSize = uint64(50) + written = make(chan uint64, numReaders*6) + wg sync.WaitGroup + ) + wg.Add(numReaders + 1) + + // Launch the writer. It appends 10000 items in batches. + go func() { + defer wg.Done() + defer close(written) + for item := uint64(0); item < 10000; item += writeBatchSize { + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(0); i < writeBatchSize; i++ { + item := item + i + value := getChunk(32, int(item)) + if err := op.AppendRaw("test", item, value); err != nil { + return err + } + } + return nil + }) + if err != nil { + panic(err) + } + for i := 0; i < numReaders; i++ { + written <- item + writeBatchSize + } + } + }() + + // Launch the readers. They read random items from the freezer up to the + // current frozen item count. + for i := 0; i < numReaders; i++ { + go func() { + defer wg.Done() + for frozen := range written { + for rc := 0; rc < 80; rc++ { + num := uint64(rand.Intn(int(frozen))) + value, err := f.Ancient("test", num) + if err != nil { + panic(fmt.Errorf("error reading %d (frozen %d): %v", num, frozen, err)) + } + if !bytes.Equal(value, getChunk(32, int(num))) { + panic(fmt.Errorf("wrong value at %d", num)) + } + } + } + }() + } + + wg.Wait() +} + +// This test runs ModifyAncients and TruncateAncients concurrently with each other. +func TestFreezerConcurrentModifyTruncate(t *testing.T) { + f, dir := newFreezerForTesting(t, freezerTestTableDef) + defer os.RemoveAll(dir) + defer f.Close() + + var item = make([]byte, 256) + + for i := 0; i < 1000; i++ { + // First reset and write 100 items. + if err := f.TruncateAncients(0); err != nil { + t.Fatal("truncate failed:", err) + } + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(0); i < 100; i++ { + if err := op.AppendRaw("test", i, item); err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatal("modify failed:", err) + } + checkAncientCount(t, f, "test", 100) + + // Now append 100 more items and truncate concurrently. + var ( + wg sync.WaitGroup + truncateErr error + modifyErr error + ) + wg.Add(3) + go func() { + _, modifyErr = f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(100); i < 200; i++ { + if err := op.AppendRaw("test", i, item); err != nil { + return err + } + } + return nil + }) + wg.Done() + }() + go func() { + truncateErr = f.TruncateAncients(10) + wg.Done() + }() + go func() { + f.AncientSize("test") + wg.Done() + }() + wg.Wait() + + // Now check the outcome. If the truncate operation went through first, the append + // fails, otherwise it succeeds. In either case, the freezer should be positioned + // at 10 after both operations are done. + if truncateErr != nil { + t.Fatal("concurrent truncate failed:", err) + } + if !(modifyErr == nil || modifyErr == errOutOrderInsertion) { + t.Fatal("wrong error from concurrent modify:", modifyErr) + } + checkAncientCount(t, f, "test", 10) + } +} + +func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, string) { + t.Helper() + + dir, err := ioutil.TempDir("", "freezer") + if err != nil { + t.Fatal(err) + } + // note: using low max table size here to ensure the tests actually + // switch between multiple files. + f, err := newFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatal("can't open freezer", err) + } + return f, dir +} + +// checkAncientCount verifies that the freezer contains n items. +func checkAncientCount(t *testing.T, f *freezer, kind string, n uint64) { + t.Helper() + + if frozen, _ := f.Ancients(); frozen != n { + t.Fatalf("Ancients() returned %d, want %d", frozen, n) + } + + // Check at index n-1. + if n > 0 { + index := n - 1 + if ok, _ := f.HasAncient(kind, index); !ok { + t.Errorf("HasAncient(%q, %d) returned false unexpectedly", kind, index) + } + if _, err := f.Ancient(kind, index); err != nil { + t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err) + } + } + + // Check at index n. + index := n + if ok, _ := f.HasAncient(kind, index); ok { + t.Errorf("HasAncient(%q, %d) returned true unexpectedly", kind, index) + } + if _, err := f.Ancient(kind, index); err == nil { + t.Errorf("Ancient(%q, %d) didn't return expected error", kind, index) + } else if err != errOutOfBounds { + t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err) + } +} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 586451c064..02e23b5174 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -80,10 +80,9 @@ func (t *table) AncientSize(kind string) (uint64, error) { return t.db.AncientSize(kind) } -// AppendAncient is a noop passthrough that just forwards the request to the underlying -// database. -func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { - return t.db.AppendAncient(number, hash, header, body, receipts, td) +// ModifyAncients runs an ancient write operation on the underlying database. +func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) { + return t.db.ModifyAncients(fn) } // TruncateAncients is a noop passthrough that just forwards the request to the underlying diff --git a/ethdb/database.go b/ethdb/database.go index bdc09d5e98..3c6500d1dc 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -92,9 +92,10 @@ type AncientReader interface { // AncientWriter contains the methods required to write to immutable ancient data. type AncientWriter interface { - // AppendAncient injects all binary blobs belong to block at the end of the - // append-only immutable table files. - AppendAncient(number uint64, hash, header, body, receipt, td []byte) error + // ModifyAncients runs a write operation on the ancient store. + // If the function returns an error, any changes to the underlying store are reverted. + // The integer return value is the total size of the written data. + ModifyAncients(func(AncientWriteOp) error) (int64, error) // TruncateAncients discards all but the first n ancient data from the ancient store. TruncateAncients(n uint64) error @@ -103,6 +104,15 @@ type AncientWriter interface { Sync() error } +// AncientWriteOp is given to the function argument of ModifyAncients. +type AncientWriteOp interface { + // Append adds an RLP-encoded item. + Append(kind string, number uint64, item interface{}) error + + // AppendRaw adds an item without RLP-encoding it. + AppendRaw(kind string, number uint64, item []byte) error +} + // Reader contains the methods required to read data from both key-value as well as // immutable ancient data. type Reader interface {