Skip to content

Commit

Permalink
(BIDS-2330) misc: add command fix-blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
guybrush committed Nov 27, 2023
1 parent 2bf7611 commit 0a510bb
Showing 1 changed file with 202 additions and 4 deletions.
206 changes: 202 additions & 4 deletions cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"eth2-exporter/db"
"eth2-exporter/exporter"
"eth2-exporter/rpc"
Expand All @@ -15,8 +16,10 @@ import (
"math"
"math/big"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/coocood/freecache"
Expand Down Expand Up @@ -53,7 +56,7 @@ var opts = struct {

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, fix-exec-transactions-count, fix-blocks")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand All @@ -71,7 +74,7 @@ func main() {
flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer")
flag.StringVar(&opts.ValidatorNameRanges, "validator-name-ranges", "https://config.dencun-devnet-8.ethpandaops.io/api/v1/nodes/validator-ranges", "url to or json of validator-ranges (format must be: {'ranges':{'X-Y':'name'}})")
flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command")
dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
flag.BoolVar(&opts.DryRun, "dry-run", true, "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
versionFlag := flag.Bool("version", false, "Show version and exit")
flag.Parse()

Expand All @@ -81,8 +84,6 @@ func main() {
return
}

opts.DryRun = *dryRun != "false"

logrus.WithField("config", *configPath).WithField("version", version.Version).Printf("starting")
cfg := &types.Config{}
err := utils.ReadConfig(cfg, *configPath)
Expand Down Expand Up @@ -362,6 +363,8 @@ func main() {
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
case "fix-exec-transactions-count":
err = fixExecTransactionsCount()
case "fix-blocks":
err = fixBlocks()
default:
utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
}
Expand All @@ -373,6 +376,201 @@ func main() {
}
}

func fixBlocks() error {
logrus.WithFields(logrus.Fields{"startBlock": opts.StartBlock, "endBlock": opts.EndBlock, "dry": opts.DryRun}).Infof("running command fixBlocks")

type ClEthV1ConfigSpecResponse struct {
Data map[string]string `json:"data"`
}

type ClEthV2BeaconBlocksBlockResponse struct {
Version string `json:"version"`
ExecutionOptimistic bool `json:"execution_optimistic"`
Finalized bool `json:"finalized"`
Data struct {
Message struct {
Slot string `json:"slot"`
}
}
}

spec := &ClEthV1ConfigSpecResponse{}
err := utils.HttpReq(context.Background(), http.MethodGet, fmt.Sprintf("http://%s:%s/eth/v1/config/spec", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port), nil, spec)
if err != nil {
logrus.Fatal(err)
}
nodeDepositNetworkId, ok := spec.Data["DEPOSIT_NETWORK_ID"]
if !ok {
logrus.Fatal(fmt.Errorf("missing DEPOSIT_NETWORK_ID in spec from node"))
}
if fmt.Sprintf("%d", utils.Config.Chain.ClConfig.DepositNetworkID) != nodeDepositNetworkId {
logrus.Fatal(fmt.Errorf("config.DepositNetworkId != node.DepositNetworkId: %v != %v", utils.Config.Chain.ClConfig.DepositNetworkID, nodeDepositNetworkId))
}

start := opts.StartBlock
end := opts.EndBlock
if end == 0 {
end = math.MaxUint64
}

for i := start; i < end; i += 1000 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
res := &ClEthV2BeaconBlocksBlockResponse{}
err := utils.HttpReq(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/eth/v1/beacon/blocks/finalized", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port), nil, res)
if err != nil {
return fmt.Errorf("error getting finalized block: %w", err)
}

finalizedSlot, err := strconv.ParseUint(res.Data.Message.Slot, 10, 64)

Check failure on line 425 in cmd/misc/main.go

View workflow job for this annotation

GitHub Actions / Run CI (ubuntu-latest, 1.20.x)

this value of err is never used (SA4006)

iStart := i
iEnd := i + 1000
if iEnd > finalizedSlot+1 {
iEnd = finalizedSlot + 1
}

err = fixBlocksInRange(int(iStart), int(iEnd))
if err != nil {
return err
}

if iEnd > finalizedSlot {
return nil
}
}
return nil
}

func fixBlocksInRange(start, end int) error {
type ClEthV2BeaconBlocksBlockRootResponse struct {
ExecutionOptimistic bool `json:"execution_optimistic"`
Finalized bool `json:"finalized"`
Data struct {
Root string `json:"root"`
} `json:"data"`
}

logrus.Infof("checking blocks [%v-%v)", start, end)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
defer cancel()

var err error
type DbBlock struct {
Slot uint64 `db:"slot"`
ExecBlockNumber uint64 `db:"exec_block_number"`
BlockRoot []byte `db:"blockroot"`
}
type Block struct {
Slot uint64
DbBlocks []*DbBlock
ClBlockRootReponse *ClEthV2BeaconBlocksBlockRootResponse
}

dbBlocks := []*DbBlock{}
err = db.WriterDb.Select(&dbBlocks, `SELECT slot, coalesce(exec_block_number,0) exec_block_number, blockroot FROM blocks WHERE status = '1' AND slot >= $1 AND slot < $2`, start, end)
if err != nil {
return fmt.Errorf("error getting dbBlocks: %w", err)
}

blocksBySlotMu := sync.Mutex{}
blocksBySlot := map[uint64]*Block{}

for _, dbBlock := range dbBlocks {
block, exists := blocksBySlot[dbBlock.Slot]
if !exists {
block = &Block{Slot: dbBlock.Slot}
blocksBySlot[dbBlock.Slot] = block
}
block.DbBlocks = append(block.DbBlocks, dbBlock)
}

g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(4)

for slot := start; slot < end; slot++ {
slot := slot
g.Go(func() error {
select {
case <-gCtx.Done():
return gCtx.Err()
default:
}
res := &ClEthV2BeaconBlocksBlockRootResponse{}
ctx, cancel := context.WithTimeout(gCtx, time.Second*10)
defer cancel()
err = utils.HttpReq(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/eth/v1/beacon/blocks/%d/root", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port, slot), nil, res)
if err != nil {
var httpErr *utils.HttpReqHttpError
if errors.As(err, &httpErr) && httpErr.StatusCode == 404 {
// no sidecar for this slot
return nil
}
return err
}
blocksBySlotMu.Lock()
block, exists := blocksBySlot[uint64(slot)]
if !exists {
block = &Block{Slot: uint64(slot)}
blocksBySlot[uint64(slot)] = block
}
block.ClBlockRootReponse = res
blocksBySlotMu.Unlock()
return nil
})
}

err = g.Wait()
if err != nil {
return err
}

blocksArr := []*Block{}
for _, block := range blocksBySlot {
blocksArr = append(blocksArr, block)
}
sort.Slice(blocksArr, func(i, j int) bool { return blocksArr[i].Slot < blocksArr[j].Slot })

for _, block := range blocksArr {
if block.ClBlockRootReponse == nil {
if opts.DryRun {
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot), "dbBlocks": len(block.DbBlocks)}).Warnf("wrong blocks.status (not updateing because dry)")
} else {
for _, dbBlock := range block.DbBlocks {
_, err = db.WriterDb.Exec(`UPDATE blocks SET status = '3' WHERE slot = $1 AND blockroot = $2`, dbBlock.Slot, dbBlock.BlockRoot)
if err != nil {
return fmt.Errorf("error updating blocks.status to 3 for block at slot %v with blockroot %#x: %w", block.Slot, dbBlock.BlockRoot, err)
}
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot), "blockroot": fmt.Sprintf("%#x", dbBlock.BlockRoot)}).Warnf("updated blocks.status to 3, because no block from cl")
}
}
}

