Reorganization of the create_final_acteur DAG (#1131)
kolok authored Dec 19, 2024
1 parent 5283c1e commit ca46c0d
Showing 30 changed files with 1,459 additions and 1,114 deletions.
Empty file.
180 changes: 180 additions & 0 deletions dags/compute_acteurs/dags/create_final_actors.py
@@ -0,0 +1,180 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.airflow_logic import (
apply_corrections_acteur_task,
compute_parent_ps_task,
compute_ps_task,
db_data_write_task,
deduplicate_acteur_serivces_task,
deduplicate_acteur_sources_task,
deduplicate_labels_task,
merge_acteur_services_task,
merge_labels_task,
)
from utils.db_tasks import read_data_from_postgres

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 2, 7),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

# Retry settings for reading tasks
read_retry_count = 5
read_retry_interval = timedelta(minutes=2)

dag = DAG(
dag_id="compute_carte_acteur",
dag_display_name="Rafraîchir les acteurs affichés sur la carte",
default_args=default_args,
description=(
"Ce DAG récupère les données des acteurs et des propositions de services et"
" applique les corrections. De plus, il déduplique les acteurs déclarés par"
" plusieurs sources en cumulant leur services, sources et propositions"
" services."
),
schedule=None,
)

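# Each load task reads one source table from Postgres with
# read_data_from_postgres and shares the resulting dataframe with the
# downstream tasks through XCom.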
load_acteur_task = PythonOperator(
task_id="load_acteur",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_acteur"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_propositionservice_task = PythonOperator(
task_id="load_propositionservice",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_propositionservice"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_revisionacteur_task = PythonOperator(
task_id="load_revisionacteur",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionacteur"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_revisionpropositionservice_task = PythonOperator(
task_id="load_revisionpropositionservice",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionpropositionservice"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_revisionpropositionservice_sous_categories_task = PythonOperator(
task_id="load_revisionpropositionservice_sous_categories",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionpropositionservice_sous_categories"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_propositionservice_sous_categories_task = PythonOperator(
task_id="load_propositionservice_sous_categories",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_propositionservice_sous_categories"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_acteur_labels_task = PythonOperator(
task_id="load_acteur_labels",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_acteur_labels"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_acteur_acteur_services_task = PythonOperator(
task_id="load_acteur_acteur_services",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_acteur_acteur_services"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_revisionacteur_labels_task = PythonOperator(
task_id="load_revisionacteur_labels",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionacteur_labels"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

load_revisionacteur_acteur_services_task = PythonOperator(
task_id="load_revisionacteur_acteur_services",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionacteur_acteur_services"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)


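# Instantiate the transformation tasks defined in
# compute_acteurs.tasks.airflow_logic.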
apply_corrections_acteur_task_instance = apply_corrections_acteur_task(dag)
compute_ps_task_instance = compute_ps_task(dag)
compute_parent_ps_task_instance = compute_parent_ps_task(dag)
deduplicate_acteur_serivces_task_instance = deduplicate_acteur_serivces_task(dag)
deduplicate_acteur_sources_task_instance = deduplicate_acteur_sources_task(dag)
deduplicate_labels_task_instance = deduplicate_labels_task(dag)
merge_acteur_services_task_instance = merge_acteur_services_task(dag)
merge_labels_task_instance = merge_labels_task(dag)
db_data_write_task_instance = db_data_write_task(dag)


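# Dependencies: the table loads feed the merge and correction steps, the
# corrected acteurs feed the deduplication steps, and everything converges
# on the final write to Postgres.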
load_acteur_task >> apply_corrections_acteur_task_instance
[
load_propositionservice_task,
load_revisionpropositionservice_task,
load_propositionservice_sous_categories_task,
load_revisionpropositionservice_sous_categories_task,
] >> compute_ps_task_instance
[
load_revisionacteur_task,
load_acteur_labels_task,
load_revisionacteur_labels_task,
] >> merge_labels_task_instance
[
load_revisionacteur_task,
load_acteur_acteur_services_task,
load_revisionacteur_acteur_services_task,
] >> merge_acteur_services_task_instance
apply_corrections_acteur_task_instance >> compute_parent_ps_task_instance
apply_corrections_acteur_task_instance >> deduplicate_acteur_sources_task_instance
(compute_ps_task_instance >> compute_parent_ps_task_instance)
(
merge_labels_task_instance
>> apply_corrections_acteur_task_instance
>> deduplicate_labels_task_instance
)
(
merge_acteur_services_task_instance
>> apply_corrections_acteur_task_instance
>> deduplicate_acteur_serivces_task_instance
)
deduplicate_acteur_sources_task_instance >> db_data_write_task_instance
compute_parent_ps_task_instance >> db_data_write_task_instance
deduplicate_labels_task_instance >> db_data_write_task_instance
deduplicate_acteur_serivces_task_instance >> db_data_write_task_instance
9 changes: 9 additions & 0 deletions dags/compute_acteurs/tasks/airflow_logic/__init__.py
@@ -0,0 +1,9 @@
from .apply_corrections_acteur_task import * # noqa
from .compute_parent_ps_task import * # noqa
from .compute_ps_task import * # noqa
from .db_data_write_task import * # noqa
from .deduplicate_acteur_services_task import * # noqa
from .deduplicate_acteur_sources_task import * # noqa
from .deduplicate_labels_task import * # noqa
from .merge_acteur_services_task import * # noqa
from .merge_labels_task import * # noqa
28 changes: 28 additions & 0 deletions dags/compute_acteurs/tasks/airflow_logic/apply_corrections_acteur_task.py
@@ -0,0 +1,28 @@
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import apply_corrections_acteur
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def apply_corrections_acteur_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="apply_corrections_acteur",
python_callable=apply_corrections_acteur_wrapper,
dag=dag,
)


def apply_corrections_acteur_wrapper(**kwargs):
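    # Pull the acteur and revisionacteur dataframes loaded by the
    # upstream tasks.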
df_acteur = kwargs["ti"].xcom_pull(task_ids="load_acteur")
df_revisionacteur = kwargs["ti"].xcom_pull(task_ids="load_revisionacteur")

log.preview("df_acteur", df_acteur)
log.preview("df_revisionacteur", df_revisionacteur)

return apply_corrections_acteur(
df_acteur=df_acteur, df_revisionacteur=df_revisionacteur
)
40 changes: 40 additions & 0 deletions dags/compute_acteurs/tasks/airflow_logic/compute_parent_ps_task.py
@@ -0,0 +1,40 @@
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import compute_parent_ps
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def compute_parent_ps_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="deduplicate_propositionservices",
python_callable=compute_parent_ps_wrapper,
dag=dag,
)


def compute_parent_ps_wrapper(**kwargs):
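    # Pull the child acteurs from apply_corrections_acteur and the merged
    # propositions de service from compute_ps.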
df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[
"df_children"
]
dfs_ps = kwargs["ti"].xcom_pull(task_ids="compute_ps")
df_propositionservice_merged = dfs_ps["df_propositionservice_merged"]
df_propositionservice_sous_categories_merged = dfs_ps[
"df_propositionservice_sous_categories_merged"
]

log.preview("df_children", df_children)
log.preview("df_propositionservice", df_propositionservice_merged)
log.preview(
"df_propositionservice_sous_categories",
df_propositionservice_sous_categories_merged,
)

return compute_parent_ps(
df_children=df_children,
df_ps=df_propositionservice_merged,
df_ps_sscat=df_propositionservice_sous_categories_merged,
)
49 changes: 49 additions & 0 deletions dags/compute_acteurs/tasks/airflow_logic/compute_ps_task.py
@@ -0,0 +1,49 @@
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import compute_ps
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def compute_ps_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="compute_ps",
python_callable=compute_ps_wrapper,
dag=dag,
)


def compute_ps_wrapper(**kwargs):
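    # Pull the raw and revised propositionservice tables and their
    # sous_categories mappings from the upstream load tasks.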
df_propositionservice = kwargs["ti"].xcom_pull(task_ids="load_propositionservice")
df_revisionpropositionservice = kwargs["ti"].xcom_pull(
task_ids="load_revisionpropositionservice"
)
df_propositionservice_sous_categories = kwargs["ti"].xcom_pull(
task_ids="load_propositionservice_sous_categories"
)
df_revisionpropositionservice_sous_categories = kwargs["ti"].xcom_pull(
task_ids="load_revisionpropositionservice_sous_categories"
)
df_revisionacteur = kwargs["ti"].xcom_pull(task_ids="load_revisionacteur")

log.preview("df_propositionservice", df_propositionservice)
log.preview("df_revisionpropositionservice", df_revisionpropositionservice)
log.preview(
"df_propositionservice_sous_categories", df_propositionservice_sous_categories
)
log.preview(
"df_revisionpropositionservice_sous_categories",
df_revisionpropositionservice_sous_categories,
)
log.preview("df_revisionacteur", df_revisionacteur)

return compute_ps(
df_propositionservice=df_propositionservice,
df_revisionpropositionservice=df_revisionpropositionservice,
df_propositionservice_sous_categories=df_propositionservice_sous_categories,
df_revisionpropositionservice_sous_categories=df_revisionpropositionservice_sous_categories,
df_revisionacteur=df_revisionacteur,
)
53 changes: 53 additions & 0 deletions dags/compute_acteurs/tasks/airflow_logic/db_data_write_task.py
@@ -0,0 +1,53 @@
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import db_data_write
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def db_data_write_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="write_data_to_postgres",
python_callable=db_data_write_wrapper,
dag=dag,
)


def db_data_write_wrapper(**kwargs):
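    # Collect the outputs of the correction and deduplication steps before
    # writing them back to Postgres.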
df_acteur_merged = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[
"df_acteur_merged"
]
df_labels_updated = kwargs["ti"].xcom_pull(task_ids="deduplicate_labels")
df_acteur_services_updated = kwargs["ti"].xcom_pull(
task_ids="deduplicate_acteur_serivces"
)
df_acteur_sources_updated = kwargs["ti"].xcom_pull(
task_ids="deduplicate_acteur_sources"
)
task_output = kwargs["ti"].xcom_pull(task_ids="deduplicate_propositionservices")
df_propositionservice_merged = task_output["df_final_ps_updated"]
df_propositionservice_sous_categories_merged = task_output[
"df_final_sous_categories"
]

log.preview("df_acteur_merged", df_acteur_merged)
log.preview("df_labels_updated", df_labels_updated)
log.preview("df_acteur_services_updated", df_acteur_services_updated)
log.preview("df_acteur_sources_updated", df_acteur_sources_updated)
log.preview("df_propositionservice_merged", df_propositionservice_merged)
log.preview(
"df_propositionservice_sous_categories_merged",
df_propositionservice_sous_categories_merged,
)

return db_data_write(
df_acteur_merged=df_acteur_merged,
df_labels_updated=df_labels_updated,
df_acteur_services_updated=df_acteur_services_updated,
df_acteur_sources_updated=df_acteur_sources_updated,
df_propositionservice_merged=df_propositionservice_merged,
df_propositionservice_sous_categories_merged=df_propositionservice_sous_categories_merged,
)
32 changes: 32 additions & 0 deletions dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_services_task.py
@@ -0,0 +1,32 @@
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import deduplicate_acteur_serivces
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def deduplicate_acteur_serivces_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="deduplicate_acteur_serivces",
python_callable=deduplicate_acteur_serivces_wrapper,
dag=dag,
)


def deduplicate_acteur_serivces_wrapper(**kwargs):

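    # Pull the child acteurs and the merged acteur/acteur_services
    # relationships from the upstream tasks.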
df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[
"df_children"
]
df_merge_acteur_services = kwargs["ti"].xcom_pull(task_ids="merge_acteur_services")

log.preview("df_children", df_children)
log.preview("df_merged_relationship", df_merge_acteur_services)

return deduplicate_acteur_serivces(
df_children=df_children,
df_merge_acteur_services=df_merge_acteur_services,
)
