Skip to content

Commit

Permalink
[API-3627] Do not query logs for order details on source chain during…
Browse files Browse the repository at this point in the history
… settlement (#66)

* modify query order fill event bridge function to fetch tx hash by /tx_search for order id and parse fill amount from logs

* order settler uses query order fill event to get amount out of order instead of querying for the order submitted event on the source chain

* update fulfillment handler to use new query order fill event return values
  • Loading branch information
mattac21 authored Nov 22, 2024
1 parent 7ec7732 commit ad9cc0a
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,18 @@ func (r *orderFulfillmentHandler) UpdateFulfillmentStatus(ctx context.Context, o
metrics.FromContext(ctx).IncExcessiveOrderFulfillmentLatency(order.SourceChainID, order.DestinationChainID, order.OrderStatus)
}

// if the order is already filled, set the status to filled
fillTx, filler, timestamp, err := destinationChainBridgeClient.QueryOrderFillEvent(ctx, destinationChainGatewayContractAddress, order.OrderID)
orderFillEvent, timestamp, err := destinationChainBridgeClient.QueryOrderFillEvent(ctx, destinationChainGatewayContractAddress, order.OrderID)
if err != nil {
return "", fmt.Errorf("querying for order fill event on chainID %s at contract %s for order %s: %w", order.DestinationChainID, destinationChainGatewayContractAddress, order.OrderID, err)
} else if fillTx != nil && filler != nil {
}
if orderFillEvent != nil {
// if the order is already filled, set the status to filled
metrics.FromContext(ctx).IncFillOrderStatusChange(order.SourceChainID, order.DestinationChainID, dbtypes.OrderStatusFilled)
metrics.FromContext(ctx).ObserveFillLatency(order.SourceChainID, order.DestinationChainID, dbtypes.OrderStatusFilled, time.Since(order.CreatedAt))

if _, err := r.db.SetFillTx(ctx, db.SetFillTxParams{
FillTx: sql.NullString{String: *fillTx, Valid: true},
Filler: sql.NullString{String: *filler, Valid: true},
FillTx: sql.NullString{String: orderFillEvent.TxHash, Valid: true},
Filler: sql.NullString{String: orderFillEvent.Filler, Valid: true},
SourceChainID: order.SourceChainID,
OrderID: order.OrderID,
SourceChainGatewayContractAddress: order.SourceChainGatewayContractAddress,
Expand Down
8 changes: 3 additions & 5 deletions ordersettler/ordersettler.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,11 @@ func (r *OrderSettler) findNewSettlements(ctx context.Context) error {
continue
}

orderDetails, err := sourceBridgeClient.QueryOrderSubmittedEvent(ctx, sourceGatewayAddress, fill.OrderID)
orderFillEvent, _, err := bridgeClient.QueryOrderFillEvent(ctx, chain.FastTransferContractAddress, fill.OrderID)
if err != nil {
return fmt.Errorf("getting order submitted event on chain %s for order %s: %w", sourceChainID, fill.OrderID, err)
} else if orderDetails == nil {
return fmt.Errorf("could not find order submitted event on chain %s for order %s", sourceChainID, fill.OrderID)
return fmt.Errorf("querying for order fill event on destination chain at address %s for order id %s: %w", chain.FastTransferContractAddress, fill.OrderID, err)
}
profit := big.NewInt(0).Sub(orderDetails.AmountIn, orderDetails.AmountOut)
profit := new(big.Int).Sub(amount, orderFillEvent.FillAmount)

_, err = r.db.InsertOrderSettlement(ctx, db.InsertOrderSettlementParams{
SourceChainID: sourceChainID,
Expand Down
5 changes: 3 additions & 2 deletions shared/bridges/cctp/bridge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cctp
import (
"context"
"fmt"
"github.com/skip-mev/go-fast-solver/shared/contracts/fast_transfer_gateway"
"math/big"
"time"

"github.com/skip-mev/go-fast-solver/shared/contracts/fast_transfer_gateway"

"github.com/skip-mev/go-fast-solver/db/gen/db"
"github.com/skip-mev/go-fast-solver/ordersettler/types"
)
Expand Down Expand Up @@ -57,7 +58,7 @@ type BridgeClient interface {
InitiateBatchSettlement(ctx context.Context, batch types.SettlementBatch) (string, string, error)
IsSettlementComplete(ctx context.Context, gatewayContractAddress, orderID string) (bool, error)
OrderFillsByFiller(ctx context.Context, gatewayContractAddress, fillerAddress string) ([]Fill, error)
QueryOrderFillEvent(ctx context.Context, gatewayContractAddress, orderID string) (fillTx *string, filler *string, blockTimestamp time.Time, err error)
QueryOrderFillEvent(ctx context.Context, gatewayContractAddress, orderID string) (*OrderFillEvent, time.Time, error)
Balance(ctx context.Context, address, denom string) (*big.Int, error)
OrderExists(ctx context.Context, gatewayContractAddress, orderID string, blockNumber *big.Int) (exists bool, amount *big.Int, err error)
IsOrderRefunded(ctx context.Context, gatewayContractAddress, orderID string) (bool, string, error)
Expand Down
112 changes: 91 additions & 21 deletions shared/bridges/cctp/cosmos_bridge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/skip-mev/go-fast-solver/shared/contracts/fast_transfer_gateway"
"github.com/skip-mev/go-fast-solver/shared/txexecutor/cosmos"
"math/big"
"strconv"
"strings"
"time"

sdkgrpc "github.com/cosmos/cosmos-sdk/types/grpc"
"google.golang.org/grpc/metadata"

"github.com/skip-mev/go-fast-solver/shared/contracts/fast_transfer_gateway"
"github.com/skip-mev/go-fast-solver/shared/txexecutor/cosmos"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/skip-mev/go-fast-solver/db/gen/db"
"github.com/skip-mev/go-fast-solver/ordersettler/types"

"cosmossdk.io/math"
wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
"github.com/avast/retry-go/v4"
abcitypes "github.com/cometbft/cometbft/abci/types"
rpcclient "github.com/cometbft/cometbft/rpc/client"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
Expand Down Expand Up @@ -414,48 +416,116 @@ func (c *CosmosBridgeClient) InitiateBatchSettlement(ctx context.Context, batch
return txHash, base64.StdEncoding.EncodeToString(txBytes), nil
}

func (c *CosmosBridgeClient) QueryOrderFillEvent(ctx context.Context, gatewayContractAddress, orderID string) (*string, *string, time.Time, error) {
wasmQueryClient := wasmtypes.NewQueryClient(c.grpcClient)
type OrderFillEvent struct {
Filler string
FillAmount *big.Int
TxHash string
}

// QueryOrderFillEvent gets order fill information. Note that the time
// stamp being returned is the block time that the query for the order fill
// event occurred at. This is necessary in order to determine if an order
// is timed out based on this call. If the order fill is not found on
// chain, the order fill event and error will be nil, while the timestamp
// is the ts of the block that the query for the fill occurred in. This is
// due to the fact that the node we are querying could be lagging behind
// others, and a fill has actually occurred on chain but our node has not
// caught up to the latest height, and therefore the order should not yet
// be timed out (if the time at that height is behind the timeout timestamp
// of the order).
func (c *CosmosBridgeClient) QueryOrderFillEvent(ctx context.Context, gatewayContractAddress, orderID string) (*OrderFillEvent, time.Time, error) {
var header metadata.MD
resp, err := wasmQueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{
resp, err := wasmtypes.NewQueryClient(c.grpcClient).SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{
Address: gatewayContractAddress,
QueryData: []byte(fmt.Sprintf(`{"order_fill":{"order_id":"%s"}}`, orderID)),
}, grpc.Header(&header))
if err != nil {
if strings.Contains(err.Error(), "not found") {
blockHeight := header.Get(sdkgrpc.GRPCBlockHeightHeader)
blockHeightInt, err := strconv.ParseInt(blockHeight[0], 10, 64)
ts, err := c.blockTimeFromHeightHeader(ctx, header)
if err != nil {
return nil, nil, time.Time{}, fmt.Errorf("parsing block height: %w", err)
return nil, time.Time{}, fmt.Errorf("fetching time stamp from query header: %w", err)
}

headerResp, err := c.rpcClient.Header(ctx, &blockHeightInt)
if err != nil {
return nil, nil, time.Time{}, fmt.Errorf("fetching block header at height %d: %w", blockHeightInt, err)
}

return nil, nil, headerResp.Header.Time, nil
return nil, ts, nil
}
return nil, nil, time.Time{}, fmt.Errorf("failed to query smart contract state: %w", err)
return nil, time.Time{}, fmt.Errorf("querying for order fill of order %s at gateway %s: %w", orderID, gatewayContractAddress, err)
}

var fill struct {
Filler string `json:"filler"`
OrderID string `json:"order_id"`
}
if err := json.Unmarshal(resp.Data, &fill); err != nil {
return nil, nil, time.Time{}, fmt.Errorf("failed to unmarshal response: %w", err)
return nil, time.Time{}, fmt.Errorf("failed to unmarshal response: %w", err)
}

query := fmt.Sprintf("wasm.action='order_filled' AND wasm.order_id='%s'", orderID)
searchResult, err := c.rpcClient.TxSearch(ctx, query, false, nil, nil, "")
if err != nil {
return nil, time.Time{}, fmt.Errorf("searching for order fill tx for order %s at gateway %s: %w", orderID, gatewayContractAddress, err)
}
if searchResult.TotalCount != 1 {
return nil, time.Time{}, fmt.Errorf("expected only 1 tx to be returned from search for order filled events with order id %s at gateway %s, but instead got %d", orderID, gatewayContractAddress, searchResult.TotalCount)
}
tx := searchResult.Txs[0]

fillAmount, err := parseAmountFromFillTx(tx.TxResult, fill.Filler, gatewayContractAddress)
if err != nil {
return nil, time.Time{}, fmt.Errorf("parsing fill amount from fill tx with hash %s: %w", tx.Hash.String(), err)
}

ts, err := c.blockTimeFromHeightHeader(ctx, header)
if err != nil {
return nil, time.Time{}, fmt.Errorf("fetching time stamp from query header: %w", err)
}

return &OrderFillEvent{Filler: fill.Filler, FillAmount: fillAmount, TxHash: tx.Hash.String()}, ts, nil
}

func (c *CosmosBridgeClient) blockTimeFromHeightHeader(ctx context.Context, header metadata.MD) (time.Time, error) {
blockHeight := header.Get(sdkgrpc.GRPCBlockHeightHeader)
blockHeightInt, err := strconv.ParseInt(blockHeight[0], 10, 64)
if err != nil {
return nil, nil, time.Time{}, fmt.Errorf("parsing block height: %w", err)
return time.Time{}, fmt.Errorf("parsing block height: %w", err)
}

headerResp, err := c.rpcClient.Header(ctx, &blockHeightInt)
if err != nil {
return nil, nil, time.Time{}, fmt.Errorf("fetching block header at height %d: %w", blockHeightInt, err)
return time.Time{}, fmt.Errorf("fetching block header at height %d: %w", blockHeightInt, err)
}

return headerResp.Header.Time, nil
}

func parseAmountFromFillTx(tx abcitypes.ExecTxResult, filler string, gatewayContractAddress string) (*big.Int, error) {
containsKV := func(event abcitypes.Event, key, value string) bool {
for _, attribute := range event.GetAttributes() {
if attribute.GetKey() == key && attribute.GetValue() == value {
return true
}
}
return false
}

for _, event := range tx.GetEvents() {
if event.GetType() != "transfer" {
continue
}

if containsKV(event, "recipient", gatewayContractAddress) && containsKV(event, "sender", filler) {
for _, attribute := range event.GetAttributes() {
if attribute.GetKey() == "amount" {
fillAmount, err := sdk.ParseCoinNormalized(attribute.GetValue())
if err != nil {
return nil, fmt.Errorf("parsing amount string %s to coin: %w", attribute.GetValue(), err)
}
return fillAmount.Amount.BigInt(), nil
}
}
return nil, fmt.Errorf("found event with correct recipient and sender but no amount transferred")
}
}
return &[]string{"txhash"}[0], &fill.Filler, headerResp.Header.Time, nil // TODO query for the actual txhash once the event is implemented

return nil, fmt.Errorf("could not find transfer event where recipient is %s and sender is %s", gatewayContractAddress, filler)
}

func (c *CosmosBridgeClient) IsOrderRefunded(ctx context.Context, gatewayContractAddress, orderID string) (bool, string, error) {
Expand Down
4 changes: 2 additions & 2 deletions shared/bridges/cctp/evm_bridge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (c *EVMBridgeClient) IsOrderRefunded(ctx context.Context, gatewayContractAd
return false, "", nil
}

func (c *EVMBridgeClient) QueryOrderFillEvent(ctx context.Context, gatewayContractAddress, orderID string) (*string, *string, time.Time, error) {
return nil, nil, time.Time{}, errors.New("not implemented")
func (c *EVMBridgeClient) QueryOrderFillEvent(ctx context.Context, gatewayContractAddress, orderID string) (*OrderFillEvent, time.Time, error) {
return nil, time.Time{}, errors.New("not implemented")
}

func (c *EVMBridgeClient) ShouldRetryTx(ctx context.Context, txHash string, submitTime pgtype.Timestamp, txExpirationHeight *uint64) (bool, error) {
Expand Down

0 comments on commit ad9cc0a

Please sign in to comment.