Skip to content

Commit

Permalink
Add EIP-1559 fields (cli, airflow, dataflow) (#96)
Browse files Browse the repository at this point in the history
* Add fields introduced by EIP-1559

This commit is similar to the EIP-1559 pull request for ethereum-etl: blockchain-etl/ethereum-etl#256

Fields added:

 - base_fee_per_gas (block) - base fee per gas in protocol, which can move up or down each block according to a formula which is a function of gas used in parent block and gas target (block gas limit divided by elasticity multiplier) of parent block.
 - max_fee_per_gas (tx) - total fee which covers both the priority fee and the block's network fee per gas
 - max_priority_fee_per_gas (tx) - maximum fee per gas tx senders are willing to give to miners to incentivize them to include their transaction
 - transaction_type (tx) - an envelope for future transaction types
 - effective_gas_price (receipt) - a replacement for gasUsed field

* update 'miner' field in web3_response

* update existing test data for EIP-1559 fields

* add cli tests for EIP-1559 block

* Airflow changes for EIP-1559

* Dataflow changes for EIP-1559

* update balances calculation to match ethereum-etl

* bump package version

* fix effective_gas_price -> receipt_effective_gas_price

* delete unused folder `dags/resources/stages/load/`

* fix missing `pytz` dependency error

* add `block_timestamp` to raw transactions schema

* Fix/add ds suffix

Based on nansen-ai/evmchain-etl/pull/55

Add ds postfix for load and enrich tasks
Add expiration for temp raw tables
Remove unused enrichment sql
Support load_all_partitions for export files missing EIP-1559 fields

Co-authored-by: Jerry <[email protected]>
  • Loading branch information
charlielewisme and cffls authored Dec 19, 2022
1 parent b5481fa commit fabc1aa
Show file tree
Hide file tree
Showing 79 changed files with 862 additions and 620 deletions.
35 changes: 31 additions & 4 deletions airflow/dags/polygonetl_airflow/build_load_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def add_load_tasks(task, file_format, allow_quoted_newlines=False):
dag=dag
)

def load_task():
def load_task(ds, **kwargs):
client = bigquery.Client()
job_config = bigquery.LoadJobConfig()
schema_path = os.path.join(dags_folder, 'resources/stages/raw/schemas/{task}.json'.format(task=task))
Expand All @@ -119,13 +119,35 @@ def load_task():
job_config.ignore_unknown_values = True

export_location_uri = 'gs://{bucket}/export'.format(bucket=output_bucket)
uri = '{export_location_uri}/{task}/*.{file_format}'.format(
export_location_uri=export_location_uri, task=task, file_format=file_format)
table_ref = client.dataset(dataset_name_raw).table(task)
if load_all_partitions:
# Support export files that are missing EIP-1559 fields (exported before EIP-1559 upgrade)
job_config.allow_jagged_rows = True

uri = "{export_location_uri}/{task}/*.{file_format}".format(
export_location_uri=export_location_uri,
task=task,
file_format=file_format,
)
table_ref = client.dataset(dataset_name_raw).table(task)
else:
uri = "{export_location_uri}/{task}/block_date={ds}/*.{file_format}".format(
export_location_uri=export_location_uri,
task=task,
ds=ds,
file_format=file_format,
)
table_name = f'{task}_{ds.replace("-", "_")}'
table_ref = client.dataset(dataset_name_raw).table(table_name)

load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
submit_bigquery_job(load_job, job_config)
assert load_job.state == 'DONE'

if not load_all_partitions:
table = client.get_table(table_ref)
table.expires = datetime.now() + timedelta(days=3)
client.update_table(table, ["expires"])

load_operator = PythonOperator(
task_id='load_{task}'.format(task=task),
python_callable=load_task,
Expand All @@ -142,6 +164,11 @@ def enrich_task(ds, **kwargs):
template_context['ds'] = ds
template_context['params'] = environment

if load_all_partitions or always_load_all_partitions:
template_context["params"]["ds_postfix"] = ""
else:
template_context["params"]["ds_postfix"] = "_" + ds.replace("-", "_")

client = bigquery.Client()

# Need to use a temporary table because bq query sets field modes to NULLABLE and descriptions to null
Expand Down
7 changes: 6 additions & 1 deletion airflow/dags/resources/stages/enrich/schemas/blocks.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,10 @@
"name": "transaction_count",
"type": "INT64",
"description": "The number of transactions in the block"
},
{
"name": "base_fee_per_gas",
"type": "INT64",
"description": "Protocol base fee per gas, which can move up or down"
}
]
]
22 changes: 21 additions & 1 deletion airflow/dags/resources/stages/enrich/schemas/transactions.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,25 @@
"type": "STRING",
"mode": "REQUIRED",
"description": "Hash of the block where this transaction was in"
},
{
"name": "max_fee_per_gas",
"type": "INT64",
"description": "Total fee that covers both base and priority fees"
},
{
"name": "max_priority_fee_per_gas",
"type": "INT64",
"description": "Fee given to miners to incentivize them to include the transaction"
},
{
"name": "transaction_type",
"type": "INT64",
"description": "Transaction type. One of 0 (Legacy), 1 (Legacy), 2 (EIP-1559)"
},
{
"name": "receipt_effective_gas_price",
"type": "INT64",
"description": "The actual value per gas deducted from the senders account. Replacement of gas_price after EIP-1559"
}
]
]
30 changes: 0 additions & 30 deletions airflow/dags/resources/stages/enrich/sqls/amended_tokens.sql

