diff --git a/dags/extract_manual_actors_updates.py b/dags/extract_manual_actors_updates.py
new file mode 100644
index 0000000..981a25e
--- /dev/null
+++ b/dags/extract_manual_actors_updates.py
@@ -0,0 +1,99 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+from utils.utils import (
+    load_table,
+    normalize_nom,
+    normalize_url,
+    normalize_phone_number,
+    normalize_email,
+    find_differences,
+    save_to_database,
+)
+
+
+def read_data_from_postgres(**kwargs):
+    table_name = kwargs["table_name"]
+    pg_hook = PostgresHook(postgres_conn_id='lvao-preprod')
+    engine = pg_hook.get_sqlalchemy_engine()
+    return load_table(table_name, engine)
+
+
+def normalize_and_find_differences(**kwargs):
+    ti = kwargs['ti']
+    # NOTE: the DataFrames travel through XCom, which assumes the XCom
+    # backend can serialize them (e.g. pickling enabled).
+    df_act = ti.xcom_pull(task_ids='load_imported_actors')
+    df_rev_act = ti.xcom_pull(task_ids='load_imported_revision_actors')
+
+    columns_to_exclude = ["identifiant_unique", "statut", "cree_le", "modifie_le"]
+    normalization_map = {
+        "nom": normalize_nom,
+        "nom_commercial": normalize_nom,
+        "ville": normalize_nom,
+        "url": normalize_url,
+        "adresse": normalize_nom,
+        "adresse_complement": normalize_nom,
+        "email": normalize_email,
+        "telephone": normalize_phone_number,
+    }
+
+    return find_differences(df_act, df_rev_act, columns_to_exclude, normalization_map)
+
+
+def save_results_to_database(**kwargs):
+    df_cleaned = kwargs['ti'].xcom_pull(task_ids='normalize_and_find_differences')
+    pg_hook = PostgresHook(postgres_conn_id='lvao-preprod')
+    engine = pg_hook.get_sqlalchemy_engine()
+    save_to_database(df_cleaned, "lvao_manual_actors_updates", engine)
+
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2024, 2, 7),
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+dag = DAG(
+    'manual_actor_updates',
+    default_args=default_args,
+    description='DAG for manually updated LVAO actors data',
+    schedule_interval=None,
+)
+
+t1 = PythonOperator(
+    task_id='load_imported_actors',
+    python_callable=read_data_from_postgres,
+    op_kwargs={"table_name": "qfdmo_acteur"},
+    dag=dag,
+)
+
+t2 = PythonOperator(
+    task_id='load_imported_revision_actors',
+    python_callable=read_data_from_postgres,
+    op_kwargs={"table_name": "qfdmo_revisionacteur"},
+    dag=dag,
+)
+
+# Airflow 2 passes the task context to python_callable automatically, so the
+# deprecated provide_context=True flag is not needed.
+t3 = PythonOperator(
+    task_id='normalize_and_find_differences',
+    python_callable=normalize_and_find_differences,
+    dag=dag,
+)
+
+t4 = PythonOperator(
+    task_id='save_results_to_database',
+    python_callable=save_results_to_database,
+    dag=dag,
+)
+
+[t1, t2] >> t3 >> t4
diff --git a/utils/utils.py b/utils/utils.py
index 0670744..962c362 100644
--- a/utils/utils.py
+++ b/utils/utils.py
@@ -59,7 +59,7 @@ def apply_normalization(df, normalization_map):
     """
     # print(df['adresse'])
-    #df = normalize_address(df)
+    # df = normalize_address(df)
     # print(df['adresse'])
     for column, normalize_func in normalization_map.items():
         if column in df.columns:
@@ -183,3 +183,73 @@ def normalize_url(url):
     except Exception as e:
         print(f"Error parsing URL {url}: {e}")
         return None
+
+
+def find_differences(
+    df_act, df_rev_act, columns_to_exclude, normalization_map
+):
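+    """Return, per 'identifiant_unique', the revised values that differ.
+
+    Joins df_act and df_rev_act on 'identifiant_unique', normalizes both
+    sides of every column named in normalization_map, then compares every
+    column not listed in columns_to_exclude and keeps only the revised
+    (rev_act) values that actually changed.
+    """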
+    # Join the two DataFrames on 'identifiant_unique'
+    df_merged = pd.merge(
+        df_act, df_rev_act, on="identifiant_unique", suffixes=("_act", "_rev_act")
+    )
+
+    # Postal codes arrive with mixed types; coerce both sides to a
+    # nullable integer so equal codes compare equal.
+    df_merged["code_postal_act"] = pd.to_numeric(
+        df_merged["code_postal_act"], errors="coerce"
+    ).astype(pd.Int64Dtype())
+    df_merged["code_postal_rev_act"] = pd.to_numeric(
+        df_merged["code_postal_rev_act"], errors="coerce"
+    ).astype(pd.Int64Dtype())
+
+    suffixes = ["_act", "_rev_act"]
+
+    # Apply each normalization function to both sides of its column
+    for base_col, func in normalization_map.items():
+        for suffix in suffixes:
+            col = base_col + suffix
+            if col in df_merged.columns:
+                df_merged[col] = df_merged[col].apply(func)
+            else:
+                print(f"Column {col} not found in DataFrame.")
+
+    # Initialize a DataFrame to hold the per-column differences
+    df_differences = pd.DataFrame()
+    df_differences["identifiant_unique"] = df_merged["identifiant_unique"]
+
+    for col in df_act.columns:
+        if col in columns_to_exclude:
+            continue
+
+        col_act = col + "_act"
+        col_rev_act = col + "_rev_act"
+        # pd.merge only suffixes columns present in both frames; skip
+        # any column that exists on one side only.
+        if col_act not in df_merged.columns or col_rev_act not in df_merged.columns:
+            continue
+
+        # Keep the revised value only where both sides are non-empty
+        # and actually differ.
+        mask_non_empty = (
+            df_merged[col_rev_act].notnull()
+            & (df_merged[col_act] != "")
+            & (df_merged[col_rev_act] != "")
+        )
+        mask_different = df_merged[col_act] != df_merged[col_rev_act]
+        df_differences[col] = df_merged[col_rev_act].where(
+            mask_non_empty & mask_different, None
+        )
+
+    # Drop rows where nothing differs (identifiant_unique aside)
+    df_differences = df_differences.dropna(
+        how="all",
+        subset=[col for col in df_differences.columns if col != "identifiant_unique"],
+    )
+    return df_differences
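For reviewers, a minimal sketch of how find_differences is expected to behave on toy data. The two-row frames and values below are invented for illustration; note that the function coerces code_postal unconditionally, so that column must exist in both inputs, and whatever normalize_nom does is applied to both sides, so only genuine edits survive the comparison.

import pandas as pd

from utils.utils import find_differences, normalize_nom

# Toy frames: actor 1 was renamed in the revision table, actor 2 untouched.
df_act = pd.DataFrame({
    "identifiant_unique": [1, 2],
    "nom": ["Atelier Velo", "Ressourcerie"],
    "code_postal": ["75011", "69001"],  # coerced to Int64 inside find_differences
})
df_rev_act = pd.DataFrame({
    "identifiant_unique": [1, 2],
    "nom": ["Atelier Velo Paris", "Ressourcerie"],
    "code_postal": ["75011", "69001"],
})

diff = find_differences(
    df_act,
    df_rev_act,
    columns_to_exclude=["identifiant_unique", "code_postal"],
    normalization_map={"nom": normalize_nom},
)

# Expected: a single row for identifiant_unique == 1 whose 'nom' column holds
# the (normalized) revised name; actor 2 is dropped because nothing changed.
print(diff)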