if len(block.DbBlocks) == 0 {
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot)}).Warnf("missing block in db")
}

if len(block.DbBlocks) > 0 && block.ClBlockRootReponse != nil {
for _, dbBlock := range block.DbBlocks {
if block.ClBlockRootReponse.Data.Root != fmt.Sprintf("%#x", dbBlock.BlockRoot) {
if opts.DryRun {
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot)}).Warnf("wrong blocks.status (not updateing because dry), missmatching blockroot at slot: %v: %s != %#x", block.Slot, block.ClBlockRootReponse.Data.Root, dbBlock.BlockRoot)
} else {
_, err = db.WriterDb.Exec(`UPDATE blocks SET status = '3' WHERE slot = $1 AND blockroot = $2`, dbBlock.Slot, dbBlock.BlockRoot)
if err != nil {
return fmt.Errorf("error updating blocks.status to 3 for block at slot %v with blockroot %#x: %w", block.Slot, dbBlock.BlockRoot, err)
}
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot), "blockroot": fmt.Sprintf("%#x", dbBlock.BlockRoot)}).Warnf("updated blocks.status to 3, because missmatching blockroot at slot: %v: %s != %#x", block.Slot, block.ClBlockRootReponse.Data.Root, dbBlock.BlockRoot)
}
}
}
}
}

return nil
}

func fixExecTransactionsCount() error {
startBlockNumber := uint64(opts.StartBlock)
endBlockNumber := uint64(opts.EndBlock)
Expand Down

0 comments on commit 0a510bb

Please sign in to comment.