From 88e1e078e29a6504b2fc33afaa1aaffdf787f359 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Fri, 29 Mar 2024 15:52:00 +0100 Subject: [PATCH 1/6] allow to download dags from s3 --- .gitignore | 4 +- README.md | 13 +- .../extract_manual_actors_updates.py | 0 {dags => dags.backup}/preprocess_actors.py | 0 .../preprod_create_final_actors.py | 0 .../production_create_final_actors.py | 0 dags/.airflowignore | 0 dags/download_dags.py | 61 ++++ development/create_final_actors.py | 292 ++++++++++++++++++ development/download_dags.py | 1 + {dags => development}/utils/__init__.py | 0 {dags => development}/utils/utils.py | 56 +++- docker-compose.yaml | 2 +- requirements.txt | 1 - 14 files changed, 409 insertions(+), 21 deletions(-) rename {dags => dags.backup}/extract_manual_actors_updates.py (100%) rename {dags => dags.backup}/preprocess_actors.py (100%) rename {dags => dags.backup}/preprod_create_final_actors.py (100%) rename {dags => dags.backup}/production_create_final_actors.py (100%) delete mode 100644 dags/.airflowignore create mode 100644 dags/download_dags.py create mode 100755 development/create_final_actors.py create mode 120000 development/download_dags.py rename {dags => development}/utils/__init__.py (100%) rename {dags => development}/utils/utils.py (85%) diff --git a/.gitignore b/.gitignore index 0ed0ee6..bdd58e4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ airflow_settings.yaml __pycache__/ astro -logs \ No newline at end of file +logs +dags/preprod/ +dags/production/ \ No newline at end of file diff --git a/README.md b/README.md index 5d00078..0e98951 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,18 @@ AIRFLOW__LOGGING__ENCRYPT_S3_LOGS=false Attention à ajouter le paramètre enfpoint_url pour le stockage Cellar de CleverCloud +## déploiement des dags en preprod et en prod + +les dags sont déployés sur un bucket s3, dans un dossier au nom de l'environnement sur clevercloud : + +- s3://qfdmo-dags/preprod +- s3://qfdmo-dags/production + +Airflow est déployé avecun seul DAG `doswnload_dags_from_s3` qui télécharge les dags de preprod et de production à partir des repo s3. 
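+
+À titre indicatif, le dépôt des DAGs sur le bucket peut se faire avec `aws s3 sync` en visant le endpoint Cellar de CleverCloud. Esquisse à adapter : le nom de bucket `qfdmo-airflow-dags` est celui référencé par `dags/download_dags.py`, et le dossier source `development` ainsi que les identifiants S3 disponibles localement sont des hypothèses :
+
+```sh
+# copie le contenu du dossier local des DAGs vers le dossier preprod du bucket
+aws --endpoint-url https://cellar-c2.services.clever-cloud.com \
+    s3 sync --delete ./development s3://qfdmo-airflow-dags/preprod/
+```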
+ ## Reste à faire -- [ ] Aujourd'hui, on a 1 seule buket de log pour tout les environnement +- [ ] Aujourd'hui, on a 1 seule bucket de log pour tout les environnements - [ ] Strategie pour publier des dag de preprod et de prod en les identifiant et en permettant des config différentes +- [ ] Déployer les dags sur le s3 de preprod quand on pousse le code dans la branche main +- [ ] Déployer les dags sur le s3 de production quand on tag le repo avec un tags de release (format vx.y.z) diff --git a/dags/extract_manual_actors_updates.py b/dags.backup/extract_manual_actors_updates.py similarity index 100% rename from dags/extract_manual_actors_updates.py rename to dags.backup/extract_manual_actors_updates.py diff --git a/dags/preprocess_actors.py b/dags.backup/preprocess_actors.py similarity index 100% rename from dags/preprocess_actors.py rename to dags.backup/preprocess_actors.py diff --git a/dags/preprod_create_final_actors.py b/dags.backup/preprod_create_final_actors.py similarity index 100% rename from dags/preprod_create_final_actors.py rename to dags.backup/preprod_create_final_actors.py diff --git a/dags/production_create_final_actors.py b/dags.backup/production_create_final_actors.py similarity index 100% rename from dags/production_create_final_actors.py rename to dags.backup/production_create_final_actors.py diff --git a/dags/.airflowignore b/dags/.airflowignore deleted file mode 100644 index e69de29..0000000 diff --git a/dags/download_dags.py b/dags/download_dags.py new file mode 100644 index 0000000..7d6de32 --- /dev/null +++ b/dags/download_dags.py @@ -0,0 +1,61 @@ +import logging +from datetime import datetime, timedelta +from pathlib import Path + +import airflow.configuration as conf +from airflow import DAG +from airflow.models import DagBag +from airflow.operators.python import PythonOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook + +dags_dirs = ["preprod", "production"] + + +def download_dags_from_s3(): + s3_hook = S3Hook(aws_conn_id="s3dags") + bucket_name = "qfdmo-airflow-dags" + keys = s3_hook.list_keys(bucket_name) + for key in keys: + dags_folder = conf.get("core", "dags_folder") + logging.warning(f"Downloading {key} from S3 to {dags_folder}") + file_path = Path(dags_folder, key) + file_path.unlink(missing_ok=True) + parent_folder = file_path.parent + parent_folder.mkdir(parents=True, exist_ok=True) + s3_hook.download_file( + key, + bucket_name=bucket_name, + local_path=parent_folder, + preserve_file_name=True, + use_autogenerated_subdir=False, + ) + for subdir in dags_dirs: + dag_bag = DagBag(Path(dags_folder, subdir)) + if dag_bag: + for dag_id, dag in dag_bag.dags.items(): + globals()[subdir + "_" + dag_id] = dag + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2022, 1, 1), + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + +with DAG( + "download_dags_from_s3", + default_args=default_args, + description="DAG to download dags from S3", + schedule_interval=timedelta(days=1), + catchup=False, +) as dag: + + download_dags = PythonOperator( + task_id="download_dags_from_s3", python_callable=download_dags_from_s3, dag=dag + ) + + download_dags diff --git a/development/create_final_actors.py b/development/create_final_actors.py new file mode 100755 index 0000000..14d84c9 --- /dev/null +++ b/development/create_final_actors.py @@ -0,0 +1,292 @@ +from datetime import datetime, timedelta +from importlib import import_module +from pathlib import 
Path + +import pandas as pd +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.postgres.hooks.postgres import PostgresHook + +env = Path(__file__).parent.name +utils = import_module(f"{env}.utils.utils") + + +def read_data_from_postgres(**kwargs): + table_name = kwargs["table_name"] + pg_hook = PostgresHook(postgres_conn_id=utils.get_db_conn_id(__file__)) + engine = pg_hook.get_sqlalchemy_engine() + df = utils.load_table(table_name, engine) + return df + + +def apply_corrections(**kwargs): + df_normalized_actors = kwargs["ti"].xcom_pull(task_ids="load_actors") + df_manual_actor_updates = kwargs["ti"].xcom_pull(task_ids="load_revision_actors") + + df_normalized_actors = df_normalized_actors.set_index("identifiant_unique") + df_manual_actor_updates = df_manual_actor_updates.set_index("identifiant_unique") + + df_normalized_actors.update(df_manual_actor_updates) + + return df_normalized_actors.reset_index() + + +def apply_corrections_ps(**kwargs): + df_propositionservice = kwargs["ti"].xcom_pull(task_ids="load_propositionservice") + df_manual_propositionservice_updates = kwargs["ti"].xcom_pull( + task_ids="load_revision_propositionservice" + ) + df_manual_propositionservice_updates = df_manual_propositionservice_updates.rename( + columns={"revision_acteur_id": "acteur_id"} + ) + df_ps_sous_categories = kwargs["ti"].xcom_pull(task_ids="load_ps_sous_categories") + df_manual_propositionservice_sous_categories_updates = kwargs["ti"].xcom_pull( + task_ids="load_revision_ps_sous_categories" + ) + df_manual_propositionservice_sous_categories_updates = ( + df_manual_propositionservice_sous_categories_updates.rename( + columns={"revisionpropositionservice_id": "propositionservice_id"} + ) + ) + + common_acteur_ids = df_propositionservice[ + df_propositionservice["acteur_id"].isin( + df_manual_propositionservice_updates["acteur_id"] + ) + ]["acteur_id"].unique() + + df_ps_updated = pd.concat( + [ + df_propositionservice[ + ~df_propositionservice["acteur_id"].isin(common_acteur_ids) + ], + df_manual_propositionservice_updates, + ], + ignore_index=True, + ) + rps_ids = df_manual_propositionservice_updates["id"].unique() + only_ps_ids = df_propositionservice[ + ~df_propositionservice["acteur_id"].isin(common_acteur_ids) + ]["id"].unique() + + matching_rpssc_rows = df_manual_propositionservice_sous_categories_updates[ + df_manual_propositionservice_sous_categories_updates[ + "propositionservice_id" + ].isin(rps_ids) + ] + matching_pssc_rows = df_ps_sous_categories[ + df_ps_sous_categories["propositionservice_id"].isin(only_ps_ids) + ] + df_sous_categories_updated = pd.concat( + [matching_rpssc_rows, matching_pssc_rows], ignore_index=True + ) + + return { + "df_ps_updated": df_ps_updated, + "df_sous_categories_updated": df_sous_categories_updated, + } + + +def write_data_to_postgres(**kwargs): + df_normalized_corrected_actors = kwargs["ti"].xcom_pull( + task_ids="apply_corrections_actors" + ) + task_output = kwargs["ti"].xcom_pull( + task_ids="apply_corrections_propositionservice" + ) + df_ps_updated = task_output["df_ps_updated"] + df_sous_categories_updated = task_output["df_sous_categories_updated"] + df_sous_categories_updated.rename( + columns={"propositionservice_id": "displayedpropositionservice_id"}, + inplace=True, + ) + + pg_hook = PostgresHook(postgres_conn_id=utils.get_db_conn_id(__file__)) + engine = pg_hook.get_sqlalchemy_engine() + + original_table_name_actor = "qfdmo_displayedacteur" + temp_table_name_actor = "qfdmo_displayedacteurtemp" + + 
original_table_name_ps = "qfdmo_displayedpropositionservice" + temp_table_name_ps = "qfdmo_displayedpropositionservicetemp" + + original_table_name_pssc = "qfdmo_displayedpropositionservice_sous_categories" + temp_table_name_pssc = "qfdmo_displayedpropositionservicetemp_sous_categories" + + with engine.connect() as conn: + conn.execute(f"DELETE FROM {temp_table_name_pssc}") + conn.execute(f"DELETE FROM {temp_table_name_ps}") + conn.execute(f"DELETE FROM {temp_table_name_actor}") + + df_normalized_corrected_actors[ + [ + "identifiant_unique", + "nom", + "adresse", + "adresse_complement", + "code_postal", + "ville", + "url", + "email", + "location", + "telephone", + "nom_commercial", + "nom_officiel", + "label_reparacteur", + "siret", + "identifiant_externe", + "acteur_type_id", + "statut", + "source_id", + "cree_le", + "modifie_le", + "naf_principal", + "commentaires", + "horaires", + "description", + ] + ].to_sql( + temp_table_name_actor, + engine, + if_exists="append", + index=False, + method="multi", + chunksize=1000, + ) + + df_ps_updated[["id", "acteur_service_id", "action_id", "acteur_id"]].to_sql( + temp_table_name_ps, + engine, + if_exists="append", + index=False, + method="multi", + chunksize=1000, + ) + + df_sous_categories_updated[ + ["displayedpropositionservice_id", "souscategorieobjet_id"] + ].to_sql( + temp_table_name_pssc, + engine, + if_exists="append", + index=False, + method="multi", + chunksize=1000, + ) + + conn.execute( + f"ALTER TABLE {original_table_name_actor} RENAME TO {original_table_name_actor}_old" + ) + conn.execute( + f"ALTER TABLE {temp_table_name_actor} RENAME TO {original_table_name_actor}" + ) + conn.execute( + f"ALTER TABLE {original_table_name_actor}_old RENAME TO {temp_table_name_actor}" + ) + + conn.execute( + f"ALTER TABLE {original_table_name_ps} RENAME TO {original_table_name_ps}_old" + ) + conn.execute( + f"ALTER TABLE {temp_table_name_ps} RENAME TO {original_table_name_ps}" + ) + conn.execute( + f"ALTER TABLE {original_table_name_ps}_old RENAME TO {temp_table_name_ps}" + ) + + conn.execute( + f"ALTER TABLE {original_table_name_pssc} RENAME TO {original_table_name_pssc}_old" + ) + conn.execute( + f"ALTER TABLE {temp_table_name_pssc} RENAME TO {original_table_name_pssc}" + ) + conn.execute( + f"ALTER TABLE {original_table_name_pssc}_old RENAME TO {temp_table_name_pssc}" + ) + + print("Table swap completed successfully.") + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2024, 2, 7), + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + +dag = DAG( + utils.get_dag_name(__file__, "apply_adresse_corrections"), + default_args=default_args, + description="DAG for applying correction on normalized actors and propositionservice", + schedule_interval=None, +) + +read_actors = PythonOperator( + task_id="load_actors", + python_callable=read_data_from_postgres, + op_kwargs={"table_name": "qfdmo_acteur"}, + dag=dag, +) + +read_ps = PythonOperator( + task_id="load_propositionservice", + python_callable=read_data_from_postgres, + op_kwargs={"table_name": "qfdmo_propositionservice"}, + dag=dag, +) + +read_revision_actor = PythonOperator( + task_id="load_revision_actors", + python_callable=read_data_from_postgres, + op_kwargs={"table_name": "qfdmo_revisionacteur"}, + dag=dag, +) + +read_revision_ps = PythonOperator( + task_id="load_revision_propositionservice", + python_callable=read_data_from_postgres, + op_kwargs={"table_name": 
"qfdmo_revisionpropositionservice"}, + dag=dag, +) + +read_revision_sc = PythonOperator( + task_id="load_revision_ps_sous_categories", + python_callable=read_data_from_postgres, + op_kwargs={"table_name": "qfdmo_revisionpropositionservice_sous_categories"}, + dag=dag, +) + +read_sc = PythonOperator( + task_id="load_ps_sous_categories", + python_callable=read_data_from_postgres, + op_kwargs={"table_name": "qfdmo_propositionservice_sous_categories"}, + dag=dag, +) + +apply_corr = PythonOperator( + task_id="apply_corrections_actors", + python_callable=apply_corrections, + provide_context=True, + dag=dag, +) + +apply_corr_ps = PythonOperator( + task_id="apply_corrections_propositionservice", + python_callable=apply_corrections_ps, + provide_context=True, + dag=dag, +) + +write_pos = PythonOperator( + task_id="write_data_to_postgres", + python_callable=write_data_to_postgres, + provide_context=True, + dag=dag, +) + +[read_actors, read_revision_actor] >> apply_corr +[read_ps, read_revision_ps, read_sc, read_revision_sc] >> apply_corr_ps +[apply_corr, apply_corr_ps] >> write_pos diff --git a/development/download_dags.py b/development/download_dags.py new file mode 120000 index 0000000..bd85439 --- /dev/null +++ b/development/download_dags.py @@ -0,0 +1 @@ +dags/download_dags.py \ No newline at end of file diff --git a/dags/utils/__init__.py b/development/utils/__init__.py similarity index 100% rename from dags/utils/__init__.py rename to development/utils/__init__.py diff --git a/dags/utils/utils.py b/development/utils/utils.py similarity index 85% rename from dags/utils/utils.py rename to development/utils/utils.py index 3ae1ac2..e90c6c9 100644 --- a/dags/utils/utils.py +++ b/development/utils/utils.py @@ -1,10 +1,11 @@ -import io import csv +import io +import re +from pathlib import Path from urllib.parse import urlparse -import requests import pandas as pd -import re +import requests def send_batch_to_api(batch): @@ -13,7 +14,9 @@ def send_batch_to_api(batch): """ output = io.StringIO() writer = csv.writer(output) - writer.writerow(["identifiant_unique", "adresse", "adresse_complement", "code_postal", "ville"]) + writer.writerow( + ["identifiant_unique", "adresse", "adresse_complement", "code_postal", "ville"] + ) for element in batch: row = element.values() writer.writerow(row) @@ -21,7 +24,10 @@ def send_batch_to_api(batch): response = requests.post( "https://api-adresse.data.gouv.fr/search/csv/", files={"data": output.getvalue()}, - data={"columns": ["adresse", "ville", "adresse_complement"], "postcode": "code_postal"}, + data={ + "columns": ["adresse", "ville", "adresse_complement"], + "postcode": "code_postal", + }, ) reader = csv.DictReader(io.StringIO(response.text)) return [row for row in reader] @@ -81,9 +87,17 @@ def normalize_address(df, batch_size=10000): end_idx = start_idx + batch_size # Extract relevant columns for address normalization - address_data = df.loc[start_idx:end_idx, - ["identifiant_unique", "adresse", "adresse_complement", "code_postal", "ville"]] - address_list = address_data.to_dict(orient='records') + address_data = df.loc[ + start_idx:end_idx, + [ + "identifiant_unique", + "adresse", + "adresse_complement", + "code_postal", + "ville", + ], + ] + address_list = address_data.to_dict(orient="records") # Send data to Ban API and receive normalized data normalized_data = send_batch_to_api(address_list) @@ -185,9 +199,7 @@ def normalize_url(url): return None -def find_differences( - df_act, df_rev_act, columns_to_exclude, normalization_map -): +def 
find_differences(df_act, df_rev_act, columns_to_exclude, normalization_map): # Join the DataFrames on 'identifiant_unique' df_merged = pd.merge( df_act, df_rev_act, on="identifiant_unique", suffixes=("_act", "_rev_act") @@ -223,9 +235,9 @@ def find_differences( # Create masks for non-empty and differing values mask_non_empty = ( - df_merged[col_rev_act].notnull() - & (df_merged[col_act] != "") - & (df_merged[col_rev_act] != "") + df_merged[col_rev_act].notnull() + & (df_merged[col_act] != "") + & (df_merged[col_rev_act] != "") ) mask_different = df_merged[col_act] != df_merged[col_rev_act] @@ -237,8 +249,18 @@ def find_differences( # Remove rows where all elements are None (no differences found) df_differences = df_differences.dropna( how="all", - subset=[ - col for col in df_differences.columns if col != "identifiant_unique" - ], + subset=[col for col in df_differences.columns if col != "identifiant_unique"], ) return df_differences + + +def get_environment(dag_filepath): + return Path(dag_filepath).parent.name + + +def get_dag_name(dag_filepath, dag_name): + return get_environment(dag_filepath) + "_" + dag_name + + +def get_db_conn_id(dag_filepath): + return "lvao-" + get_environment(dag_filepath) diff --git a/docker-compose.yaml b/docker-compose.yaml index f577515..9579033 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -23,7 +23,7 @@ x-airflow-common: _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} volumes: - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/development:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" diff --git a/requirements.txt b/requirements.txt index 7cf7e16..cfe03da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ # Astro Runtime includes the following pre-installed providers packages: https://docs.astronomer.io/astro/runtime-image-architecture#provider-packages pandas apache-airflow-providers-postgres - From 1f471a4087236c55dfc81328a725289da3c292d2 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Mon, 1 Apr 2024 22:58:21 +0200 Subject: [PATCH 2/6] manage download_dags for all env --- .gitignore | 4 ++- airflow-scheduler.Dockerfile | 3 +++ airflow-webserver.Dockerfile | 3 +++ dags/download_dags.py | 52 ++++++++++++++++++++++++------------ development/__init__.py | 0 development/download_dags.py | 1 - docker-compose.yaml | 4 ++- requirements.txt | 1 + 8 files changed, 48 insertions(+), 20 deletions(-) create mode 100644 development/__init__.py delete mode 120000 development/download_dags.py diff --git a/.gitignore b/.gitignore index bdd58e4..0bb00ca 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,7 @@ airflow_settings.yaml __pycache__/ astro logs +dags/development/ dags/preprod/ -dags/production/ \ No newline at end of file +dags/production/ +dags/utils/ diff --git a/airflow-scheduler.Dockerfile b/airflow-scheduler.Dockerfile index e1c0fb8..14dfbd6 100644 --- a/airflow-scheduler.Dockerfile +++ b/airflow-scheduler.Dockerfile @@ -4,6 +4,9 @@ FROM apache/airflow:2.8.2 RUN chown -R ${AIRFLOW_UID:-50000}:0 /opt/airflow USER ${AIRFLOW_UID:-50000}:0 +COPY ./requirements.txt /opt/airflow/requirements.txt +RUN pip install -r /opt/airflow/requirements.txt + # Copy the dags, logs, config, and plugins directories to the appropriate locations COPY ./dags/ /opt/airflow/dags/ COPY ./config/ /opt/airflow/config/ diff --git 
a/airflow-webserver.Dockerfile b/airflow-webserver.Dockerfile index 7992850..5e06f3d 100644 --- a/airflow-webserver.Dockerfile +++ b/airflow-webserver.Dockerfile @@ -4,6 +4,9 @@ FROM apache/airflow:2.8.2 RUN chown -R ${AIRFLOW_UID:-50000}:0 /opt/airflow USER ${AIRFLOW_UID:-50000}:0 +COPY ./requirements.txt /opt/airflow/requirements.txt +RUN pip install -r /opt/airflow/requirements.txt + # Copy the dags, logs, config, and plugins directories to the appropriate locations COPY ./dags/ /opt/airflow/dags/ diff --git a/dags/download_dags.py b/dags/download_dags.py index 7d6de32..95cce6b 100644 --- a/dags/download_dags.py +++ b/dags/download_dags.py @@ -1,8 +1,10 @@ import logging +import shutil from datetime import datetime, timedelta from pathlib import Path import airflow.configuration as conf +import decouple from airflow import DAG from airflow.models import DagBag from airflow.operators.python import PythonOperator @@ -12,24 +14,40 @@ def download_dags_from_s3(): - s3_hook = S3Hook(aws_conn_id="s3dags") - bucket_name = "qfdmo-airflow-dags" - keys = s3_hook.list_keys(bucket_name) - for key in keys: - dags_folder = conf.get("core", "dags_folder") - logging.warning(f"Downloading {key} from S3 to {dags_folder}") - file_path = Path(dags_folder, key) - file_path.unlink(missing_ok=True) - parent_folder = file_path.parent - parent_folder.mkdir(parents=True, exist_ok=True) - s3_hook.download_file( - key, - bucket_name=bucket_name, - local_path=parent_folder, - preserve_file_name=True, - use_autogenerated_subdir=False, - ) + dags_folder = conf.get("core", "dags_folder") + if ( + decouple.config("ENVIRONMENT") is not None + and decouple.config("ENVIRONMENT") == "DEVELOPMENT" + ): + dags_dirs = ["development"] + logging.warning("Skipping download_dags_from_s3 in development environment") + logging.warning(f"Copying dags from development to {dags_folder}") + home = Path(dags_folder).parent + # copy all from HOME/development to dags_folder/development + for subdir in dags_dirs: + source = Path(home, subdir) + destination = Path(dags_folder, subdir) + shutil.rmtree(destination, ignore_errors=True) + shutil.copytree(source, destination) + else: + s3_hook = S3Hook(aws_conn_id="s3dags") + bucket_name = "qfdmo-airflow-dags" + keys = s3_hook.list_keys(bucket_name) + for key in keys: + logging.warning(f"Downloading {key} from S3 to {dags_folder}") + file_path = Path(dags_folder, key) + file_path.unlink(missing_ok=True) + parent_folder = file_path.parent + parent_folder.mkdir(parents=True, exist_ok=True) + s3_hook.download_file( + key, + bucket_name=bucket_name, + local_path=parent_folder, + preserve_file_name=True, + use_autogenerated_subdir=False, + ) for subdir in dags_dirs: + logging.warning(f"Loading dags from {subdir}") dag_bag = DagBag(Path(dags_folder, subdir)) if dag_bag: for dag_id, dag in dag_bag.dags.items(): diff --git a/development/__init__.py b/development/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/development/download_dags.py b/development/download_dags.py deleted file mode 120000 index bd85439..0000000 --- a/development/download_dags.py +++ /dev/null @@ -1 +0,0 @@ -dags/download_dags.py \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 9579033..f1c5cd6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -4,6 +4,7 @@ x-airflow-common: environment: &airflow-common-env AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CORE__DAGS_FOLDER: '/opt/airflow/dags' 
AIRFLOW__CORE__FERNET_KEY: '' AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'false' @@ -23,7 +24,8 @@ x-airflow-common: _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} volumes: - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - - ${AIRFLOW_PROJ_DIR:-.}/development:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/development:/opt/airflow/development + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" diff --git a/requirements.txt b/requirements.txt index cfe03da..282ea9f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ # Astro Runtime includes the following pre-installed providers packages: https://docs.astronomer.io/astro/runtime-image-architecture#provider-packages pandas apache-airflow-providers-postgres +python-decouple From 22b825be99ae6c22927c8a1fb809eaba3c45ba9e Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Tue, 2 Apr 2024 17:45:44 +0200 Subject: [PATCH 3/6] from local folder --- dags/download_dags.py | 21 +++++++++------------ docker-compose.yaml | 2 +- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/dags/download_dags.py b/dags/download_dags.py index 95cce6b..8170e65 100644 --- a/dags/download_dags.py +++ b/dags/download_dags.py @@ -10,25 +10,22 @@ from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -dags_dirs = ["preprod", "production"] - def download_dags_from_s3(): + dags_dirs = ["preprod", "production"] dags_folder = conf.get("core", "dags_folder") - if ( - decouple.config("ENVIRONMENT") is not None - and decouple.config("ENVIRONMENT") == "DEVELOPMENT" - ): - dags_dirs = ["development"] + local_dags_folder = decouple.config("LOCAL_DAGS_FOLDER", cast=str, default="") + if local_dags_folder: + environment = "development" + dags_dirs = [environment] logging.warning("Skipping download_dags_from_s3 in development environment") logging.warning(f"Copying dags from development to {dags_folder}") home = Path(dags_folder).parent # copy all from HOME/development to dags_folder/development - for subdir in dags_dirs: - source = Path(home, subdir) - destination = Path(dags_folder, subdir) - shutil.rmtree(destination, ignore_errors=True) - shutil.copytree(source, destination) + source = Path(str(local_dags_folder)) + destination = Path(dags_folder, environment) + shutil.rmtree(destination, ignore_errors=True) + shutil.copytree(source, destination) else: s3_hook = S3Hook(aws_conn_id="s3dags") bucket_name = "qfdmo-airflow-dags" diff --git a/docker-compose.yaml b/docker-compose.yaml index f1c5cd6..89a4ac8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,10 +24,10 @@ x-airflow-common: _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} volumes: - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - - ${AIRFLOW_PROJ_DIR:-.}/development:/opt/airflow/development - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + - ~/workspace/beta.gouv.fr/qfdmo-dags/development:/opt/airflow/development user: "${AIRFLOW_UID:-50000}:0" extra_hosts: - "host.docker.internal:host-gateway" From 05cafe2a7c123fb0f454cd79ca3b8cb15c809ae4 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 3 Apr 2024 14:39:34 +0200 Subject: [PATCH 4/6] update docs and env --- README.md | 26 +++++++++++++++++++++++++- dags/.env.template 
| 1 + development/create_final_actors.py | 3 ++- docker-compose.yaml | 10 +++++----- 4 files changed, 33 insertions(+), 7 deletions(-) create mode 100644 dags/.env.template diff --git a/README.md b/README.md index 0e98951..de6f11d 100644 --- a/README.md +++ b/README.md @@ -63,11 +63,35 @@ les dags sont déployés sur un bucket s3, dans un dossier au nom de l'environne - s3://qfdmo-dags/preprod - s3://qfdmo-dags/production +Cette copie est faite via la CI/CD github action + Airflow est déployé avecun seul DAG `doswnload_dags_from_s3` qui télécharge les dags de preprod et de production à partir des repo s3. +### Déploiement des DAGs en environnement de développement + +En environnement de développement, on précisera l'emplacement des DAGs avec la variable d'environnement AIRFLOW_DAGS_LOCAL_FOLDER avant le lancement des container docker. Par exemple : + +```sh +export AIRFLOW_DAGS_LOCAL_FOLDER=$HOME/workspace/beta.gouv.fr/qfdmo-dags/development +``` + +Ce dossier est monté dans les containers docker à l'emplacement `/opt/airflow/development` + +Puis copier les variable d'environnement dags/.env.template vers dags/.env + +```sh +cp .env.template .env +``` + +Enfin, lancer les containers docker + +```sh +docker compose up +``` + ## Reste à faire - [ ] Aujourd'hui, on a 1 seule bucket de log pour tout les environnements -- [ ] Strategie pour publier des dag de preprod et de prod en les identifiant et en permettant des config différentes +- [ ] Strategie pour publier des dags de preprod et de prod en les identifiant et en permettant des config différentes - [ ] Déployer les dags sur le s3 de preprod quand on pousse le code dans la branche main - [ ] Déployer les dags sur le s3 de production quand on tag le repo avec un tags de release (format vx.y.z) diff --git a/dags/.env.template b/dags/.env.template new file mode 100644 index 0000000..af77a3b --- /dev/null +++ b/dags/.env.template @@ -0,0 +1 @@ +LOCAL_DAGS_FOLDER=/opt/airflow/development diff --git a/development/create_final_actors.py b/development/create_final_actors.py index 14d84c9..7e007f2 100755 --- a/development/create_final_actors.py +++ b/development/create_final_actors.py @@ -142,7 +142,8 @@ def write_data_to_postgres(**kwargs): "modifie_le", "naf_principal", "commentaires", - "horaires", + "horaires_osm", + "horaires_description", "description", ] ].to_sql( diff --git a/docker-compose.yaml b/docker-compose.yaml index 89a4ac8..6fc9f2d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,10 +14,10 @@ x-airflow-common: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' # Uncomment the next 4 lines to store logs in S3 - AIRFLOW__LOGGING__REMOTE_LOGGING: 'true' - AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://qfdmo-airflow-logs" - AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: 's3logs' - AIRFLOW__LOGGING__ENCRYPT_S3_LOGS: "false" + # AIRFLOW__LOGGING__REMOTE_LOGGING: 'true' + # AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://qfdmo-airflow-logs" + # AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: 's3logs' + # AIRFLOW__LOGGING__ENCRYPT_S3_LOGS: "false" AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. 
@@ -27,7 +27,7 @@ x-airflow-common: - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins - - ~/workspace/beta.gouv.fr/qfdmo-dags/development:/opt/airflow/development + - $AIRFLOW_DAGS_LOCAL_FOLDER:/opt/airflow/development user: "${AIRFLOW_UID:-50000}:0" extra_hosts: - "host.docker.internal:host-gateway" From 3be0948f687d7c44e92171b87c891200ee1cddeb Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 3 Apr 2024 14:42:12 +0200 Subject: [PATCH 5/6] CI/CD to deploy dags to s3 --- .github/workflows/push_dags_to_preprod.yml | 24 ++++++++++++++++++++++ .github/workflows/push_dags_to_prod.yml | 24 ++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 .github/workflows/push_dags_to_preprod.yml create mode 100644 .github/workflows/push_dags_to_prod.yml diff --git a/.github/workflows/push_dags_to_preprod.yml b/.github/workflows/push_dags_to_preprod.yml new file mode 100644 index 0000000..c954447 --- /dev/null +++ b/.github/workflows/push_dags_to_preprod.yml @@ -0,0 +1,24 @@ +name: Push DAGs to S3 + +on: + - pull_request + +env: + S3_HOST: https://cellar-c2.services.clever-cloud.com + FOLDER_SOURCE: development + S3_BUCKET_DESTINATION: s3://qfdmo-airflow-dags/preprod/ + AWS_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_KEY }} + +jobs: + backup-production: + name: Copy local folder to Preprod s3 bucket + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Install AWS CLI dependencies + run: | + pip install awscli + - name: Push to S3 bucket preprod + run: aws --endpoint-url ${{ env.S3_HOST }} s3 sync --delete --quiet ${{ env.FOLDER_SOURCE }} ${{ env.S3_BUCKET_DESTINATION }} diff --git a/.github/workflows/push_dags_to_prod.yml b/.github/workflows/push_dags_to_prod.yml new file mode 100644 index 0000000..9343c58 --- /dev/null +++ b/.github/workflows/push_dags_to_prod.yml @@ -0,0 +1,24 @@ +name: Push DAGs to S3 + +on: + push: + tags: + - "v*.*.*" + +env: + S3_HOST: https://cellar-c2.services.clever-cloud.com + FOLDER_SOURCE: development/ + S3_BUCKET_DESTINATION: s3://qfdmo-airflow-dags/production/ + AWS_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_KEY }} + +jobs: + backup-production: + name: Copy local folder to Production s3 bucket + runs-on: ubuntu-latest + steps: + - name: Install AWS CLI dependencies + run: | + pip install awscli + - name: Backup + run: aws --endpoint-url ${{ env.S3_HOST }} s3 sync --quiet ${{ env.FOLDER_SOURCE }} ${{ env.S3_BUCKET_DESTINATION }} From e859b38d78c89ff17119b0eced3ebe8d0890b04c Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 3 Apr 2024 16:33:04 +0200 Subject: [PATCH 6/6] remove useless CI/CD --- .github/workflows/push_dags_to_preprod.yml | 24 ---------------------- .github/workflows/push_dags_to_prod.yml | 24 ---------------------- README.md | 2 +- 3 files changed, 1 insertion(+), 49 deletions(-) delete mode 100644 .github/workflows/push_dags_to_preprod.yml delete mode 100644 .github/workflows/push_dags_to_prod.yml diff --git a/.github/workflows/push_dags_to_preprod.yml b/.github/workflows/push_dags_to_preprod.yml deleted file mode 100644 index c954447..0000000 --- a/.github/workflows/push_dags_to_preprod.yml +++ /dev/null @@ -1,24 +0,0 @@ -name: Push DAGs to S3 - -on: - - pull_request - -env: - S3_HOST: https://cellar-c2.services.clever-cloud.com - FOLDER_SOURCE: development - S3_BUCKET_DESTINATION: 
s3://qfdmo-airflow-dags/preprod/ - AWS_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_KEY }} - -jobs: - backup-production: - name: Copy local folder to Preprod s3 bucket - runs-on: ubuntu-latest - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - name: Install AWS CLI dependencies - run: | - pip install awscli - - name: Push to S3 bucket preprod - run: aws --endpoint-url ${{ env.S3_HOST }} s3 sync --delete --quiet ${{ env.FOLDER_SOURCE }} ${{ env.S3_BUCKET_DESTINATION }} diff --git a/.github/workflows/push_dags_to_prod.yml b/.github/workflows/push_dags_to_prod.yml deleted file mode 100644 index 9343c58..0000000 --- a/.github/workflows/push_dags_to_prod.yml +++ /dev/null @@ -1,24 +0,0 @@ -name: Push DAGs to S3 - -on: - push: - tags: - - "v*.*.*" - -env: - S3_HOST: https://cellar-c2.services.clever-cloud.com - FOLDER_SOURCE: development/ - S3_BUCKET_DESTINATION: s3://qfdmo-airflow-dags/production/ - AWS_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_KEY }} - -jobs: - backup-production: - name: Copy local folder to Production s3 bucket - runs-on: ubuntu-latest - steps: - - name: Install AWS CLI dependencies - run: | - pip install awscli - - name: Backup - run: aws --endpoint-url ${{ env.S3_HOST }} s3 sync --quiet ${{ env.FOLDER_SOURCE }} ${{ env.S3_BUCKET_DESTINATION }} diff --git a/README.md b/README.md index de6f11d..1e9d7ca 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ Airflow est déployé avecun seul DAG `doswnload_dags_from_s3` qui télécharge En environnement de développement, on précisera l'emplacement des DAGs avec la variable d'environnement AIRFLOW_DAGS_LOCAL_FOLDER avant le lancement des container docker. Par exemple : ```sh -export AIRFLOW_DAGS_LOCAL_FOLDER=$HOME/workspace/beta.gouv.fr/qfdmo-dags/development +export AIRFLOW_DAGS_LOCAL_FOLDER=$HOME/workspace/beta.gouv.fr/quefairedemesobjets/dags ``` Ce dossier est monté dans les containers docker à l'emplacement `/opt/airflow/development`
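
À titre d'esquisse pour les deux derniers points du « Reste à faire » (déploiement des DAGs sur le s3 de preprod au push sur `main`, puis en production au tag de release), le workflow GitHub Actions retiré dans ce patch pourrait être repris avec un déclencheur `push`. Les valeurs ci-dessous (endpoint Cellar, bucket `qfdmo-airflow-dags`, secrets `S3_ACCESS_KEY`/`S3_SECRET_KEY`, dossier source `development`) sont reprises du workflow supprimé et restent à adapter :

```yaml
name: Push DAGs to S3 (preprod)

on:
  push:
    branches:
      - main

env:
  S3_HOST: https://cellar-c2.services.clever-cloud.com
  FOLDER_SOURCE: development
  S3_BUCKET_DESTINATION: s3://qfdmo-airflow-dags/preprod/
  AWS_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_KEY }}

jobs:
  deploy-dags-preprod:
    name: Copy DAGs folder to the preprod S3 bucket
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
      - name: Install AWS CLI
        run: pip install awscli
      - name: Sync DAGs to the preprod bucket
        run: aws --endpoint-url ${{ env.S3_HOST }} s3 sync --delete --quiet ${{ env.FOLDER_SOURCE }} ${{ env.S3_BUCKET_DESTINATION }}
```

Le même schéma, déclenché sur les tags `v*.*.*` et ciblant `s3://qfdmo-airflow-dags/production/` (sans `--delete`, comme dans le workflow de production supprimé), couvrirait le déploiement en production.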