-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
A way to manage dags from s3 with preprod and prod version (#6)
- Loading branch information
Showing
16 changed files
with
464 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,4 +5,8 @@ | |
airflow_settings.yaml | ||
__pycache__/ | ||
astro | ||
logs | ||
logs | ||
dags/development/ | ||
dags/preprod/ | ||
dags/production/ | ||
dags/utils/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
LOCAL_DAGS_FOLDER=/opt/airflow/development |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import logging | ||
import shutil | ||
from datetime import datetime, timedelta | ||
from pathlib import Path | ||
|
||
import airflow.configuration as conf | ||
import decouple | ||
from airflow import DAG | ||
from airflow.models import DagBag | ||
from airflow.operators.python import PythonOperator | ||
from airflow.providers.amazon.aws.hooks.s3 import S3Hook | ||
|
||
|
||
def download_dags_from_s3():
    """Sync DAG files into the scheduler's dags_folder and register them.

    Two modes, selected by the LOCAL_DAGS_FOLDER env var (via decouple):
      * development — LOCAL_DAGS_FOLDER is set: the locally mounted dags
        tree is copied wholesale into ``<dags_folder>/development``.
      * deployed — every key of the ``qfdmo-airflow-dags`` S3 bucket is
        downloaded into ``dags_folder``, preserving the key's path.

    Afterwards the DAGs found under each environment subdirectory are
    loaded with DagBag and exposed in this module's globals (prefixed by
    the subdirectory name) so the Airflow scheduler picks them up.
    """
    dags_dirs = ["preprod", "production"]
    dags_folder = conf.get("core", "dags_folder")
    local_dags_folder = decouple.config("LOCAL_DAGS_FOLDER", cast=str, default="")
    if local_dags_folder:
        # Development: mirror the locally mounted dags tree instead of S3.
        environment = "development"
        dags_dirs = [environment]
        logging.warning("Skipping download_dags_from_s3 in development environment")
        logging.warning("Copying dags from development to %s", dags_folder)
        source = Path(str(local_dags_folder))
        destination = Path(dags_folder, environment)
        # Remove any stale copy first; copytree fails on an existing target.
        shutil.rmtree(destination, ignore_errors=True)
        shutil.copytree(source, destination)
    else:
        s3_hook = S3Hook(aws_conn_id="s3dags")
        bucket_name = "qfdmo-airflow-dags"
        keys = s3_hook.list_keys(bucket_name)
        for key in keys:
            logging.warning("Downloading %s from S3 to %s", key, dags_folder)
            file_path = Path(dags_folder, key)
            # Delete any previous version so the fresh download wins.
            file_path.unlink(missing_ok=True)
            parent_folder = file_path.parent
            parent_folder.mkdir(parents=True, exist_ok=True)
            s3_hook.download_file(
                key,
                bucket_name=bucket_name,
                local_path=parent_folder,
                preserve_file_name=True,
                use_autogenerated_subdir=False,
            )
    for subdir in dags_dirs:
        logging.warning("Loading dags from %s", subdir)
        dag_bag = DagBag(Path(dags_folder, subdir))
        if dag_bag:
            for dag_id, dag in dag_bag.dags.items():
                # Register under a prefixed name so environments don't collide.
                globals()[subdir + "_" + dag_id] = dag
|
||
|
||
# Default task arguments shared by every task of the wrapper DAG below.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Daily wrapper DAG whose single task pulls the environment DAG files
# (from S3 or, in development, from the local mount) into dags_folder.
with DAG(
    "download_dags_from_s3",
    default_args=default_args,
    description="DAG to download dags from S3",
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    # No explicit dag= needed: the `with DAG(...)` context assigns it.
    download_dags = PythonOperator(
        task_id="download_dags_from_s3",
        python_callable=download_dags_from_s3,
    )
File renamed without changes.
Oops, something went wrong.