Skip to content

Commit

Permalink
fixed some stuff after pycon :)
Browse files Browse the repository at this point in the history
  • Loading branch information
JeyDi committed Jun 3, 2022
1 parent c93da53 commit d3f0ea1
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 20 deletions.
4 changes: 2 additions & 2 deletions airflow/dags/jobs/check_stuff.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ def check_variables(state, ti):
print(f"Data folder: {config.DATA_FOLDER}")

TESTONE = Variable.get("TESTONE")
TESTONE_NEW = Variable.set("TESTONE_NEW", state)
# TESTONE_NEW = Variable.set("TESTONE_NEW", state)
print(f"TESTONE: {TESTONE}")
print(f"TESTONE: {TESTONE_NEW}")
# print(f"TESTONE: {TESTONE_NEW}")

ti.xcom_push(key="TESTONE", value=TESTONE)
7 changes: 4 additions & 3 deletions airflow/dags/jobs/extract_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ def launch_ospedali(file_name: str, ti):
df_ospedali = rename_columns(df_ospedali)

# save the result
result = utils.save_result(df_ospedali, "ospedali_result.csv")
filename = "ospedali_result.csv"
utils.save_result(df_ospedali, filename)

ti.xcom_push(key="ospedali_result", value="ospedali_result.csv")
ti.xcom_push(key="ospedali_dataset", value=filename)

return result
# return result
14 changes: 10 additions & 4 deletions airflow/dags/jobs/import_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ def insert_db(ti):
try:
cxn = db.establish_db_connection()

ospedali_dataset = ti.xcom_pull(key="ospedali_dataset")
# ospedali_dataset = ti.xcom_pull(key="ospedali_dataset")
popolazione_dataset = ti.xcom_pull(
key="popolazione_dataset", task_ids="popolazione_extractor"
)
performance_dataset = ti.xcom_pull(key="performance_dataset")

datasets = [ospedali_dataset, popolazione_dataset, performance_dataset]
# ospedali_dataset
datasets = [popolazione_dataset, performance_dataset]

print(f"Dataset names: f{datasets}")

for dataset in datasets:
print(f"Inserting dataset: {dataset}")

# read the dataset
file_path = os.path.join(config.DATA_FOLDER, dataset)
Expand All @@ -32,7 +36,9 @@ def insert_db(ti):
# insert the dataset into the sql table
df.to_sql(dataset_name, con=cxn, if_exists="append", index=False)

cxn.dispose()
print(f"Dataset: {dataset} inserted successfully")

cxn.dispose()
print("All dataset inserted")
except Exception as message:
raise Exception(f"Impossible to insert into db: {message}")
raise Exception(f"ERROR: Impossible to insert into db: {message}")
22 changes: 12 additions & 10 deletions airflow/dags/serata_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
from airflow.contrib.sensors.file_sensor import FileSensor

from airflow.dags.jobs.extract_blob import launch_blob
from airflow.dags.jobs.check_stuff import check_variables
from airflow.dags.jobs.import_db import insert_db
from airflow.dags.jobs.extract_db import extract_db

from airflow.dags.jobs.check_stuff import check_variables

FILE_PATH = "/opt/airflow/data"

default_args = {
Expand Down Expand Up @@ -40,12 +41,13 @@ def _failure_callback(context):
default_args=default_args,
)

run_variables_check = PythonOperator(
task_id="variable_check",
python_callable=check_variables,
dag=dag,
op_kwargs={"state": "state_test"},
)
# VARIABLE CHECK
# run_variables_check = PythonOperator(
# task_id="variable_check",
# python_callable=check_variables,
# dag=dag,
# op_kwargs={"state": "state_test"},
# )

run_ospedali_extractor = BashOperator(
task_id="ospedali_extractor",
Expand Down Expand Up @@ -101,29 +103,29 @@ def _failure_callback(context):
last_op = DummyOperator(task_id="last_task", dag=dag)


# >> run_variables_check
(
start_op
>> run_variables_check
>> run_ospedali_extractor
>> sensor_extract_ospedali
>> mid_op
>> save_result_db
>> last_op
)

# >> run_variables_check
(
start_op
>> run_variables_check
>> run_popolazione_extractor
>> sensor_extract_popolazione
>> mid_op
>> save_result_db
>> last_op
)

# >> run_variables_check
(
start_op
>> run_variables_check
>> run_performance_extractor
>> sensor_extract_performance
>> mid_op
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
environment:
VERBOSITY: ${VERBOSITY:-INFO}
ports:
- "8501:8501"
- "8505:8081"
# volumes:
# # persist the data of the dashboard
# - ${DEPLOY_VOLUMES_DIR:-/opt/streamlit}:/streamlit/data
Expand Down

0 comments on commit d3f0ea1

Please sign in to comment.