Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix atomic transaction not routing to the the correct DB in DatabaseBackend.on_chord_part_return transaction.atomic #427

Merged
merged 6 commits into from
May 4, 2024

Conversation

gianbot
Copy link
Contributor

@gianbot gianbot commented Apr 5, 2024

Hi,
when using chord with Database result backend and multiple databases with a Database Router we encounter this error:
select_for_update cannot be used outside of a transaction.
This depends on the fact that in DatabaseBackend.on_chord_part_return a transaction.atomic is created on the default database and not on the one associated with ChordCounter.

We write this simple fix for handle also this case.

Setting we configured:

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "data",
    },
    "celery_result": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "results",
    },
}
DATABASE_ROUTERS = [
    "app.db_routers.DjangoCeleryResultRouter",
]

Router class:

class DjangoCeleryResultRouter:
    route_app_labels = {"django_celery_results"}

    def db_for_read(self, model, **hints):
        if model._meta.app_label in self.route_app_labels:
            return "celery_result"
        return None

    def db_for_write(self, model, **hints):
        if model._meta.app_label in self.route_app_labels:
            return "celery_result"
        return None

    def allow_relation(self, obj1, obj2, **hints):
        if obj1._meta.app_label in self.route_app_labels or obj2._meta.app_label in self.route_app_labels:
            return True
        return None

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        if app_label in self.route_app_labels:
            return db == "celery_result"
        return None

@gianbot gianbot marked this pull request as ready for review April 9, 2024 10:38
Copy link
Contributor

@AllexVeldman AllexVeldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I did not consider multi-db setups when I wrote this.

I think this does warrant a testcase or two, specifically the case where the read and write db are not the same.
Maybe you could pull some inspiration from https://github.com/celery/django-celery-results/blob/main/t/unit/test_models.py

@@ -246,7 +246,7 @@ def on_chord_part_return(self, request, state, result, **kwargs):
if not gid or not tid:
return
call_callback = False
with transaction.atomic():
with transaction.atomic(using=ChordCounter.objects.db):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseManager.objects.db returns the django.db.router.db_for_read() which in your situation works since it returns the same db alias as the db_for_write().

https://github.com/django/django/blob/2be37b253341cfd1f1363c533e6f896230f047a7/django/db/models/manager.py#L143

Consider using django.db.router.db_for_write(ChordCounter) instead as we will be updating the ChordCounter.count value within the transaction.

@AllexVeldman
Copy link
Contributor

related: #422

@gianbot
Copy link
Contributor Author

gianbot commented Apr 18, 2024

Hi @AllexVeldman ,
we are working on the tests and will update you as soon as possible.

