Skip to content

Commit

Permalink
Merge pull request #6 from galipnik/SIANXKE-343-deactivate-periodic-t…
Browse files Browse the repository at this point in the history
…ask-after-n-executions

SIANXKE-343: Deactivate task after n executions
  • Loading branch information
nezhar authored Oct 29, 2023
2 parents 1ff5935 + 6af928b commit 27ebaad
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ def _handle_options(self):
def periodic_tasks_for_processing():
return PeriodicFutureTask.objects.filter(is_active=True)

@staticmethod
def number_of_corresponding_single_tasks(p_task):
return FutureTask.objects.filter(periodic_parent_task_id=p_task.pk).count()

def update_last_task_creation(self):
now = timezone.now()
for p_task in self.periodic_tasks_for_processing():
Expand All @@ -48,6 +52,14 @@ def handle_tick(self):
relevant_dts = croniter_range(last_population, now, p_task.cron_string)

for dt in relevant_dts:
if (
p_task.max_number_of_executions is not None
and self.number_of_corresponding_single_tasks(p_task)
>= p_task.max_number_of_executions
):
p_task.is_active = False
break

dt_format = "%Y-%m-%d %H:%M%z"
task_id = f"{p_task.periodic_task_id} ({dt.strftime(dt_format)})"
FutureTask.objects.create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 3.2.22 on 2023-10-20 12:20

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("django_future_tasks", "0004_futuretask_status_interrupted"),
]

operations = [
migrations.AddField(
model_name="periodicfuturetask",
name="max_number_of_executions",
field=models.IntegerField(
blank=True, null=True, verbose_name="Maximal number of executions"
),
),
]
13 changes: 10 additions & 3 deletions django_future_tasks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class PeriodicFutureTask(models.Model):
)
cron_string = CronField()
is_active = models.BooleanField(_("Active"), default=True)
max_number_of_executions = models.IntegerField(
_("Maximal number of executions"), null=True, blank=True
)
__original_is_active = None
last_task_creation = models.DateTimeField(
_("Last single task creation"),
Expand All @@ -91,14 +94,18 @@ class PeriodicFutureTask(models.Model):
)

def next_planned_execution(self):
if self.is_active:
if not self.is_active or (
self.max_number_of_executions is not None
and FutureTask.objects.filter(periodic_parent_task=self.pk).count()
>= self.max_number_of_executions
):
return None
else:
now = datetime.datetime.now()
return format(
croniter.croniter(self.cron_string, now).get_next(datetime.datetime),
settings.DATETIME_FORMAT,
)
else:
return None

def cron_humnan_readable(self):
descriptor = ExpressionDescriptor(
Expand Down
40 changes: 29 additions & 11 deletions tests/testapp/tests/test_periodic_future_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from tests.core import settings
from tests.testapp.mixins import PopulatePeriodicTaskCommandMixin

SLEEP_TIME = 1.1
SLEEP_TIME = 1.5


class TestPeriodicFutureTasks(PopulatePeriodicTaskCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

last_task_creation = timezone.now() - timedelta(hours=2)
self.original_last_task_creation = timezone.now() - timedelta(hours=2)

self.task_active = PeriodicFutureTask.objects.create(
periodic_task_id="periodic task",
Expand All @@ -24,26 +24,35 @@ def setUp(self):
)

p_task = PeriodicFutureTask.objects.get(pk=self.task_active.pk)
p_task.last_task_creation = last_task_creation
p_task.last_task_creation = self.original_last_task_creation
p_task.save()

self.task_not_active = PeriodicFutureTask.objects.create(
periodic_task_id="periodic inactive task",
type=settings.FUTURE_TASK_TYPE_ONE,
cron_string="42 * * * *",
last_task_creation=last_task_creation,
is_active=False,
)

self.task_max_one_execution = PeriodicFutureTask.objects.create(
periodic_task_id="periodic task with maximal one execution",
type=settings.FUTURE_TASK_TYPE_ONE,
cron_string="42 * * * *",
max_number_of_executions=1,
)

p_task = PeriodicFutureTask.objects.get(pk=self.task_max_one_execution.pk)
p_task.last_task_creation = self.original_last_task_creation
p_task.save()

def test_periodic_future_task_populate_active_task(self):
p_task = PeriodicFutureTask.objects.get(pk=self.task_active.pk)
last_task_creation = p_task.last_task_creation

# Make sure that task population has been processed.
time.sleep(SLEEP_TIME)

p_task.refresh_from_db()
self.assertTrue(p_task.last_task_creation > last_task_creation)
self.assertTrue(p_task.last_task_creation > self.original_last_task_creation)
self.assertEqual(
FutureTask.objects.filter(periodic_parent_task_id=p_task.pk).count(), 2
)
Expand All @@ -54,27 +63,36 @@ def test_periodic_future_task_populate_active_task(self):

def test_periodic_future_task_populate_inactive_task(self):
p_task = PeriodicFutureTask.objects.get(pk=self.task_not_active.pk)
last_task_creation = p_task.last_task_creation

# Make sure that task population has been processed.
time.sleep(SLEEP_TIME)

p_task.refresh_from_db()
self.assertEqual(p_task.last_task_creation, last_task_creation)
self.assertFalse(
FutureTask.objects.filter(periodic_parent_task_id=p_task.pk).exists()
)

def test_periodic_future_task_deactivation_and_activation(self):
p_task = PeriodicFutureTask.objects.get(pk=self.task_active.pk)
last_task_creation = p_task.last_task_creation

p_task.is_active = False
p_task.save()
p_task.refresh_from_db()
self.assertEqual(p_task.last_task_creation, last_task_creation)
self.assertEqual(p_task.last_task_creation, self.original_last_task_creation)

p_task.is_active = True
p_task.save()
p_task.refresh_from_db()
self.assertTrue(p_task.last_task_creation > last_task_creation)
self.assertTrue(p_task.last_task_creation > self.original_last_task_creation)

def test_periodic_future_task_max_one_exection(self):
p_task = PeriodicFutureTask.objects.get(pk=self.task_max_one_execution.pk)

# Make sure that task population has been processed.
time.sleep(SLEEP_TIME)

p_task.refresh_from_db()
self.assertEqual(
FutureTask.objects.filter(periodic_parent_task_id=p_task.pk).count(), 1
)
self.assertFalse(p_task.is_active)

0 comments on commit 27ebaad

Please sign in to comment.