Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Token Transfers #1176

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions notebooks/adhoc/token_transfers_dev.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepare data reader for a given chain and date"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from op_analytics.coreutils.duckdb_inmem import init_client\n",
"from op_analytics.coreutils.partitioned.reader import DataReader\n",
"from op_analytics.coreutils.partitioned.location import DataLocation\n",
"from op_analytics.datapipeline.etl.intermediate.construct import construct_data_readers\n",
"\n",
"from op_analytics.datapipeline.models.compute.udfs import create_duckdb_macros\n",
"\n",
"\n",
"# Define the input data range.\n",
"read_batches: list[DataReader] = construct_data_readers(\n",
" chains=[\"op\"],\n",
" models=[\"token_transfers\"],\n",
" range_spec=\"@20241030:+1\",\n",
" read_from=DataLocation.GCS\n",
")\n",
"\n",
"\n",
"# Select input for one date and build the intermediate model inputs.\n",
"batch = read_batches[0]\n",
"\n",
"\n",
"duckdb_client = init_client()\n",
"create_duckdb_macros(duckdb_client)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run the model\n",
"\n",
"This automatically registers the model outputs as duckdb tables."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from op_analytics.datapipeline.models.compute.testutils import execute_model_in_memory\n",
"\n",
"\n",
"execute_model_in_memory(\n",
" duckdb_client=duckdb_client,\n",
" model=\"token_transfers\",\n",
" data_reader=batch,\n",
")\n",
"\n",
"# The duckdb database will have the following:\n",
"# - input tables\n",
"# - views used by the model\n",
"# - model outputs\n",
"#\n",
"# You can use duckdb to inspect any of the above results.\n",
"duckdb_client.sql(\"SHOW TABLES\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Verify model results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"duckdb_client.sql(\"SELECT *, value_64/1e18 AS value_native FROM native_transfers_v1 LIMIT 10\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"duckdb_client.sql(\"SELECT * FROM erc20_transfers_v1 LIMIT 10\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### You can also convert the results to dataframes to inspect them in more familiar ways"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"duckdb_client.sql(\"SELECT * FROM native_transfers_v1 LIMIT 10\").pl().head()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
56 changes: 56 additions & 0 deletions src/op_analytics/datapipeline/models/code/token_transfers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import duckdb
from duckdb.typing import VARCHAR, BIGINT # Import DuckDB's type annotations

from op_analytics.datapipeline.models.compute.querybuilder import TemplatedSQLQuery
from op_analytics.datapipeline.models.compute.registry import register_model
from op_analytics.datapipeline.models.compute.types import NamedRelations
from eth_abi import decode


@register_model(
    input_datasets=["ingestion/traces_v1", "ingestion/logs_v1", "ingestion/transactions_v1"],
    expected_outputs=["native_transfers_v1", "erc20_transfers_v1"],
    auxiliary_views=[
        TemplatedSQLQuery(template_name="native_transfers", context={}),
        TemplatedSQLQuery(template_name="filtered_raw_erc20_transfer_logs", context={}),
    ],
)
def token_transfers(duckdb_client: duckdb.DuckDBPyConnection) -> NamedRelations:
    """Produce the native and ERC-20 token transfer model outputs.

    Registers a Python UDF that ABI-decodes ERC-20 Transfer event logs, runs
    it over the ``filtered_raw_erc20_transfer_logs`` auxiliary view, and
    returns the two model relations.

    :param duckdb_client: connection on which the auxiliary views have
        already been created.
    :return: mapping of output name -> DuckDB relation.
    """

    def decode_erc20_transfer(topics: str, data: str) -> dict:
        """Decode one ERC-20 Transfer log into its structured fields.

        :param topics: JSON-encoded list of log topics (topics[1]/topics[2]
            are the indexed from/to addresses).
        :param data: "0x"-prefixed hex string of the ABI-encoded value.
        :return: dict with keys ``from_address``, ``to_address``, ``value``.
        :raises ValueError: if the log does not carry the 3 Transfer topics.
        """
        import json

        topic_list = json.loads(topics)  # avoid shadowing the parameter
        if len(topic_list) < 3:
            raise ValueError("Insufficient topics to decode transfer.")

        from_address = decode(["address"], bytes.fromhex(topic_list[1][2:]))[0]
        to_address = decode(["address"], bytes.fromhex(topic_list[2][2:]))[0]
        value = decode(["uint256"], bytes.fromhex(data[2:]))[0]
        # A uint256 token amount does not fit in a 64-bit BIGINT; return it
        # as a lossless decimal string and let consumers cast as needed.
        return {"from_address": from_address, "to_address": to_address, "value": str(value)}

    # Register the UDF. A DuckDB scalar function has exactly one return type,
    # so the three decoded fields are packed into a STRUCT (passing a list of
    # types to return_type is not accepted by create_function).
    duckdb_client.create_function(
        "decode_erc20_transfer",
        decode_erc20_transfer,
        parameters=[VARCHAR, VARCHAR],
        return_type=duckdb.struct_type(
            {"from_address": "VARCHAR", "to_address": "VARCHAR", "value": "VARCHAR"}
        ),
    )

    # Decode each log exactly once in the inner query and unpack the struct
    # fields in the outer query, instead of invoking the (relatively
    # expensive) Python UDF three times per row. Note: .sql() runs a query;
    # .view() looks up an existing view by name.
    erc20_logs_decoded = duckdb_client.sql("""
        SELECT
            decoded.from_address AS from_address,
            decoded.to_address AS to_address,
            decoded.value AS value
        FROM (
            SELECT decode_erc20_transfer(topics, data) AS decoded
            FROM filtered_raw_erc20_transfer_logs
        )
    """)

    return {
        "native_transfers_v1": duckdb_client.view("native_transfers"),
        "erc20_transfers_v1": erc20_logs_decoded,
    }
2 changes: 1 addition & 1 deletion src/op_analytics/datapipeline/models/compute/udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def create_duckdb_macros(duckdb_client: duckdb.DuckDBPyConnection):
--Get the method id for input data. This is the first 4 bytes, or first 10
-- string characters for binary data that is encoded as a hex string.
CREATE OR REPLACE MACRO hexstr_method_id(x)
AS substring(x,1,10)
AS substring(x,1,10);
""")


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- ERC-20 Transfer event logs joined to their parent transaction.
-- Keeps every log column and adds transaction-level context columns:
-- the transaction's sender/recipient and the 4-byte method id of its
-- calldata.
SELECT
l.*
, t.from_address AS tx_from_address
, t.to_address AS tx_to_address
, hexstr_method_id(t.input) AS tx_method_id

FROM ingestion_logs_v1 AS l
INNER JOIN ingestion_transactions_v1 AS t
ON
l.block_number = t.block_number
AND l.chain_id = t.chain_id
AND l.transaction_hash = t.hash

WHERE
-- keccak256("Transfer(address,address,uint256)"): the ERC-20/ERC-721
-- Transfer event signature hash.
l.topic0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
AND t.receipt_status = 1 -- only successful transactions emit event logs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Generic filtered-logs template: logs matching a caller-supplied topic0
-- (event signature hash), joined to their parent transaction for
-- sender/recipient/method-id context.
-- NOTE(review): unlike the hardcoded ERC-20 Transfer variant of this
-- template, there is no t.receipt_status = 1 filter here -- confirm
-- whether logs from failed transactions should be excluded.
SELECT
l.*
, t.from_address AS tx_from_address
, t.to_address AS tx_to_address
, hexstr_method_id(t.input) AS tx_method_id

FROM ingestion_logs_v1 AS l
INNER JOIN ingestion_transactions_v1 AS t
ON
l.block_number = t.block_number
AND l.chain_id = t.chain_id
AND l.transaction_hash = t.hash
WHERE
-- topic0 is injected via the Jinja template context.
l.topic0 = '{{ topic0_filter }}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Native-asset (ETH) value transfers extracted from traces, joined to the
-- parent transaction. Keeps only value-moving calls from successful
-- transactions whose trace also succeeded.
SELECT
tr.network
, tr.chain_id
, tr.chain
, tr.dt
, tr.block_timestamp
, tr.block_number
, tr.transaction_hash
, tr.transaction_index
, tr.from_address AS transfer_from_address
, tr.to_address AS transfer_to_address
, hexstr_method_id(tr.input) AS trace_method_id
, tr.value_64
, tr.value_lossless
, tr.trace_type
, tr.call_type
, tr.gas AS trace_gas_limit
, tr.gas_used AS trace_gas_used
, tr.subtraces
, tr.trace_address
, t.from_address AS tx_from_address
, t.to_address AS tx_to_address
, hexstr_method_id(t.input) AS tx_method_id

FROM ingestion_traces_v1 AS tr
INNER JOIN ingestion_transactions_v1 AS t
ON
tr.transaction_hash = t.hash
AND tr.block_number = t.block_number
AND tr.chain_id = t.chain_id
WHERE
-- Exclude call types that cannot move native value: delegatecall/callcode
-- execute in the caller's context and staticcall forbids state changes.
-- NOTE(review): the OR branch keeps rows with an empty call_type (e.g.
-- non-call trace types); '' already passes the NOT IN test, so the OR
-- appears redundant unless it is meant to document intent -- confirm.
(tr.call_type NOT IN ('delegatecall', 'callcode', 'staticcall') OR tr.call_type = '')
AND t.receipt_status = 1 -- transaction succeeded
AND tr.status = 1 -- the trace itself succeeded
AND tr.value_lossless != '0' -- skip zero-value transfers
Loading