Comment on lines +936 to +940
def test_on_chord_part_return_multiple_databases(self):
"""
Test if the ChordCounter is properly decremented and the callback is
triggered after all chord parts have returned with multiple databases
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no DATABASE_ROUTERS defined, this test does not show the correct db is selected.
Actually I think due to the way Djangos DATABASES.TEST.MIRROR option works this test will never fail for the reasons it is created.

To make this test find anything, you might need to add a 3rd database in t.proj.settings, which does not mirror "default". Then configure a db router for this test to route this app to the new db.
Then you can assert in this test the ChordCounter exists when using the new db and does not exist when using "default" (or "secondary", should not matter).

Comment on lines 925 to 976
class ChordPartReturnTestCase(TransactionTestCase):
databases = "__all__"

def setUp(self):
super().setUp()
self.app.conf.result_serializer = 'json'
self.app.conf.result_backend = (
'django_celery_results.backends:DatabaseBackend')
self.app.conf.result_extended = True
self.b = DatabaseBackend(app=self.app)

def test_on_chord_part_return_multiple_databases(self):
"""
Test if the ChordCounter is properly decremented and the callback is
triggered after all chord parts have returned with multiple databases
"""
gid = uuid()
tid1 = uuid()
tid2 = uuid()
subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
group = GroupResult(id=gid, results=subtasks)
self.b.apply_chord(group, self.add.s())

chord_counter = ChordCounter.objects.using(
"secondary"
).get(group_id=gid)
assert chord_counter.count == 2

request = mock.MagicMock()
request.id = subtasks[0].id
request.group = gid
request.task = "my_task"
request.args = ["a", 1, "password"]
request.kwargs = {"c": 3, "d": "e", "password": "password"}
request.argsrepr = "argsrepr"
request.kwargsrepr = "kwargsrepr"
request.hostname = "celery@ip-0-0-0-0"
request.properties = {"periodic_task_name": "my_periodic_task"}
request.ignore_result = False
result = {"foo": "baz"}

self.b.mark_as_done(tid1, result, request=request)

chord_counter.refresh_from_db()
assert chord_counter.count == 1

self.b.mark_as_done(tid2, result, request=request)

with pytest.raises(ChordCounter.DoesNotExist):
ChordCounter.objects.using("secondary").get(group_id=gid)

request.chord.delay.assert_called_once()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In t.proj.settings.py, add this to the DATABASES:

        'read-only': {
            'ENGINE': 'django.db.backends.postgresql',
            'HOST': 'localhost',
            'NAME': 'other',
            'USER': 'postgres',
            'PASSWORD': 'devpass',
            'OPTIONS': {
                'connect_timeout': 1000,
                'options': '-c default_transaction_read_only=on',
            },
            'TEST': {
                'MIRROR': 'default',
            },
        }

This will create a read-only "mirror" connection to the default database.

The you can add the router to route read/writes accordingly:

Suggested change
class ChordPartReturnTestCase(TransactionTestCase):
databases = "__all__"
def setUp(self):
super().setUp()
self.app.conf.result_serializer = 'json'
self.app.conf.result_backend = (
'django_celery_results.backends:DatabaseBackend')
self.app.conf.result_extended = True
self.b = DatabaseBackend(app=self.app)
def test_on_chord_part_return_multiple_databases(self):
"""
Test if the ChordCounter is properly decremented and the callback is
triggered after all chord parts have returned with multiple databases
"""
gid = uuid()
tid1 = uuid()
tid2 = uuid()
subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
group = GroupResult(id=gid, results=subtasks)
self.b.apply_chord(group, self.add.s())
chord_counter = ChordCounter.objects.using(
"secondary"
).get(group_id=gid)
assert chord_counter.count == 2
request = mock.MagicMock()
request.id = subtasks[0].id
request.group = gid
request.task = "my_task"
request.args = ["a", 1, "password"]
request.kwargs = {"c": 3, "d": "e", "password": "password"}
request.argsrepr = "argsrepr"
request.kwargsrepr = "kwargsrepr"
request.hostname = "celery@ip-0-0-0-0"
request.properties = {"periodic_task_name": "my_periodic_task"}
request.ignore_result = False
result = {"foo": "baz"}
self.b.mark_as_done(tid1, result, request=request)
chord_counter.refresh_from_db()
assert chord_counter.count == 1
self.b.mark_as_done(tid2, result, request=request)
with pytest.raises(ChordCounter.DoesNotExist):
ChordCounter.objects.using("secondary").get(group_id=gid)
request.chord.delay.assert_called_once()
class DjangoCeleryResultRouter:
route_app_labels = {"django_celery_results"}
def db_for_read(self, model, **hints):
"""Route read access to the read-only database"""
if model._meta.app_label in self.route_app_labels:
return "read-only"
return None
def db_for_write(self, model, **hints):
"""Route write access to the default database"""
if model._meta.app_label in self.route_app_labels:
return "default"
return None
class ChordPartReturnTestCase(TransactionTestCase):
databases = {"default", "read-only"}
def setUp(self):
super().setUp()
self.app.conf.result_serializer = 'json'
self.app.conf.result_backend = (
'django_celery_results.backends:DatabaseBackend')
self.app.conf.result_extended = True
self.b = DatabaseBackend(app=self.app)
def test_on_chord_part_return_multiple_databases(self):
"""
Test if the ChordCounter is properly decremented and the callback is
triggered after all chord parts have returned with multiple databases
"""
with self.settings(DATABASE_ROUTERS=[DjangoCeleryResultRouter()]):
gid = uuid()
tid1 = uuid()
tid2 = uuid()
subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
group = GroupResult(id=gid, results=subtasks)
assert ChordCounter.objects.count() == 0
assert ChordCounter.objects.using("read-only").count() == 0
assert ChordCounter.objects.using("default").count() == 0
self.b.apply_chord(group, self.add.s())
# Check if the ChordCounter was created in the correct database
assert ChordCounter.objects.count() == 1
assert ChordCounter.objects.using("read-only").count() == 1
assert ChordCounter.objects.using("default").count() == 1
chord_counter = ChordCounter.objects.get(group_id=gid)
assert chord_counter.count == 2
request = mock.MagicMock()
request.id = subtasks[0].id
request.group = gid
request.task = "my_task"
request.args = ["a", 1, "password"]
request.kwargs = {"c": 3, "d": "e", "password": "password"}
request.argsrepr = "argsrepr"
request.kwargsrepr = "kwargsrepr"
request.hostname = "celery@ip-0-0-0-0"
request.properties = {"periodic_task_name": "my_periodic_task"}
request.ignore_result = False
result = {"foo": "baz"}
self.b.mark_as_done(tid1, result, request=request)
chord_counter.refresh_from_db()
assert chord_counter.count == 1
self.b.mark_as_done(tid2, result, request=request)
with pytest.raises(ChordCounter.DoesNotExist):
ChordCounter.objects.get(group_id=gid)
request.chord.delay.assert_called_once()

These changes will have the test fail in the current state of this branch with:

django.db.transaction.TransactionManagementError: select_for_update cannot be used outside of a transaction

Which is what this PR is trying to solve.

Change to using=router.db_for_write() in database.py and the test will pass.

Note that this test will only fail on Postgres, but since select_for_update does not work for sqlite and CI runs with postgres this should not be an issue.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @AllexVeldman

@gianbot
Copy link
Contributor Author

gianbot commented May 3, 2024

Hi @AllexVeldman ,
thanks for the feedback. We have updated the tests following the suggestions.
Can you check if that's ok?

@auvipy auvipy merged commit d72cad3 into celery:main May 4, 2024
21 checks passed
@auvipy
Copy link
Member

auvipy commented May 4, 2024

thanks both of you

@gianbot
Copy link
Contributor Author

gianbot commented Jun 21, 2024

Hi @auvipy,
when do you plan to release the next release on PyPI that also contains this change?

@auvipy
Copy link
Member

auvipy commented Jun 22, 2024

Yes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants