diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e93d5a55a0755..654bc85cae8b8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -848,16 +848,35 @@ def date_range( return [info.logical_date for info in it] def is_fixed_time_schedule(self): + """Figures out if the schedule has a fixed time (e.g. 3 AM every day). + + Detection is done by "peeking" the next two cron trigger times; if the + two times have the same minute and hour value, the schedule is fixed, + and we *don't* need to perform the DST fix. + + This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00). + + Do not try to understand what this actually means. It is old logic that + should not be used anywhere. + """ warnings.warn( "`DAG.is_fixed_time_schedule()` is deprecated.", category=RemovedInAirflow3Warning, stacklevel=2, ) - try: - return not self.timetable._should_fix_dst - except AttributeError: + + from airflow.timetables._cron import CronMixin + + if not isinstance(self.timetable, CronMixin): return True + from croniter import croniter + + cron = croniter(self.timetable._expression) + next_a = cron.get_next(datetime.datetime) + next_b = cron.get_next(datetime.datetime) + return next_b.minute == next_a.minute and next_b.hour == next_a.hour + def following_schedule(self, dttm): """ Calculate the following schedule for this dag in UTC. diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py index f9b8efa465914..9f5878b197232 100644 --- a/airflow/timetables/_cron.py +++ b/airflow/timetables/_cron.py @@ -17,7 +17,6 @@ from __future__ import annotations import datetime -from functools import cached_property from typing import TYPE_CHECKING, Any from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException @@ -32,21 +31,32 @@ from pendulum import DateTime -def _is_schedule_fixed(expression: str) -> bool: - """Figures out if the schedule has a fixed time (e.g. 3 AM every day). 
+def _covers_every_hour(cron: croniter) -> bool: + """Check whether the given cron runs at least once an hour. - :return: True if the schedule has a fixed time, False if not. + This indicates whether we need to implement a workaround for (what I call) + the "fold hour problem". Folding happens when a region switches time + backwards, usually as a part of ending a DST period, causing a block of time + to occur twice in the wall clock. This is indicated by the ``fold`` flag on + datetime. - Detection is done by "peeking" the next two cron trigger time; if the - two times have the same minute and hour value, the schedule is fixed, - and we *don't* need to perform the DST fix. + As an example, Switzerland in 2023 ended DST at 3am (wall clock time, UTC+2) + by dialing back the clock to 2am (UTC+1). So for (say) ``30 * * * *``, if + the last run was 2:30am (UTC+2), the next needs to be 2:30am (UTC+1, folded) + instead of 3:30am. - This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00). + While this technically happens for all runs (in such a timezone), we only + really care about runs that happen at least once an hour, and can + provide a somewhat reasonable rationale to skip the fold hour for things + such as ``*/2`` (every two hours). So we try to *minimally* peek into croniter + internals to work around the issue. + + The check is simple since croniter internally normalizes things to ``*``. + More edge cases can be added later as needed. + + See also: https://github.com/kiorky/croniter/issues/56. 
""" - cron = croniter(expression) - next_a = cron.get_next(datetime.datetime) - next_b = cron.get_next(datetime.datetime) - return next_b.minute == next_a.minute and next_b.hour == next_a.hour + return cron.expanded[1] == ["*"] class CronMixin: @@ -91,18 +101,12 @@ def validate(self) -> None: except (CroniterBadCronError, CroniterBadDateError) as e: raise AirflowTimetableInvalid(str(e)) - @cached_property - def _should_fix_dst(self) -> bool: - # This is lazy so instantiating a schedule does not immediately raise - # an exception. Validity is checked with validate() during DAG-bagging. - return not _is_schedule_fixed(self._expression) - def _get_next(self, current: DateTime) -> DateTime: """Get the first schedule after specified time, with DST fixed.""" naive = make_naive(current, self._timezone) cron = croniter(self._expression, start_time=naive) scheduled = cron.get_next(datetime.datetime) - if not self._should_fix_dst: + if not _covers_every_hour(cron): return convert_to_utc(make_aware(scheduled, self._timezone)) delta = scheduled - naive return convert_to_utc(current.in_timezone(self._timezone) + delta) @@ -112,7 +116,7 @@ def _get_prev(self, current: DateTime) -> DateTime: naive = make_naive(current, self._timezone) cron = croniter(self._expression, start_time=naive) scheduled = cron.get_prev(datetime.datetime) - if not self._should_fix_dst: + if not _covers_every_hour(cron): return convert_to_utc(make_aware(scheduled, self._timezone)) delta = naive - scheduled return convert_to_utc(current.in_timezone(self._timezone) - delta) diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 596c274cf7152..928ae83ab3d02 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -253,3 +253,308 @@ def test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expec restriction=TimeRestriction(None, None, True), ) assert info == expected_info + + +class 
TestCronIntervalDst: + """Test cron interval timetable can correctly enter a DST boundary. + + Zurich (Switzerland) is chosen since it is +1/+2 DST, making it a bit easier + to get my head around the mental timezone conversion. + + In 2023, DST entered on 26th Mar, 2am local clocks (1am UTC) were turned + forward to 3am. DST exited on 29th Oct, 3am local clocks (1am UTC) were + turned backward to 2am (making the 2:XX hour fold). + """ + + def test_entering_exact(self) -> None: + timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich") + restriction = TimeRestriction( + earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE), + latest=None, + catchup=True, + ) + + # Last run before DST. Interval starts and ends on 2am UTC (local time is +1). + next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 3, 24, 2, tz=TIMEZONE), + pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE), + ) + + # Crossing the DST switch. Interval starts on 2am UTC (local time +1) + # but ends on 1am UTC (local time is +2). + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE), + pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + ) + + # In DST. Interval starts and ends on 1am UTC (local time is +2). 
+ next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 27, 1, tz=TIMEZONE), + ) + + def test_entering_skip(self) -> None: + timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich") + restriction = TimeRestriction( + earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE), + latest=None, + catchup=True, + ) + + # Last run before DST. Interval starts and ends on 1am UTC (local time is +1). + next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 3, 24, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE), + ) + + # Crossing the DST switch. Interval starts on 1am UTC (local time +1) + # and ends on 1am UTC (local time is +2) since the 2am wall clock time + # does not logically exist due to entering DST. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + ) + + # In DST. Interval starts on 1am UTC (local time is +2 but 2am local + # time is not possible) and ends on 0am UTC. 
+ next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 27, 0, tz=TIMEZONE), + ) + + def test_exiting_exact(self) -> None: + timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich") + restriction = TimeRestriction( + earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE), + latest=None, + catchup=True, + ) + + # Last run in DST. Interval starts and ends on 1am UTC (local time is +2). + next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 27, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE), + ) + + # Crossing the DST switch. Interval starts on 1am UTC (local time +2) + # and ends on 2am UTC (local time +1). + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE), + ) + + # Out of DST. Interval starts and ends on 2am UTC (local time is +1). + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE), + pendulum.datetime(2023, 10, 30, 2, tz=TIMEZONE), + ) + + def test_exiting_fold(self) -> None: + timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich") + restriction = TimeRestriction( + earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE), + latest=None, + catchup=True, + ) + + # Last run before folding. 
Interval starts and ends on 0am UTC (local + # time is +2). + next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 27, 0, tz=TIMEZONE), + pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE), + ) + + # Account for folding. Interval starts on 0am UTC (local time +2) and + # ends on 1am UTC (local time +1). There are two 2am local times on the + # 29th due to folding. We end on the second one (fold=1). There's no + # logical reason here; this is simply what Airflow has been doing since + # a long time ago, and there's no point breaking it. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE), + ) + + # Stepping out of DST. Interval starts from the folded 2am local time + # (1am UTC out of DST) since that is when the previous interval ended. + # It ends at 1am UTC (local time is +1) normally. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 30, 1, tz=TIMEZONE), + ) + + +class TestCronIntervalDstNonTrivial: + """These tests are similar to TestCronIntervalDst but with a different cron. + + The original test cases are from apache/airflow#7999. In 2020 at Los Angeles, + DST started on 8th Mar; 10am UTC was turned from 2am UTC-8 to 3am UTC-7. 
+ """ + + def test_7_to_8_entering(self): + timetable = CronDataIntervalTimetable("0 7-8 * * *", timezone="America/Los_Angeles") + restriction = TimeRestriction( + earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE), + latest=None, + catchup=True, + ) + + # Triggers as expected before the interval touches the DST transition. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE), + ) + + # This interval ends an hour early since it includes the DST switch! + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + ) + + # We're fully into DST so the interval is as expected. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 8 + 7, tz=TIMEZONE), + ) + + def test_7_and_9_entering(self): + timetable = CronDataIntervalTimetable("0 7,9 * * *", timezone="America/Los_Angeles") + restriction = TimeRestriction( + earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE), + latest=None, + catchup=True, + ) + + # Triggers as expected before the interval touches the DST transition. 
+ next_info = timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE), + ) + + # This interval ends an hour early since it includes the DST switch! + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + ) + + # We're fully into DST so the interval is as expected. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 9 + 7, tz=TIMEZONE), + ) + + +def test_fold_scheduling(): + timetable = CronDataIntervalTimetable("*/30 * * * *", timezone="Europe/Zurich") + restriction = TimeRestriction( + earliest=pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE), # Locally 1:30 (DST). + latest=None, + catchup=True, + ) + + # Still in DST, acting normally. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE), # Locally 2am (DST). + ) + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE), # Locally 2:30 (DST). + ) + + # Crossing into fold. 
+ next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE), # Locally 2am (fold, not DST). + ) + + # In the "fold zone". + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE), # Locally 2:30 (fold, not DST). + ) + + # Stepping out of fold. + next_info = timetable.next_dagrun_info( + last_automated_data_interval=next_info.data_interval, + restriction=restriction, + ) + assert next_info and next_info.data_interval == DataInterval( + pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 2, 0, tz=TIMEZONE), # Locally 3am (not DST). + )