Skip to content

Commit

Permalink
[celery#305]: Improving abstract models implementation.
Browse files Browse the repository at this point in the history
Added a `helpers` module into `models` containing the functions
`taskresult_model()` and `groupresult_model()`.

  * `taskresult_model()`: will try to find the custom model using a
    dotted path defined under the constant
`CELERY_RESULTS_TASKRESULT_MODEL` in the settings of the user's project
  * `groupresult_model()` will try to do the same using under the
    constant `CELERY_RESULTS_GROUPRESULT_MODEL`.

By default if these attributes are not found `django-celery-results`
will use the default models (`models.TaskResult` and
`models.GroupResult`).

Updated database backend in order to use custom models for `TaskResult
and `GroupResult` it they're present.

Instead to import explicitely the `TaskResult` and the `GroupResult`
(default models from `django-celery-results`) we make use of the model
helpers to load the right classes, the custom ones if they're present
otherwise we use the default ones.

Getting data from `task_kwargs` to extend the `task_properties` and be
able to store them into the database (using the custom models).

First of all we need a way to get data from `task_kwargs` (or somewhere
else) just before a `task_result` record is created, evaluate that data
and find the right values that will be used to fill the new fields
defined in the custom model.

So for this purpose we added a settings module to
`django-celery-results` which will hold default settings, the first
setting that will contain is a function in charge to get a callback from
the settings of the user project. This callback will be feeded by the
task `task_kwargs`, which will be intercepted in
`DatabaseBackend._get_extended_properties` just before the `task_kwargs`
are encoded by `encode_content()` method and send it to the
`store_result` method from the object manager of `TaskModel`
(Custom/Default one).

To end, we must to extend the arguments of the `store_result` method
from the `TaskResult` Manager adding `**extra_fields` that will make us
able to send extra data to the custom model, when it's defined.

---
Resolves celery#305

Fixes celery#314
  • Loading branch information
diegocastrum committed Aug 10, 2022
1 parent aba39e3 commit 16be793
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ cover/
.cache/
htmlcov/
coverage.xml
.vscode
5 changes: 4 additions & 1 deletion django_celery_results/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
ALLOW_EDITS = False
pass

from .models import GroupResult, TaskResult
from .models.helpers import taskresult_model, groupresult_model

GroupResult = groupresult_model()
TaskResult = taskresult_model()


class TaskResultAdmin(admin.ModelAdmin):
Expand Down
12 changes: 8 additions & 4 deletions django_celery_results/backends/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from kombu.exceptions import DecodeError

from ..models import ChordCounter
from ..models import GroupResult as GroupResultModel
from ..models import TaskResult
from ..models.helpers import taskresult_model, groupresult_model
from ..settings import extend_task_props_callback

EXCEPTIONS_TO_CATCH = (InterfaceError,)

Expand All @@ -29,8 +29,8 @@
class DatabaseBackend(BaseDictBackend):
"""The Django database backend, using models to store task state."""

TaskModel = TaskResult
GroupModel = GroupResultModel
TaskModel = taskresult_model()
GroupModel = groupresult_model()
subpolling_interval = 0.5

def exception_safe_to_retry(self, exc):
Expand Down Expand Up @@ -79,6 +79,10 @@ def _get_extended_properties(self, request, traceback):
# task protocol 1
task_kwargs = getattr(request, 'kwargs', None)

# TODO: We assuming that task protocol 1 could be always in use. :/
extended_props.update(
extend_task_props_callback(getattr(request, 'kwargs', None)))

# Encode input arguments
if task_args is not None:
_, _, task_args = self.encode_content(task_args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.conf import settings
from django.db import connections, models, router, transaction

from ..utils import now, raw_delete
from .utils import now, raw_delete

W_ISOLATION_REP = """
Polling results with transaction isolation level 'repeatable-read'
Expand Down Expand Up @@ -121,7 +121,7 @@ def store_result(self, content_type, content_encoding,
traceback=None, meta=None,
periodic_task_name=None,
task_name=None, task_args=None, task_kwargs=None,
worker=None, using=None):
worker=None, using=None, **extra_fields):
"""Store the result and status of a task.
Arguments:
Expand All @@ -143,6 +143,7 @@ def store_result(self, content_type, content_encoding,
exception (only passed if the task failed).
meta (str): Serialized result meta data (this contains e.g.
children).
**extra_fields (dict): Extra (model)fields to store.
Keyword Arguments:
-----------------
Expand All @@ -163,7 +164,8 @@ def store_result(self, content_type, content_encoding,
'task_name': task_name,
'task_args': task_args,
'task_kwargs': task_kwargs,
'worker': worker
'worker': worker,
**extra_fields
}
obj, created = self.using(using).get_or_create(task_id=task_id,
defaults=fields)
Expand Down
2 changes: 1 addition & 1 deletion django_celery_results/models/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from django.db import models
from django.utils.translation import gettext_lazy as _

from . import managers
from .. import managers

ALL_STATES = sorted(states.ALL_STATES)
TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES))
Expand Down
6 changes: 6 additions & 0 deletions django_celery_results/models/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Meta(AbstractTaskResult.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"


class ChordCounter(models.Model):
Expand Down Expand Up @@ -48,6 +49,10 @@ class ChordCounter(models.Model):
)
)

class Meta:
app_label = "django_celery_results"


def group_result(self, app=None):
"""Return the GroupResult of self.
Expand All @@ -71,3 +76,4 @@ class Meta(AbstractGroupResult.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"
47 changes: 47 additions & 0 deletions django_celery_results/models/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from django.apps import apps
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured

from .generic import TaskResult, GroupResult

def taskresult_model():
"""Return the TaskResult model that is active in this project."""
if not hasattr(settings, 'CELERY_RESULTS_TASKRESULT_MODEL'):
return TaskResult

try:
return apps.get_model(
settings.CELERY_RESULTS_TASKRESULT_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_TASKRESULT_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_TASKRESULT_MODEL refers to model "
f"'{settings.CELERY_RESULTS_TASKRESULT_MODEL}' that has not "
"been installed"
)

def groupresult_model():
"""Return the GroupResult model that is active in this project."""
if not hasattr(settings, 'CELERY_RESULTS_GROUPRESULT_MODEL'):
return GroupResult

try:
return apps.get_model(
settings.CELERY_RESULTS_GROUPRESULT_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_GROUPRESULT_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_GROUPRESULT_MODEL refers to model "
f"'{settings.CELERY_RESULTS_GROUPRESULT_MODEL}' that has not "
"been installed"
)
16 changes: 16 additions & 0 deletions django_celery_results/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from django.conf import settings


def get_callback_function(settings_name, default=None):
"""Return the callback function for the given settings name."""

callback = getattr(settings, settings_name, None)
if callback is None:
return default

if callable(callback):
return callback

extend_task_props_callback = get_callback_function(
"CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK"
)

0 comments on commit 16be793

Please sign in to comment.