This file was deleted.

22 changes: 13 additions & 9 deletions airflow/dags/resources/stages/enrich/sqls/balances.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
with double_entry_book as (
-- debits
select to_address as address, CAST(value AS FLOAT64) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces`
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces{{params.ds_postfix}}`
where true
and date(block_timestamp) <= '{{ds}}'
and to_address is not null
Expand All @@ -10,27 +10,31 @@ with double_entry_book as (
union all
-- credits
select from_address as address, -CAST(value AS FLOAT64) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces`
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces{{params.ds_postfix}}`
where true
and date(block_timestamp) <= '{{ds}}'
and from_address is not null
and status = 1
and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
union all
-- transaction fees debits
select miner as address, sum(cast(receipt_gas_used as numeric) * cast(gas_price as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions` as transactions
join `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.blocks` as blocks on blocks.number = transactions.block_number
select
miner as address,
sum(cast(receipt_gas_used as numeric) * cast((receipt_effective_gas_price - coalesce(base_fee_per_gas, 0)) as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions{{params.ds_postfix}}` as transactions
join `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.blocks{{params.ds_postfix}}` as blocks on blocks.number = transactions.block_number
where true
and date(transactions.block_timestamp) <= '{{ds}}'
group by blocks.miner
group by blocks.number, blocks.miner
union all
-- transaction fees credits
select from_address as address, -(cast(receipt_gas_used as numeric) * cast(gas_price as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions`
select
from_address as address,
-(cast(receipt_gas_used as numeric) * cast(receipt_effective_gas_price as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions{{params.ds_postfix}}`
where true
and date(block_timestamp) <= '{{ds}}'
)
select address, sum(value) as eth_balance
from double_entry_book
group by address
group by address
5 changes: 3 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ SELECT
blocks.extra_data,
blocks.gas_limit,
blocks.gas_used,
blocks.transaction_count
FROM {{params.dataset_name_raw}}.blocks AS blocks
blocks.transaction_count,
blocks.base_fee_per_gas
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/contracts.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.contracts AS contracts
JOIN {{params.dataset_name_raw}}.blocks AS blocks ON contracts.block_number = blocks.number
FROM {{params.dataset_name_raw}}.contracts{{params.ds_postfix}} AS contracts
JOIN {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks ON contracts.block_number = blocks.number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/resources/stages/enrich/sqls/logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.logs AS logs ON blocks.number = logs.block_number
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.logs{{params.ds_postfix}} AS logs ON blocks.number = logs.block_number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
{% endif %}
{% endif %}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ insert (
extra_data,
gas_limit,
gas_used,
transaction_count
transaction_count,
base_fee_per_gas
) values (
timestamp,
number,
Expand All @@ -39,7 +40,8 @@ insert (
extra_data,
gas_limit,
gas_used,
transaction_count
transaction_count,
base_fee_per_gas
)
when not matched by source and date(timestamp) = '{{ds}}' then
delete
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ insert (
receipt_status,
block_timestamp,
block_number,
block_hash
block_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
transaction_type,
receipt_effective_gas_price
) values (
`hash`,
nonce,
Expand All @@ -37,7 +41,11 @@ insert (
receipt_status,
block_timestamp,
block_number,
block_hash
block_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
transaction_type,
receipt_effective_gas_price
)
when not matched by source and date(block_timestamp) = '{{ds}}' then
delete
4 changes: 2 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/token_transfers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.token_transfers AS token_transfers ON blocks.number = token_transfers.block_number
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.token_transfers{{params.ds_postfix}} AS token_transfers ON blocks.number = token_transfers.block_number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/tokens.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.tokens AS tokens ON blocks.number = tokens.block_number
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.tokens{{params.ds_postfix}} AS tokens ON blocks.number = tokens.block_number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/resources/stages/enrich/sqls/traces.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.traces AS traces ON blocks.number = traces.block_number
JOIN {{params.dataset_name_raw}}.transactions AS transactions
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.traces{{params.ds_postfix}} AS traces ON blocks.number = traces.block_number
JOIN {{params.dataset_name_raw}}.transactions{{params.ds_postfix}} AS transactions
ON traces.transaction_index = transactions.transaction_index
and traces.block_number = transactions.block_number
where true
Expand Down
12 changes: 8 additions & 4 deletions airflow/dags/resources/stages/enrich/sqls/transactions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ SELECT
receipts.status AS receipt_status,
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.transactions AS transactions ON blocks.number = transactions.block_number
JOIN {{params.dataset_name_raw}}.receipts AS receipts ON transactions.hash = receipts.transaction_hash
blocks.hash AS block_hash,
transactions.max_fee_per_gas,
transactions.max_priority_fee_per_gas,
transactions.transaction_type,
receipts.effective_gas_price as receipt_effective_gas_price
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.transactions{{params.ds_postfix}} AS transactions ON blocks.number = transactions.block_number
JOIN {{params.dataset_name_raw}}.receipts{{params.ds_postfix}} AS receipts ON transactions.hash = receipts.transaction_hash
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
Expand Down
Loading

0 comments on commit fabc1aa

Please sign in to comment.