Skip to content

Commit

Permalink
Process future tasks by eta ordering + use time machine in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
christophbuermann committed Nov 5, 2024
1 parent bb51d2d commit b39f8bf
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def tasks_for_processing():
return FutureTask.objects.filter(
eta__lte=timezone.now(),
status=FutureTask.FUTURE_TASK_STATUS_OPEN,
)
).order_by("eta")

@staticmethod
def _convert_exception_args(args):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ django-cronfield>=0.2.0,<0.3

# Linters and formatters
pre-commit>=4.0.1,<4.1
time-machine>=2.15.0,<2.17.0
2 changes: 2 additions & 0 deletions tests/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@
FUTURE_TASK_TYPE_TWO = "task_two"
FUTURE_TASK_TYPE_ERROR = "task_error"
FUTURE_TASK_TYPE_INTERRUPTION = "task_interruption"
FUTURE_TASK_TYPE_ETA_ORDERING = "task_eta_ordering"

FUTURE_TASK_TYPES = (
(FUTURE_TASK_TYPE_ONE, "Task 1"),
(FUTURE_TASK_TYPE_TWO, "Task 2"),
(FUTURE_TASK_TYPE_ERROR, "Task Error"),
(FUTURE_TASK_TYPE_INTERRUPTION, "Task Interruption"),
(FUTURE_TASK_TYPE_ETA_ORDERING, "Task ETA Ordering"),
)

STATIC_URL = "/static/"
Expand Down
9 changes: 8 additions & 1 deletion tests/testapp/handlers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from sys import intern
from time import monotonic_ns

from django.dispatch import receiver

Expand All @@ -9,7 +10,7 @@

@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ONE))
def my_task_function1(sender, instance, **kwargs):
time.sleep(0.5)
pass


@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_TWO))
Expand All @@ -25,3 +26,9 @@ def my_task_function_error(sender, instance, **kwargs):
@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_INTERRUPTION))
def my_task_function_interruption(sender, instance, **kwargs):
time.sleep(10)


@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ETA_ORDERING))
def my_task_function_eta_ordering(sender, instance, **kwargs):
instance.result = monotonic_ns()
instance.save()
216 changes: 118 additions & 98 deletions tests/testapp/tests/test_future_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import signal
import time
from datetime import timedelta
from timeit import default_timer

import time_machine
from django.core.management import call_command
from django.test import TestCase, TransactionTestCase
from django.utils import timezone
Expand All @@ -11,148 +13,166 @@
from tests.core import settings
from tests.testapp.mixins import ProcessTasksCommandMixin

SLEEP_TIME = 2.2

class WaitForTaskStatusTimeout(Exception):
pass

class TestFutureTasks(ProcessTasksCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

today = timezone.now()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)

self.task1 = FutureTask.objects.create(
task_id="task1",
eta=yesterday,
type=settings.FUTURE_TASK_TYPE_ONE,
)
def _wait_for_task_status(task, status, tick_seconds=0.1, timeout_seconds=3):
start_time = default_timer()
while task.status != status:
if default_timer() - start_time >= timeout_seconds:
raise WaitForTaskStatusTimeout(
f"Timeout while waiting for task status. Actual: '{task.status}' Expected: '{status}'",
)
task.refresh_from_db()
time.sleep(tick_seconds)

self.task2 = FutureTask.objects.create(
task_id="task2",
eta=tomorrow,
type=settings.FUTURE_TASK_TYPE_TWO,
)

self.task_error = FutureTask.objects.create(
task_id="task_error",
eta=yesterday,
type=settings.FUTURE_TASK_TYPE_ERROR,
class TestProcessFutureTasks(ProcessTasksCommandMixin, TransactionTestCase):
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_eta_now(self):
start_time = default_timer()
task = FutureTask.objects.create(
task_id="task",
eta=timezone.now(),
type=settings.FUTURE_TASK_TYPE_ONE,
)

def test_future_task_process_task(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE)
_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_DONE)
end_time = default_timer()
self.assertIsNotNone(task.execution_time)
self.assertGreater(task.execution_time, 0.5)
self.assertLess(task.execution_time, 1)
self.assertGreater(task.execution_time, 0.0)
self.assertLess(task.execution_time, end_time - start_time)

def test_future_task_no_task_to_process(self):
task = FutureTask.objects.get(pk=self.task2.pk)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_eta_future(self):
task = FutureTask.objects.create(
task_id="task",
eta=timezone.now() + timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_TWO,
)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

