diff --git a/django_future_tasks/management/commands/process_future_tasks.py b/django_future_tasks/management/commands/process_future_tasks.py index 738f0aa..edf68bf 100644 --- a/django_future_tasks/management/commands/process_future_tasks.py +++ b/django_future_tasks/management/commands/process_future_tasks.py @@ -47,7 +47,7 @@ def _handle_options(self, options): def tasks_for_processing(): return FutureTask.objects.filter( eta__lte=timezone.now(), status=FutureTask.FUTURE_TASK_STATUS_OPEN - ) + ).order_by("eta") @staticmethod def _convert_exception_args(args): diff --git a/requirements.txt b/requirements.txt index 9697fbe..659e920 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ # Development dependencies coverage>=7.2.3,<7.3 +time-machine>=2.15.0,<2.17.0 # Linters and formatters pre-commit>=3.2.2,<3.3 diff --git a/tests/core/settings.py b/tests/core/settings.py index da5b0d6..250c3f7 100644 --- a/tests/core/settings.py +++ b/tests/core/settings.py @@ -103,12 +103,14 @@ FUTURE_TASK_TYPE_TWO = "task_two" FUTURE_TASK_TYPE_ERROR = "task_error" FUTURE_TASK_TYPE_INTERRUPTION = "task_interruption" +FUTURE_TASK_TYPE_ETA_ORDERING = "task_eta_ordering" FUTURE_TASK_TYPES = ( (FUTURE_TASK_TYPE_ONE, "Task 1"), (FUTURE_TASK_TYPE_TWO, "Task 2"), (FUTURE_TASK_TYPE_ERROR, "Task Error"), (FUTURE_TASK_TYPE_INTERRUPTION, "Task Interruption"), + (FUTURE_TASK_TYPE_ETA_ORDERING, "Task ETA Ordering"), ) STATIC_URL = "/static/" diff --git a/tests/testapp/handlers.py b/tests/testapp/handlers.py index 758a409..926bca6 100644 --- a/tests/testapp/handlers.py +++ b/tests/testapp/handlers.py @@ -1,5 +1,6 @@ import time from sys import intern +from time import monotonic_ns from django.dispatch import receiver @@ -9,7 +10,7 @@ @receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ONE)) def my_task_function1(sender, instance, **kwargs): - time.sleep(0.5) + pass @receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_TWO)) @@ -25,3 +26,9 @@ def my_task_function_error(sender, instance, **kwargs): @receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_INTERRUPTION)) def my_task_function_interruption(sender, instance, **kwargs): time.sleep(10) + + +@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ETA_ORDERING)) +def my_task_function_eta_ordering(sender, instance, **kwargs): + instance.result = monotonic_ns() + instance.save() diff --git a/tests/testapp/tests/test_future_tasks.py b/tests/testapp/tests/test_future_tasks.py index e0875d6..3a715b2 100644 --- a/tests/testapp/tests/test_future_tasks.py +++ b/tests/testapp/tests/test_future_tasks.py @@ -2,7 +2,9 @@ import signal import time from datetime import timedelta +from timeit import default_timer +import time_machine from django.core.management import call_command from django.test import TestCase, TransactionTestCase from django.utils import timezone @@ -11,138 +13,162 @@ from tests.core import settings from tests.testapp.mixins import ProcessTasksCommandMixin -SLEEP_TIME = 2.2 +class WaitForTaskStatusTimeout(Exception): + pass -class TestFutureTasks(ProcessTasksCommandMixin, TransactionTestCase): - def setUp(self): - super().setUp() - today = timezone.now() - yesterday = today - timedelta(days=1) - tomorrow = today + timedelta(days=1) - - self.task1 = FutureTask.objects.create( - task_id="task1", eta=yesterday, type=settings.FUTURE_TASK_TYPE_ONE - ) +def _wait_for_task_status(task, status, tick_seconds=0.1, timeout_seconds=2): + start_time = default_timer() + while task.status != status: + if default_timer() - start_time >= timeout_seconds: + raise WaitForTaskStatusTimeout( + f"Timeout while waiting for task status. Actual: '{task.status}' Expected: '{status}'" + ) + task.refresh_from_db() + time.sleep(tick_seconds) - self.task2 = FutureTask.objects.create( - task_id="task2", eta=tomorrow, type=settings.FUTURE_TASK_TYPE_TWO - ) - self.task_error = FutureTask.objects.create( - task_id="task_error", eta=yesterday, type=settings.FUTURE_TASK_TYPE_ERROR +class TestProcessFutureTasks(ProcessTasksCommandMixin, TransactionTestCase): + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_process_future_tasks_eta_now(self): + start_time = default_timer() + task = FutureTask.objects.create( + task_id="task", eta=timezone.now(), type=settings.FUTURE_TASK_TYPE_ONE ) - - def test_future_task_process_task(self): - task = FutureTask.objects.get(pk=self.task1.pk) self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - - # Make sure that task has been processed. - time.sleep(SLEEP_TIME) - - task.refresh_from_db() - self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE) + _wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_DONE) + end_time = default_timer() self.assertIsNotNone(task.execution_time) - self.assertGreater(task.execution_time, 0.5) - self.assertLess(task.execution_time, 1) - - def test_future_task_no_task_to_process(self): - task = FutureTask.objects.get(pk=self.task2.pk) + self.assertGreater(task.execution_time, 0.0) + self.assertLess(task.execution_time, end_time - start_time) + + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_process_future_tasks_eta_future(self): + task = FutureTask.objects.create( + task_id="task", + eta=timezone.now() + timedelta(microseconds=1), + type=settings.FUTURE_TASK_TYPE_TWO, + ) self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - - # Make sure that task has been processed. - time.sleep(SLEEP_TIME) - + try: + _wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_DONE) + except WaitForTaskStatusTimeout: + pass task.refresh_from_db() self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - def test_future_task_process_error(self): - task = FutureTask.objects.get(pk=self.task_error.pk) + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_process_future_tasks_error(self): + task = FutureTask.objects.create( + task_id="task", eta=timezone.now(), type=settings.FUTURE_TASK_TYPE_ERROR + ) + print(FutureTask.objects.all()) self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - - # Make sure that task has been processed. - time.sleep(SLEEP_TIME) - - task.refresh_from_db() - self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR) + _wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_ERROR) self.assertEqual(task.result["args"], ["task error"]) - -class TestFutureTaskInterruption(ProcessTasksCommandMixin, TransactionTestCase): - def setUp(self): - super().setUp() - - yesterday = timezone.now() - timedelta(days=1) - - self.task1 = FutureTask.objects.create( - task_id="task", eta=yesterday, type=settings.FUTURE_TASK_TYPE_INTERRUPTION + @time_machine.travel("2024-01-01 00:00 +0000", tick=True) + def test_process_future_tasks_eta_ordering(self): + _now = timezone.now() + task_late = FutureTask.objects.create( + task_id="task_late", + eta=_now, + type=settings.FUTURE_TASK_TYPE_ETA_ORDERING, + ) + task_early = FutureTask.objects.create( + task_id="task_early", + eta=_now - timedelta(microseconds=1), + type=settings.FUTURE_TASK_TYPE_ETA_ORDERING, + ) + self.assertEqual(task_late.status, FutureTask.FUTURE_TASK_STATUS_OPEN) + self.assertEqual(task_early.status, FutureTask.FUTURE_TASK_STATUS_OPEN) + _wait_for_task_status(task_late, FutureTask.FUTURE_TASK_STATUS_DONE) + _wait_for_task_status(task_early, FutureTask.FUTURE_TASK_STATUS_DONE) + self.assertGreater(task_late.result, task_early.result) + + +class TestProcessFutureTasksInterruption(ProcessTasksCommandMixin, TransactionTestCase): + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_future_task_process_interruption(self): + task = FutureTask.objects.create( + task_id="task", + eta=timezone.now(), + type=settings.FUTURE_TASK_TYPE_INTERRUPTION, ) - - def test_future_task_process_task(self): - task = FutureTask.objects.get(pk=self.task1.pk) self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - - # Make sure that task is in progression. - time.sleep(SLEEP_TIME) - + _wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS) pid = os.getpid() os.kill(pid, signal.SIGINT) - task.refresh_from_db() self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_INTERRUPTED) class TestFutureTasksOnetimeRun(TestCase): - def setUp(self): - super().setUp() - - today = timezone.now() - yesterday = today - timedelta(days=1) - tomorrow = today + timedelta(days=1) + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_process_future_tasks_onetimerun_no_task(self): + call_command("process_future_tasks", onetimerun=True) - self.task1 = FutureTask.objects.create( - task_id="onetimerun_task1", - eta=yesterday, + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_process_future_tasks_onetimerun_eta_now(self): + start_time = default_timer() + task = FutureTask.objects.create( + task_id="task", + eta=timezone.now(), type=settings.FUTURE_TASK_TYPE_ONE, ) - - self.task2 = FutureTask.objects.create( - task_id="onetimerun_task2", eta=tomorrow, type=settings.FUTURE_TASK_TYPE_TWO - ) - - self.task_error = FutureTask.objects.create( - task_id="onetimerun_task_error", - eta=yesterday, - type=settings.FUTURE_TASK_TYPE_ERROR, - ) - - def test_future_task_process_task_onetimerun(self): - task = FutureTask.objects.get(pk=self.task1.pk) self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - call_command("process_future_tasks", onetimerun=True) - + end_time = default_timer() task.refresh_from_db() self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE) - self.assertGreater(task.execution_time, 0.5) - self.assertLess(task.execution_time, 1) - - def test_future_task_no_task_to_process_onetimerun(self): - task = FutureTask.objects.get(pk=self.task2.pk) + self.assertIsNotNone(task.execution_time) + self.assertGreater(task.execution_time, 0.0) + self.assertLess(task.execution_time, end_time - start_time) + + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_process_future_tasks_onetimerun_eta_future(self): + _now = timezone.now() + task = FutureTask.objects.create( + task_id="task", + eta=_now + timedelta(microseconds=1), + type=settings.FUTURE_TASK_TYPE_TWO, + ) self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - call_command("process_future_tasks", onetimerun=True) - task.refresh_from_db() self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - def test_future_task_process_error_onetimerun(self): - task = FutureTask.objects.get(pk=self.task_error.pk) + @time_machine.travel("2024-01-01 00:00 +0000", tick=False) + def test_process_future_tasks_onetimerun_error(self): + _now = timezone.now() + task = FutureTask.objects.create( + task_id="task", + eta=_now, + type=settings.FUTURE_TASK_TYPE_ERROR, + ) self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN) - call_command("process_future_tasks", onetimerun=True) - task.refresh_from_db() self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR) self.assertEqual(task.result["args"], ["task error"]) + + @time_machine.travel("2024-01-01 00:00 +0000", tick=True) + def test_process_future_tasks_onetimerun_eta_ordering(self): + _now = timezone.now() + task_late = FutureTask.objects.create( + task_id="task_late", + eta=_now, + type=settings.FUTURE_TASK_TYPE_ETA_ORDERING, + ) + task_early = FutureTask.objects.create( + task_id="task_early", + eta=_now - timedelta(microseconds=1), + type=settings.FUTURE_TASK_TYPE_ETA_ORDERING, + ) + self.assertEqual(task_late.status, FutureTask.FUTURE_TASK_STATUS_OPEN) + self.assertEqual(task_early.status, FutureTask.FUTURE_TASK_STATUS_OPEN) + call_command("process_future_tasks", onetimerun=True) + task_late.refresh_from_db() + task_early.refresh_from_db() + self.assertGreater(task_late.result, task_early.result)