Skip to content

Commit

Permalink
add manual updates actors
Browse files Browse the repository at this point in the history
  • Loading branch information
Hazelmat committed Feb 7, 2024
1 parent af2e35f commit 30ad788
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 1 deletion.
180 changes: 180 additions & 0 deletions dags/extract_manual_actors_updates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

from utils.utils import load_table, normalize_nom, normalize_url, normalize_phone_number, normalize_email, \
find_differences, save_to_database


def read_data_from_postgres(**kwargs):
    """Load one Postgres table into a DataFrame (returned via XCom).

    Expects ``table_name`` in ``kwargs``; reads through the
    ``lvao-preprod`` Airflow connection.
    """
    hook = PostgresHook(postgres_conn_id='lvao-preprod')
    return load_table(kwargs["table_name"], hook.get_sqlalchemy_engine())


def normalize_and_find_differences(**kwargs):
    """Normalize both actor tables and keep only the fields that changed.

    Pulls the two DataFrames produced by the upstream load tasks from XCom,
    then delegates the per-column normalization and comparison to
    ``find_differences``.
    """
    task_instance = kwargs['ti']
    actors = task_instance.xcom_pull(task_ids='load_imported_actors')
    revised_actors = task_instance.xcom_pull(task_ids='load_imported_revision_actors')

    # Key and audit columns are never compared.
    excluded_columns = ["identifiant_unique", "statut", "cree_le", "modifie_le"]
    # Per-column cleaning applied to both tables before comparison.
    normalizers = {
        "nom": normalize_nom,
        "nom_commercial": normalize_nom,
        "ville": normalize_nom,
        "url": normalize_url,
        "adresse": normalize_nom,
        "adresse_complement": normalize_nom,
        "email": normalize_email,
        "telephone": normalize_phone_number,
    }

    return find_differences(actors, revised_actors, excluded_columns, normalizers)


def save_results_to_database(**kwargs):
    """Persist the computed differences into ``lvao_manual_actors_updates``."""
    differences = kwargs['ti'].xcom_pull(task_ids='normalize_and_find_differences')
    hook = PostgresHook(postgres_conn_id='lvao-preprod')
    save_to_database(differences, "lvao_manual_actors_updates", hook.get_sqlalchemy_engine())


# Default task arguments shared by every operator in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 7),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'manual_actor_updates',
    default_args=default_args,
    description='DAG for manually updated LVAO actors data',
    schedule_interval=None,  # trigger manually only
)

# Load the original actors table.
t1 = PythonOperator(
    task_id='load_imported_actors',
    python_callable=read_data_from_postgres,
    op_kwargs={"table_name": "qfdmo_acteur"},
    dag=dag,
)

# Load the manually revised actors table.
t2 = PythonOperator(
    task_id='load_imported_revision_actors',
    python_callable=read_data_from_postgres,
    op_kwargs={"table_name": "qfdmo_revisionacteur"},
    dag=dag,
)

# NOTE: `provide_context=True` was removed from the tasks below — with the
# Airflow 2 import path used in this file (`airflow.operators.python`), the
# task context is always passed to the callable as keyword arguments and the
# flag is deprecated.
t3 = PythonOperator(
    task_id='normalize_and_find_differences',
    python_callable=normalize_and_find_differences,
    dag=dag,
)

t4 = PythonOperator(
    task_id='save_results_to_database',
    python_callable=save_results_to_database,
    dag=dag,
)

# Both loads must finish before the diff, which feeds the save.
[t1, t2] >> t3 >> t4
























































































61 changes: 60 additions & 1 deletion utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def apply_normalization(df, normalization_map):
"""
# print(df['adresse'])

#df = normalize_address(df)
# df = normalize_address(df)
# print(df['adresse'])
for column, normalize_func in normalization_map.items():
if column in df.columns:
Expand Down Expand Up @@ -183,3 +183,62 @@ def normalize_url(url):
except Exception as e:
print(f"Error parsing URL {url}: {e}")
return None


def find_differences(
    df_act, df_rev_act, columns_to_exclude, normalization_map
):
    """Return, per actor, the revised field values that differ from the original.

    Parameters
    ----------
    df_act : pandas.DataFrame
        Original actors; must contain ``identifiant_unique`` and ``code_postal``.
    df_rev_act : pandas.DataFrame
        Revised actors, keyed by the same ``identifiant_unique`` column.
    columns_to_exclude : list of str
        Columns never compared (e.g. the key and audit timestamps).
    normalization_map : dict of str -> callable
        Per-column normalizers applied to both sides before comparison.

    Returns
    -------
    pandas.DataFrame
        One row per actor having at least one difference: the
        ``identifiant_unique`` plus the revised value for each differing
        column (None where the column did not change).
    """
    # Align original and revised rows on the shared key; overlapping columns
    # get the "_act" / "_rev_act" suffixes.
    df_merged = pd.merge(
        df_act, df_rev_act, on="identifiant_unique", suffixes=("_act", "_rev_act")
    )

    # Postal codes may arrive as mixed strings/numbers; coerce both sides to a
    # nullable integer dtype so equal codes compare equal.
    for postal_col in ("code_postal_act", "code_postal_rev_act"):
        df_merged[postal_col] = pd.to_numeric(
            df_merged[postal_col], errors="coerce"
        ).astype(pd.Int64Dtype())

    # Normalize both suffixed variants of every mapped column.
    for base_col, normalize_func in normalization_map.items():
        for suffix in ("_act", "_rev_act"):
            col = base_col + suffix
            if col in df_merged.columns:
                df_merged[col] = df_merged[col].apply(normalize_func)
            else:
                print(f"Column {col} not found in DataFrame.")

    # Collect differences keyed by the actor identifier.
    df_differences = pd.DataFrame()
    df_differences["identifiant_unique"] = df_merged["identifiant_unique"]

    for col in df_act.columns:
        if col in columns_to_exclude:
            continue
        col_act = col + "_act"
        col_rev_act = col + "_rev_act"

        # Keep the revised value only when it is non-null, both sides are
        # non-empty, and the values actually differ.
        mask_non_empty = (
            df_merged[col_rev_act].notnull()
            & (df_merged[col_act] != "")
            & (df_merged[col_rev_act] != "")
        )
        mask_different = df_merged[col_act] != df_merged[col_rev_act]
        df_differences[col] = df_merged[col_rev_act].where(
            mask_non_empty & mask_different, None
        )

    # Drop actors with no differing field at all.
    return df_differences.dropna(
        how="all",
        subset=[c for c in df_differences.columns if c != "identifiant_unique"],
    )

0 comments on commit 30ad788

Please sign in to comment.