Skip to content

Commit

Permalink
Account for change in UTC offset when calculating next schedule (#35887)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Standish <[email protected]>
  • Loading branch information
uranusjr and dstandish authored Dec 6, 2023
1 parent ace97c0 commit c4549d7
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 23 deletions.
25 changes: 22 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,16 +848,35 @@ def date_range(
return [info.logical_date for info in it]

def is_fixed_time_schedule(self):
"""Figures out if the schedule has a fixed time (e.g. 3 AM every day).
Detection is done by "peeking" the next two cron trigger time; if the
two times have the same minute and hour value, the schedule is fixed,
and we *don't* need to perform the DST fix.
This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
Do not try to understand what this actually means. It is old logic that
should not be used anywhere.
"""
warnings.warn(
"`DAG.is_fixed_time_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
try:
return not self.timetable._should_fix_dst
except AttributeError:

from airflow.timetables._cron import CronMixin

if not isinstance(self.timetable, CronMixin):
return True

from croniter import croniter

cron = croniter(self.timetable._expression)
next_a = cron.get_next(datetime.datetime)
next_b = cron.get_next(datetime.datetime)
return next_b.minute == next_a.minute and next_b.hour == next_a.hour

def following_schedule(self, dttm):
"""
Calculate the following schedule for this dag in UTC.
Expand Down
44 changes: 24 additions & 20 deletions airflow/timetables/_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from __future__ import annotations

import datetime
from functools import cached_property
from typing import TYPE_CHECKING, Any

from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
Expand All @@ -32,21 +31,32 @@
from pendulum import DateTime


def _is_schedule_fixed(expression: str) -> bool:
"""Figures out if the schedule has a fixed time (e.g. 3 AM every day).
def _covers_every_hour(cron: croniter) -> bool:
"""Check whether the given cron runs at least once an hour.
:return: True if the schedule has a fixed time, False if not.
This indicates whether we need to implement a workaround for (what I call)
the "fold hour problem". Folding happens when a region switches time
backwards, usually as a part of ending a DST period, causing a block of time
to occur twice in the wall clock. This is indicated by the ``fold`` flag on
datetime.
Detection is done by "peeking" the next two cron trigger time; if the
two times have the same minute and hour value, the schedule is fixed,
and we *don't* need to perform the DST fix.
As an example, Switzerland in 2023 ended DST on 3am (wall clock time, UTC+2)
by dialing back the clock to 2am (UTC+1). So for (say) ``30 * * * *``, if
the last run was 2:30am (UTC+2), the next needs to be 2:30am (UTC+1, folded)
instead of 3:30am.
This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
While this technically happens for all runs (in such a timezone), we only
really care about runs that happen at least once an hour, and can
provide a somewhat reasonable rationale to skip the fold hour for things
such as ``*/2`` (every two hour). So we try to *minially* peak into croniter
internals to work around the issue.
The check is simple since croniter internally normalizes things to ``*``.
More edge cases can be added later as needed.
See also: https://github.com/kiorky/croniter/issues/56.
"""
cron = croniter(expression)
next_a = cron.get_next(datetime.datetime)
next_b = cron.get_next(datetime.datetime)
return next_b.minute == next_a.minute and next_b.hour == next_a.hour
return cron.expanded[1] == ["*"]


class CronMixin:
Expand Down Expand Up @@ -91,18 +101,12 @@ def validate(self) -> None:
except (CroniterBadCronError, CroniterBadDateError) as e:
raise AirflowTimetableInvalid(str(e))

@cached_property
def _should_fix_dst(self) -> bool:
# This is lazy so instantiating a schedule does not immediately raise
# an exception. Validity is checked with validate() during DAG-bagging.
return not _is_schedule_fixed(self._expression)

def _get_next(self, current: DateTime) -> DateTime:
"""Get the first schedule after specified time, with DST fixed."""
naive = make_naive(current, self._timezone)
cron = croniter(self._expression, start_time=naive)
scheduled = cron.get_next(datetime.datetime)
if not self._should_fix_dst:
if not _covers_every_hour(cron):
return convert_to_utc(make_aware(scheduled, self._timezone))
delta = scheduled - naive
return convert_to_utc(current.in_timezone(self._timezone) + delta)
Expand All @@ -112,7 +116,7 @@ def _get_prev(self, current: DateTime) -> DateTime:
naive = make_naive(current, self._timezone)
cron = croniter(self._expression, start_time=naive)
scheduled = cron.get_prev(datetime.datetime)
if not self._should_fix_dst:
if not _covers_every_hour(cron):
return convert_to_utc(make_aware(scheduled, self._timezone))
delta = naive - scheduled
return convert_to_utc(current.in_timezone(self._timezone) - delta)
Expand Down
Loading

0 comments on commit c4549d7

Please sign in to comment.