A way to manage dags from s3 with preprod and prod version #6

Merged 6 commits on Apr 3, 2024
6 changes: 5 additions & 1 deletion .gitignore
@@ -5,4 +5,8 @@
airflow_settings.yaml
__pycache__/
astro
logs
logs
dags/development/
dags/preprod/
dags/production/
dags/utils/
39 changes: 37 additions & 2 deletions README.md
@@ -56,7 +56,42 @@ AIRFLOW__LOGGING__ENCRYPT_S3_LOGS=false

Be careful to add the endpoint_url parameter for CleverCloud's Cellar storage.
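
A minimal sketch of what this can look like when the logging connection is declared through an environment variable (the connection id `s3logs`, the credentials and the Cellar endpoint below are placeholders, not values from this repository):

```sh
# Hypothetical logging connection for Cellar: the endpoint_url extra is
# passed as a URL-encoded query parameter of the connection URI.
export AIRFLOW_CONN_S3LOGS='aws://ACCESS_KEY:SECRET_KEY@/?endpoint_url=https%3A%2F%2Fcellar-c2.services.clever-cloud.com'
```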

## Deploying the DAGs to preprod and prod

The DAGs are deployed to an S3 bucket on CleverCloud, in a folder named after the environment:

- s3://qfdmo-dags/preprod
- s3://qfdmo-dags/production

This copy is performed by the GitHub Actions CI/CD pipeline.
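
For illustration, the CI step boils down to something like this (a sketch, assuming the AWS CLI is available in the runner and that the Cellar endpoint and credentials are provided as CI secrets; only the preprod folder is shown):

```sh
# Hypothetical CI step: push the repository's dags/ folder to the
# environment folder of the bucket (here preprod).
aws s3 sync dags/ s3://qfdmo-dags/preprod --endpoint-url "$CELLAR_ENDPOINT_URL" --delete
```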

Airflow is deployed with a single DAG, `download_dags_from_s3`, which downloads the preprod and production DAGs from S3.

### Deploying the DAGs in the development environment

In the development environment, specify the location of the DAGs with the AIRFLOW_DAGS_LOCAL_FOLDER environment variable before starting the Docker containers. For example:

```sh
export AIRFLOW_DAGS_LOCAL_FOLDER=$HOME/workspace/beta.gouv.fr/quefairedemesobjets/dags
```

This folder is mounted in the Docker containers at `/opt/airflow/development`.

Then copy the environment variables from dags/.env.template to dags/.env:

```sh
cp .env.template .env
```

Finally, start the Docker containers:

```sh
docker compose up
```
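
Once the containers are up, you can check which DAGs the scheduler has picked up (a sketch; the scheduler service name depends on your docker-compose.yaml):

```sh
# List the DAGs known to the scheduler (the service name is assumed here).
docker compose exec airflow-scheduler airflow dags list
```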

## To do

- [ ] Today there is a single log bucket for all environments
- [ ] Strategy to publish preprod and prod DAGs, identifying them and allowing different configurations
- [ ] Deploy the DAGs to the preprod S3 bucket when code is pushed to the main branch
- [ ] Deploy the DAGs to the production S3 bucket when the repository is tagged with a release tag (vx.y.z format)
3 changes: 3 additions & 0 deletions airflow-scheduler.Dockerfile
@@ -4,6 +4,9 @@ FROM apache/airflow:2.8.2
RUN chown -R ${AIRFLOW_UID:-50000}:0 /opt/airflow
USER ${AIRFLOW_UID:-50000}:0

COPY ./requirements.txt /opt/airflow/requirements.txt
RUN pip install -r /opt/airflow/requirements.txt

# Copy the dags, logs, config, and plugins directories to the appropriate locations
COPY ./dags/ /opt/airflow/dags/
COPY ./config/ /opt/airflow/config/
3 changes: 3 additions & 0 deletions airflow-webserver.Dockerfile
@@ -4,6 +4,9 @@ FROM apache/airflow:2.8.2
RUN chown -R ${AIRFLOW_UID:-50000}:0 /opt/airflow
USER ${AIRFLOW_UID:-50000}:0

COPY ./requirements.txt /opt/airflow/requirements.txt
RUN pip install -r /opt/airflow/requirements.txt

# Copy the dags, logs, config, and plugins directories to the appropriate locations
COPY ./dags/ /opt/airflow/dags/

File renamed without changes.
1 change: 1 addition & 0 deletions dags/.env.template
@@ -0,0 +1 @@
LOCAL_DAGS_FOLDER=/opt/airflow/development
76 changes: 76 additions & 0 deletions dags/download_dags.py
@@ -0,0 +1,76 @@
import logging
import shutil
from datetime import datetime, timedelta
from pathlib import Path

import airflow.configuration as conf
import decouple
from airflow import DAG
from airflow.models import DagBag
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def download_dags_from_s3():
    dags_dirs = ["preprod", "production"]
    dags_folder = conf.get("core", "dags_folder")
    local_dags_folder = decouple.config("LOCAL_DAGS_FOLDER", cast=str, default="")
    if local_dags_folder:
        environment = "development"
        dags_dirs = [environment]
        logging.warning("Skipping download_dags_from_s3 in development environment")
        logging.warning(f"Copying dags from development to {dags_folder}")
        # Copy everything from the mounted local folder to dags_folder/development
        source = Path(str(local_dags_folder))
        destination = Path(dags_folder, environment)
        shutil.rmtree(destination, ignore_errors=True)
        shutil.copytree(source, destination)
    else:
        s3_hook = S3Hook(aws_conn_id="s3dags")
        bucket_name = "qfdmo-airflow-dags"
        keys = s3_hook.list_keys(bucket_name)
        for key in keys:
            logging.warning(f"Downloading {key} from S3 to {dags_folder}")
            file_path = Path(dags_folder, key)
            file_path.unlink(missing_ok=True)
            parent_folder = file_path.parent
            parent_folder.mkdir(parents=True, exist_ok=True)
            s3_hook.download_file(
                key,
                bucket_name=bucket_name,
                local_path=parent_folder,
                preserve_file_name=True,
                use_autogenerated_subdir=False,
            )
    # Expose every DAG found in the environment folders at module level
    for subdir in dags_dirs:
        logging.warning(f"Loading dags from {subdir}")
        dag_bag = DagBag(Path(dags_folder, subdir))
        if dag_bag:
            for dag_id, dag in dag_bag.dags.items():
                globals()[subdir + "_" + dag_id] = dag


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "download_dags_from_s3",
    default_args=default_args,
    description="DAG to download dags from S3",
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    download_dags = PythonOperator(
        task_id="download_dags_from_s3", python_callable=download_dags_from_s3, dag=dag
    )

    download_dags
File renamed without changes.