From d3f0ea1873afe34a83b47df53bba321e0f85cb2f Mon Sep 17 00:00:00 2001 From: Andrea Guzzo Date: Fri, 3 Jun 2022 11:54:47 +0200 Subject: [PATCH] fixed some stuff after pycon :) --- airflow/dags/jobs/check_stuff.py | 4 ++-- airflow/dags/jobs/extract_csv.py | 7 ++++--- airflow/dags/jobs/import_db.py | 14 ++++++++++---- airflow/dags/serata_2.py | 22 ++++++++++++---------- docker-compose.app.yml | 2 +- 5 files changed, 29 insertions(+), 20 deletions(-) diff --git a/airflow/dags/jobs/check_stuff.py b/airflow/dags/jobs/check_stuff.py index a0d0716..a9bf16e 100644 --- a/airflow/dags/jobs/check_stuff.py +++ b/airflow/dags/jobs/check_stuff.py @@ -9,8 +9,8 @@ def check_variables(state, ti): print(f"Data folder: {config.DATA_FOLDER}") TESTONE = Variable.get("TESTONE") - TESTONE_NEW = Variable.set("TESTONE_NEW", state) + # TESTONE_NEW = Variable.set("TESTONE_NEW", state) print(f"TESTONE: {TESTONE}") - print(f"TESTONE: {TESTONE_NEW}") + # print(f"TESTONE: {TESTONE_NEW}") ti.xcom_push(key="TESTONE", value=TESTONE) diff --git a/airflow/dags/jobs/extract_csv.py b/airflow/dags/jobs/extract_csv.py index e1d5b6b..38368f6 100644 --- a/airflow/dags/jobs/extract_csv.py +++ b/airflow/dags/jobs/extract_csv.py @@ -92,8 +92,9 @@ def launch_ospedali(file_name: str, ti): df_ospedali = rename_columns(df_ospedali) # save the result - result = utils.save_result(df_ospedali, "ospedali_result.csv") + filename = "ospedali_result.csv" + utils.save_result(df_ospedali, filename) - ti.xcom_push(key="ospedali_result", value="ospedali_result.csv") + ti.xcom_push(key="ospedali_dataset", value=filename) - return result + # return result diff --git a/airflow/dags/jobs/import_db.py b/airflow/dags/jobs/import_db.py index 2d99099..4ef76a0 100644 --- a/airflow/dags/jobs/import_db.py +++ b/airflow/dags/jobs/import_db.py @@ -10,15 +10,19 @@ def insert_db(ti): try: cxn = db.establish_db_connection() - ospedali_dataset = ti.xcom_pull(key="ospedali_dataset") + # ospedali_dataset = ti.xcom_pull(key="ospedali_dataset") 
popolazione_dataset = ti.xcom_pull( key="popolazione_dataset", task_ids="popolazione_extractor" ) performance_dataset = ti.xcom_pull(key="performance_dataset") - datasets = [ospedali_dataset, popolazione_dataset, performance_dataset] + # ospedali_dataset + datasets = [popolazione_dataset, performance_dataset] + + print(f"Dataset names: {datasets}") for dataset in datasets: + print(f"Inserting dataset: {dataset}") # read the dataset file_path = os.path.join(config.DATA_FOLDER, dataset) @@ -32,7 +36,9 @@ def insert_db(ti): # insert the dataset into the sql table df.to_sql(dataset_name, con=cxn, if_exists="append", index=False) - cxn.dispose() + print(f"Dataset: {dataset} inserted successfully") + cxn.dispose() + print("All datasets inserted") except Exception as message: - raise Exception(f"Impossible to insert into db: {message}") + raise Exception(f"ERROR: Impossible to insert into db: {message}") diff --git a/airflow/dags/serata_2.py b/airflow/dags/serata_2.py index 5b4bb49..a0e8cec 100644 --- a/airflow/dags/serata_2.py +++ b/airflow/dags/serata_2.py @@ -7,10 +7,11 @@ from airflow.contrib.sensors.file_sensor import FileSensor from airflow.dags.jobs.extract_blob import launch_blob -from airflow.dags.jobs.check_stuff import check_variables from airflow.dags.jobs.import_db import insert_db from airflow.dags.jobs.extract_db import extract_db +from airflow.dags.jobs.check_stuff import check_variables + FILE_PATH = "/opt/airflow/data" default_args = { @@ -40,12 +41,13 @@ def _failure_callback(context): default_args=default_args, ) -run_variables_check = PythonOperator( - task_id="variable_check", - python_callable=check_variables, - dag=dag, - op_kwargs={"state": "state_test"}, -) +# VARIABLE CHECK +# run_variables_check = PythonOperator( +# task_id="variable_check", +# python_callable=check_variables, +# dag=dag, +# op_kwargs={"state": "state_test"}, +# ) run_ospedali_extractor = BashOperator( task_id="ospedali_extractor", @@ -101,9 +103,9 @@ def 
_failure_callback(context): last_op = DummyOperator(task_id="last_task", dag=dag) +# >> run_variables_check ( start_op - >> run_variables_check >> run_ospedali_extractor >> sensor_extract_ospedali >> mid_op @@ -111,9 +113,9 @@ def _failure_callback(context): >> last_op ) +# >> run_variables_check ( start_op - >> run_variables_check >> run_popolazione_extractor >> sensor_extract_popolazione >> mid_op @@ -121,9 +123,9 @@ def _failure_callback(context): >> last_op ) +# >> run_variables_check ( start_op - >> run_variables_check >> run_performance_extractor >> sensor_extract_performance >> mid_op diff --git a/docker-compose.app.yml b/docker-compose.app.yml index 033d5b4..dce0577 100644 --- a/docker-compose.app.yml +++ b/docker-compose.app.yml @@ -9,7 +9,7 @@ services: environment: VERBOSITY: ${VERBOSITY:-INFO} ports: - - "8501:8501" + - "8505:8081" # volumes: # #persist the data of the dasbhboard # - ${DEPLOY_VOLUMES_DIR:-/opt/streamlit}:/streamlit/data