diff --git a/cmd/integration/commands/state_domains.go b/cmd/integration/commands/state_domains.go
index 6ca2ecfbea1..dbb0871c2d7 100644
--- a/cmd/integration/commands/state_domains.go
+++ b/cmd/integration/commands/state_domains.go
@@ -18,12 +18,18 @@ package commands
 
 import (
 	"context"
+	"encoding/binary"
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"os"
 	"path/filepath"
+	"runtime"
+	"sort"
 	"strings"
 
+	"github.com/erigontech/erigon-lib/etl"
+	"github.com/erigontech/erigon-lib/seg"
 	state3 "github.com/erigontech/erigon-lib/state"
 	"github.com/spf13/cobra"
 
@@ -33,8 +39,11 @@ import (
 	libcommon "github.com/erigontech/erigon-lib/common"
 	"github.com/erigontech/erigon-lib/common/datadir"
 	"github.com/erigontech/erigon-lib/common/length"
+	downloadertype "github.com/erigontech/erigon-lib/downloader/snaptype"
 	"github.com/erigontech/erigon-lib/kv"
+	"github.com/erigontech/erigon-lib/kv/mdbx"
 	kv2 "github.com/erigontech/erigon-lib/kv/mdbx"
+	statelib "github.com/erigontech/erigon-lib/state"
 	"github.com/erigontech/erigon/cmd/utils"
 	"github.com/erigontech/erigon/core"
 	"github.com/erigontech/erigon/core/state"
@@ -52,12 +61,21 @@ func init() {
 	withStartTx(readDomains)
 
 	rootCmd.AddCommand(readDomains)
+
+	withDataDir(purifyDomains)
+	purifyDomains.Flags().StringVar(&purifyDir, "purifiedDomain", "purified-output", "")
+	purifyDomains.Flags().BoolVar(&purifyOnlyCommitment, "only-commitment", true, "purify only commitment domain")
+	purifyDomains.Flags().BoolVar(&replaceInDatadir, "replace-in-datadir", false, "replace the purified domains directly in datadir (will remove .kvei and .bt too)")
+	rootCmd.AddCommand(purifyDomains)
 }
 
 // if trie variant is not hex, we could not have another rootHash with to verify it
 var (
-	stepSize uint64
-	lastStep uint64
+	stepSize             uint64
+	lastStep             uint64
+	purifyDir            string
+	purifyOnlyCommitment bool
+	replaceInDatadir     bool
 )
 
 // write command to just seek and query state by addr and domain from state db and files (if any)
@@ -120,6 +138,300 @@ var readDomains = &cobra.Command{
 	},
 }
 
+var purifyDomains = &cobra.Command{
+	Use:     "purify_domains",
+	Short:   `Regenerate kv files without repeating keys.`,
+	Example: "go run ./cmd/integration purify_domains --datadir=... --verbosity=3",
+	Args:    cobra.ArbitraryArgs,
+	Run: func(cmd *cobra.Command, args []string) {
+		dirs := datadir.New(datadirCli)
+
+		// Scratch directory for the temporary key -> layer index DB.
+		tmpDir, err := os.MkdirTemp(dirs.Tmp, "purifyTemp")
+		if err != nil {
+			fmt.Println("Error creating temporary directory: ", err)
+			return
+		}
+		// The index is only needed while the command runs: do not leave it behind in dirs.Tmp.
+		defer os.RemoveAll(tmpDir)
+
+		// Temporary DB holding the index; it is closed before tmpDir is removed (defers run LIFO).
+		purifyDB := mdbx.MustOpen(tmpDir)
+		defer purifyDB.Close()
+
+		purificationDomains := []string{"commitment"}
+		if !purifyOnlyCommitment {
+			purificationDomains = []string{"account", "storage" /*"code",*/, "commitment", "receipt"}
+		}
+		logger := log.New()
+		for _, domain := range purificationDomains {
+			if err := makePurifiableIndexDB(purifyDB, dirs, logger, domain); err != nil {
+				fmt.Println("Error making purifiable index DB: ", err)
+				return
+			}
+		}
+		for _, domain := range purificationDomains {
+			if err := makePurifiedDomains(purifyDB, dirs, logger, domain); err != nil {
+				fmt.Println("Error making purified domains: ", err)
+				return
+			}
+		}
+	},
+}
+
+// sortedDomainKVFiles returns the names of the .kv files found under dir whose name contains
+// domain, ordered by the From value parsed from each file name. purpose only labels the
+// progress output. A name that cannot be parsed is reported as an error.
+func sortedDomainKVFiles(dir, domain, purpose string) ([]string, error) {
+	type kvFile struct {
+		name string
+		from uint64
+	}
+	var files []kvFile
+	if err := filepath.Walk(dir, func(filePath string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		// Only regular .kv files of the requested domain are of interest.
+		if info.IsDir() || filepath.Ext(filePath) != ".kv" || !strings.Contains(info.Name(), domain) {
+			return nil
+		}
+		res, ok, _ := downloadertype.ParseFileName(dir, info.Name())
+		if !ok {
+			return fmt.Errorf("invalid file name %s", info.Name())
+		}
+		fmt.Printf("Add file to %s of %s: %s\n", purpose, domain, filePath)
+		files = append(files, kvFile{name: info.Name(), from: res.From})
+		return nil
+	}); err != nil {
+		return nil, fmt.Errorf("failed to walk through the domainDir %s: %w", domain, err)
+	}
+	sort.Slice(files, func(i, j int) bool { return files[i].from < files[j].from })
+
+	names := make([]string, len(files))
+	for i, f := range files {
+		names[i] = f.name
+	}
+	return names, nil
+}
+
+// makePurifiableIndexDB fills the domain's scratch table in db with a key -> layer index,
+// where the layer of a .kv file is its position in sortedDomainKVFiles order. The first
+// file (layer 0) is not indexed, so a key missing from the table only occurs in layer 0.
+// NOTE(review): a key present in several files is expected to end up with its highest
+// layer; that relies on how etl orders duplicate keys on Load - confirm.
+func makePurifiableIndexDB(db kv.RwDB, dirs datadir.Dirs, logger log.Logger, domain string) error {
+	// Each domain keeps its index in a borrowed, otherwise unrelated kv table of the scratch DB.
+	var tbl string
+	switch domain {
+	case "account":
+		tbl = kv.MaxTxNum
+	case "storage":
+		tbl = kv.HeaderNumber
+	case "code":
+		tbl = kv.HeaderCanonical
+	case "commitment":
+		tbl = kv.HeaderTD
+	case "receipt":
+		tbl = kv.BadHeaderNumber
+	default:
+		return fmt.Errorf("invalid domain %s", domain)
+	}
+
+	filesNamesToIndex, err := sortedDomainKVFiles(dirs.SnapDomain, domain, "indexing")
+	if err != nil {
+		return err
+	}
+
+	collector := etl.NewCollector("Purification", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), logger)
+	defer collector.Close()
+
+	// indexFile collects every key of one file tagged with its layer. It is a separate
+	// function so that the decompressor is closed as soon as the file is done instead of
+	// piling up deferred closes until the whole domain is processed.
+	indexFile := func(layer uint32, fileName string) error {
+		dec, err := seg.NewDecompressor(filepath.Join(dirs.SnapDomain, fileName))
+		if err != nil {
+			return fmt.Errorf("failed to create decompressor: %w", err)
+		}
+		defer dec.Close()
+
+		layerBytes := make([]byte, 4)
+		binary.BigEndian.PutUint32(layerBytes, layer)
+
+		fmt.Printf("Indexing file %s\n", fileName)
+		getter := dec.MakeGetter()
+		var buf []byte
+		count := 0
+		for getter.HasNext() {
+			buf, _ = getter.Next(buf[:0])
+			if err := collector.Collect(buf, layerBytes); err != nil {
+				return fmt.Errorf("failed to collect key %x: %w", buf, err)
+			}
+			count++
+			if count%100000 == 0 {
+				fmt.Printf("Indexed %d keys in file %s\n", count, fileName)
+			}
+			// skip values
+			getter.Skip()
+		}
+		fmt.Printf("Indexed %d keys in file %s\n", count, fileName)
+		return nil
+	}
+
+	tx, err := db.BeginRw(context.Background())
+	if err != nil {
+		return fmt.Errorf("failed to start transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	for i, fileName := range filesNamesToIndex {
+		if i == 0 {
+			continue // we can skip first layer as all the keys are already mapped to 0.
+		}
+		if err := indexFile(uint32(i), fileName); err != nil {
+			return err
+		}
+	}
+	fmt.Println("Loading the keys to DB")
+	if err := collector.Load(tx, tbl, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
+		return fmt.Errorf("failed to load: %w", err)
+	}
+
+	return tx.Commit()
+}
+
+// makePurifiedDomains rewrites every .kv file of the domain into the purifyDir data
+// directory, keeping a key/value pair only in the file whose layer matches the one stored
+// for the key by makePurifiableIndexDB. With replaceInDatadir set, each rewritten file
+// replaces the original and the files derived from it (.kvei, .bt and their .torrent
+// files) are removed.
+func makePurifiedDomains(db kv.RwDB, dirs datadir.Dirs, logger log.Logger, domain string) error {
+	var tbl string
+	compressionType := seg.CompressNone
+	switch domain {
+	case "account":
+		tbl = kv.MaxTxNum
+	case "storage":
+		compressionType = seg.CompressKeys
+		tbl = kv.HeaderNumber
+	case "code":
+		compressionType = seg.CompressVals
+		tbl = kv.HeaderCanonical
+	case "commitment":
+		tbl = kv.HeaderTD
+	case "receipt":
+		tbl = kv.BadHeaderNumber
+	default:
+		return fmt.Errorf("invalid domain %s", domain)
+	}
+
+	filesNamesToPurify, err := sortedDomainKVFiles(dirs.SnapDomain, domain, "purification")
+	if err != nil {
+		return err
+	}
+
+	tx, err := db.BeginRo(context.Background())
+	if err != nil {
+		return fmt.Errorf("failed to start transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	outD := datadir.New(purifyDir)
+	compressCfg := statelib.DomainCompressCfg
+	compressCfg.Workers = runtime.NumCPU()
+
+	// purifyFile writes the purified copy of one file. It is a separate function so that the
+	// decompressor and the writer are closed exactly once, when the file is done and before
+	// the result is moved into the datadir.
+	purifyFile := func(currentLayer uint32, fileName string) error {
+		dec, err := seg.NewDecompressor(filepath.Join(dirs.SnapDomain, fileName))
+		if err != nil {
+			return fmt.Errorf("failed to create decompressor: %w", err)
+		}
+		defer dec.Close()
+		getter := dec.MakeGetter()
+
+		outPath := filepath.Join(outD.SnapDomain, fileName)
+		valuesComp, err := seg.NewCompressor(context.Background(), "Purification", outPath, dirs.Tmp, compressCfg, log.LvlTrace, logger)
+		if err != nil {
+			return fmt.Errorf("create %s values compressor: %w", outPath, err)
+		}
+		comp := seg.NewWriter(valuesComp, compressionType)
+		defer comp.Close()
+
+		fmt.Printf("Indexing file %s\n", fileName)
+		var (
+			bufKey, bufVal []byte
+			count, skipped int
+		)
+		for getter.HasNext() {
+			// get the key and value for the current entry
+			bufKey, _ = getter.Next(bufKey[:0])
+			bufVal, _ = getter.Next(bufVal[:0])
+
+			layerBytes, err := tx.GetOne(tbl, bufKey)
+			if err != nil {
+				return fmt.Errorf("failed to get key %x: %w", bufKey, err)
+			}
+			// if the key is not found, then the layer is 0
+			var layer uint32
+			if len(layerBytes) == 4 {
+				layer = binary.BigEndian.Uint32(layerBytes)
+			}
+			if layer != currentLayer {
+				skipped++
+				continue
+			}
+			if err := comp.AddWord(bufKey); err != nil {
+				return fmt.Errorf("failed to add key %x: %w", bufKey, err)
+			}
+			if err := comp.AddWord(bufVal); err != nil {
+				return fmt.Errorf("failed to add val %x: %w", bufVal, err)
+			}
+			count++
+			if count%100000 == 0 {
+				fmt.Printf("Indexed %d keys, skipped %d, in file %s\n", count, skipped, fileName)
+			}
+		}
+
+		fmt.Printf("Loaded %d keys in file %s. now compressing...\n", count, fileName)
+		if err := comp.Compress(); err != nil {
+			return fmt.Errorf("failed to compress: %w", err)
+		}
+		fmt.Printf("Compressed %d keys in file %s\n", count, fileName)
+		return nil
+	}
+
+	for currentLayer, fileName := range filesNamesToPurify {
+		if err := purifyFile(uint32(currentLayer), fileName); err != nil {
+			return err
+		}
+		if !replaceInDatadir {
+			continue
+		}
+
+		fmt.Printf("Replacing the file %s in datadir\n", fileName)
+		if err := os.Rename(filepath.Join(outD.SnapDomain, fileName), filepath.Join(dirs.SnapDomain, fileName)); err != nil {
+			return fmt.Errorf("failed to replace the file %s: %w", fileName, err)
+		}
+		// Swap only the extension; replacing every ".kv" occurrence could also rewrite the
+		// middle of a name.
+		base := strings.TrimSuffix(fileName, ".kv")
+		kveiFile, btFile := base+".kvei", base+".bt"
+		// also remove the .kvei and .bt files (and their torrents): they describe the old .kv.
+		for _, stale := range []string{kveiFile, btFile, kveiFile + ".torrent", btFile + ".torrent"} {
+			// A file that is already absent is fine; failing here would abort the run after
+			// the .kv has been replaced and leave the remaining files untouched.
+			if err := os.Remove(filepath.Join(dirs.SnapDomain, stale)); err != nil && !errors.Is(err, os.ErrNotExist) {
+				return fmt.Errorf("failed to remove the file: %s, %w", stale, err)
+			}
+		}
+		fmt.Printf("Removed the files %s and %s\n", kveiFile, btFile)
+	}
+	return nil
+}
+
 func requestDomains(chainDb, stateDb kv.RwDB, ctx context.Context, readDomain string, addrs [][]byte, logger log.Logger) error {
 	sn, bsn, agg, _, _, _ := allSnapshots(ctx, chainDb, logger)
 	defer sn.Close()