|
|
@ -24,9 +24,7 @@ import ( |
|
|
|
"path/filepath" |
|
|
|
"path/filepath" |
|
|
|
"sync" |
|
|
|
"sync" |
|
|
|
"sync/atomic" |
|
|
|
"sync/atomic" |
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/common" |
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/ethdb" |
|
|
|
"github.com/ethereum/go-ethereum/ethdb" |
|
|
|
"github.com/ethereum/go-ethereum/log" |
|
|
|
"github.com/ethereum/go-ethereum/log" |
|
|
|
"github.com/ethereum/go-ethereum/metrics" |
|
|
|
"github.com/ethereum/go-ethereum/metrics" |
|
|
@ -389,115 +387,3 @@ func (f *Freezer) repair() error { |
|
|
|
f.tail.Store(tail) |
|
|
|
f.tail.Store(tail) |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// convertLegacyFn takes a raw freezer entry in an older format and
// returns it in the new format. It is the per-item transformation
// supplied by callers of MigrateTable; returning a non-nil error
// aborts the migration.
type convertLegacyFn = func([]byte) ([]byte, error)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// MigrateTable processes the entries in a given table in sequence
|
|
|
|
|
|
|
|
// converting them to a new format if they're of an old format.
|
|
|
|
|
|
|
|
func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error { |
|
|
|
|
|
|
|
if f.readonly { |
|
|
|
|
|
|
|
return errReadOnly |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
f.writeLock.Lock() |
|
|
|
|
|
|
|
defer f.writeLock.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
table, ok := f.tables[kind] |
|
|
|
|
|
|
|
if !ok { |
|
|
|
|
|
|
|
return errUnknownTable |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// forEach iterates every entry in the table serially and in order, calling `fn`
|
|
|
|
|
|
|
|
// with the item as argument. If `fn` returns an error the iteration stops
|
|
|
|
|
|
|
|
// and that error will be returned.
|
|
|
|
|
|
|
|
forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error { |
|
|
|
|
|
|
|
var ( |
|
|
|
|
|
|
|
items = t.items.Load() |
|
|
|
|
|
|
|
batchSize = uint64(1024) |
|
|
|
|
|
|
|
maxBytes = uint64(1024 * 1024) |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
for i := offset; i < items; { |
|
|
|
|
|
|
|
if i+batchSize > items { |
|
|
|
|
|
|
|
batchSize = items - i |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
data, err := t.RetrieveItems(i, batchSize, maxBytes) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
for j, item := range data { |
|
|
|
|
|
|
|
if err := fn(i+uint64(j), item); err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
i += uint64(len(data)) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
|
|
|
|
|
|
|
|
// process assumes no deletion at tail and needs to be modified to account for that.
|
|
|
|
|
|
|
|
if table.itemOffset.Load() > 0 || table.itemHidden.Load() > 0 { |
|
|
|
|
|
|
|
return errors.New("migration not supported for tail-deleted freezers") |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
ancientsPath := filepath.Dir(table.index.Name()) |
|
|
|
|
|
|
|
// Set up new dir for the migrated table, the content of which
|
|
|
|
|
|
|
|
// we'll at the end move over to the ancients dir.
|
|
|
|
|
|
|
|
migrationPath := filepath.Join(ancientsPath, "migration") |
|
|
|
|
|
|
|
newTable, err := newFreezerTable(migrationPath, kind, table.noCompression, false) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
var ( |
|
|
|
|
|
|
|
batch = newTable.newBatch() |
|
|
|
|
|
|
|
out []byte |
|
|
|
|
|
|
|
start = time.Now() |
|
|
|
|
|
|
|
logged = time.Now() |
|
|
|
|
|
|
|
offset = newTable.items.Load() |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
if offset > 0 { |
|
|
|
|
|
|
|
log.Info("found previous migration attempt", "migrated", offset) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Iterate through entries and transform them
|
|
|
|
|
|
|
|
if err := forEach(table, offset, func(i uint64, blob []byte) error { |
|
|
|
|
|
|
|
if i%10000 == 0 && time.Since(logged) > 16*time.Second { |
|
|
|
|
|
|
|
log.Info("Processing legacy elements", "count", i, "elapsed", common.PrettyDuration(time.Since(start))) |
|
|
|
|
|
|
|
logged = time.Now() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
out, err = convert(blob) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if err := batch.AppendRaw(i, out); err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
}); err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if err := batch.commit(); err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
log.Info("Replacing old table files with migrated ones", "elapsed", common.PrettyDuration(time.Since(start))) |
|
|
|
|
|
|
|
// Release and delete old table files. Note this won't
|
|
|
|
|
|
|
|
// delete the index file.
|
|
|
|
|
|
|
|
table.releaseFilesAfter(0, true) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if err := newTable.Close(); err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
files, err := os.ReadDir(migrationPath) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Move migrated files to ancients dir.
|
|
|
|
|
|
|
|
for _, f := range files { |
|
|
|
|
|
|
|
// This will replace the old index file as a side-effect.
|
|
|
|
|
|
|
|
if err := os.Rename(filepath.Join(migrationPath, f.Name()), filepath.Join(ancientsPath, f.Name())); err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Delete by now empty dir.
|
|
|
|
|
|
|
|
if err := os.Remove(migrationPath); err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|