try:
_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_DONE)
except WaitForTaskStatusTimeout:
pass
task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

def test_future_task_process_error(self):
task = FutureTask.objects.get(pk=self.task_error.pk)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_error(self):
task = FutureTask.objects.create(
task_id="task",
eta=timezone.now(),
type=settings.FUTURE_TASK_TYPE_ERROR,
)
print(FutureTask.objects.all())
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR)
_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_ERROR)
self.assertEqual(task.result["args"], ["task error"])

@time_machine.travel("2024-01-01 00:00 +0000", tick=True)
def test_process_future_tasks_eta_ordering(self):
_now = timezone.now()
task_late = FutureTask.objects.create(
task_id="task_late",
eta=_now,
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
task_early = FutureTask.objects.create(
task_id="task_early",
eta=_now - timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
self.assertEqual(task_late.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
self.assertEqual(task_early.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
_wait_for_task_status(task_late, FutureTask.FUTURE_TASK_STATUS_DONE)
_wait_for_task_status(task_early, FutureTask.FUTURE_TASK_STATUS_DONE)
self.assertGreater(task_late.result, task_early.result)

class TestFutureTaskInterruption(ProcessTasksCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

yesterday = timezone.now() - timedelta(days=1)

self.task1 = FutureTask.objects.create(
class TestProcessFutureTasksInterruption(ProcessTasksCommandMixin, TransactionTestCase):
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_future_task_process_interruption(self):
task = FutureTask.objects.create(
task_id="task",
eta=yesterday,
eta=timezone.now(),
type=settings.FUTURE_TASK_TYPE_INTERRUPTION,
)

def test_future_task_process_task(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task is in progression.
time.sleep(SLEEP_TIME)

_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS)
pid = os.getpid()
os.kill(pid, signal.SIGINT)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_INTERRUPTED)


class TestFutureTasksOnetimeRun(TestCase):
def setUp(self):
super().setUp()

today = timezone.now()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_no_task(self):
call_command("process_future_tasks", onetimerun=True)

self.task1 = FutureTask.objects.create(
task_id="onetimerun_task1",
eta=yesterday,
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_eta_now(self):
start_time = default_timer()
task = FutureTask.objects.create(
task_id="task",
eta=timezone.now(),
type=settings.FUTURE_TASK_TYPE_ONE,
)

self.task2 = FutureTask.objects.create(
task_id="onetimerun_task2",
eta=tomorrow,
type=settings.FUTURE_TASK_TYPE_TWO,
)

self.task_error = FutureTask.objects.create(
task_id="onetimerun_task_error",
eta=yesterday,
type=settings.FUTURE_TASK_TYPE_ERROR,
)

def test_future_task_process_task_onetimerun(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

end_time = default_timer()
task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE)
self.assertGreater(task.execution_time, 0.5)
self.assertLess(task.execution_time, 1)
self.assertIsNotNone(task.execution_time)
self.assertGreater(task.execution_time, 0.0)
self.assertLess(task.execution_time, end_time - start_time)

def test_future_task_no_task_to_process_onetimerun(self):
task = FutureTask.objects.get(pk=self.task2.pk)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_eta_future(self):
_now = timezone.now()
task = FutureTask.objects.create(
task_id="task",
eta=_now + timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_TWO,
)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

def test_future_task_process_error_onetimerun(self):
task = FutureTask.objects.get(pk=self.task_error.pk)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_error(self):
_now = timezone.now()
task = FutureTask.objects.create(
task_id="task",
eta=_now,
type=settings.FUTURE_TASK_TYPE_ERROR,
)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR)
self.assertEqual(task.result["args"], ["task error"])

@time_machine.travel("2024-01-01 00:00 +0000", tick=True)
def test_process_future_tasks_onetimerun_eta_ordering(self):
_now = timezone.now()
task_late = FutureTask.objects.create(
task_id="task_late",
eta=_now,
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
task_early = FutureTask.objects.create(
task_id="task_early",
eta=_now - timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
self.assertEqual(task_late.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
self.assertEqual(task_early.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
call_command("process_future_tasks", onetimerun=True)
task_late.refresh_from_db()
task_early.refresh_from_db()
self.assertGreater(task_late.result, task_early.result)

0 comments on commit b39f8bf

Please sign in to comment.