Fix atomic transaction not routing to the correct DB in DatabaseBackend.on_chord_part_return transaction.atomic #427
Interesting, I did not consider multi-db setups when I wrote this.
I think this does warrant a testcase or two, specifically the case where the read and write db are not the same.
Maybe you could pull some inspiration from https://github.com/celery/django-celery-results/blob/main/t/unit/test_models.py
@@ -246,7 +246,7 @@ def on_chord_part_return(self, request, state, result, **kwargs):
         if not gid or not tid:
             return
         call_callback = False
-        with transaction.atomic():
+        with transaction.atomic(using=ChordCounter.objects.db):
ChordCounter.objects.db (the BaseManager.db property) returns the result of django.db.router.db_for_read(), which works in your situation since it returns the same db alias as db_for_write().
Consider using django.db.router.db_for_write(ChordCounter) instead, as we will be updating the ChordCounter.count value within the transaction.
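To illustrate the difference, here is a minimal pure-Python sketch of how a read alias and a write alias can diverge once a router is configured. It is a simplified stand-in, not Django's implementation: the `ResultsRouter` class, the `resolve` helper, and the alias names are hypothetical, and the router methods take an app label instead of a model to keep the example self-contained.

```python
DEFAULT_DB_ALIAS = "default"


class ResultsRouter:
    """Hypothetical router: reads go to a replica, writes go to the primary."""

    route_app_labels = {"django_celery_results"}

    def db_for_read(self, app_label):
        if app_label in self.route_app_labels:
            return "read-only"
        return None  # no opinion, let the next router (or the default) decide

    def db_for_write(self, app_label):
        if app_label in self.route_app_labels:
            return "default"
        return None


def resolve(routers, action, app_label):
    """Ask each router in turn; fall back to the default alias."""
    for router in routers:
        method = getattr(router, action, None)
        if method is None:
            continue  # this router does not implement the action
        alias = method(app_label)
        if alias:
            return alias
    return DEFAULT_DB_ALIAS


routers = [ResultsRouter()]
read_alias = resolve(routers, "db_for_read", "django_celery_results")
write_alias = resolve(routers, "db_for_write", "django_celery_results")
other_alias = resolve(routers, "db_for_read", "auth")
print(read_alias, write_alias, other_alias)
```

With such a router, opening the transaction on the read alias while the update goes to the write alias leaves the update outside the transaction, which is why the write alias is the safer choice here.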
related: #422
Hi @AllexVeldman,
    def test_on_chord_part_return_multiple_databases(self):
        """
        Test if the ChordCounter is properly decremented and the callback is
        triggered after all chord parts have returned with multiple databases
        """
As there is no DATABASE_ROUTERS defined, this test does not show that the correct db is selected.
Actually, I think that due to the way Django's DATABASES.TEST.MIRROR option works, this test will never fail for the reason it was created.
To make this test find anything, you might need to add a 3rd database in t.proj.settings that does not mirror "default". Then configure a db router for this test to route this app to the new db.
Then you can assert in this test that the ChordCounter exists when using the new db and does not exist when using "default" (or "secondary", should not matter).
t/unit/backends/test_database.py
Outdated
class ChordPartReturnTestCase(TransactionTestCase):
    databases = "__all__"

    def setUp(self):
        super().setUp()
        self.app.conf.result_serializer = 'json'
        self.app.conf.result_backend = (
            'django_celery_results.backends:DatabaseBackend')
        self.app.conf.result_extended = True
        self.b = DatabaseBackend(app=self.app)

    def test_on_chord_part_return_multiple_databases(self):
        """
        Test if the ChordCounter is properly decremented and the callback is
        triggered after all chord parts have returned with multiple databases
        """
        gid = uuid()
        tid1 = uuid()
        tid2 = uuid()
        subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
        group = GroupResult(id=gid, results=subtasks)
        self.b.apply_chord(group, self.add.s())

        chord_counter = ChordCounter.objects.using(
            "secondary"
        ).get(group_id=gid)
        assert chord_counter.count == 2

        request = mock.MagicMock()
        request.id = subtasks[0].id
        request.group = gid
        request.task = "my_task"
        request.args = ["a", 1, "password"]
        request.kwargs = {"c": 3, "d": "e", "password": "password"}
        request.argsrepr = "argsrepr"
        request.kwargsrepr = "kwargsrepr"
        request.hostname = "celery@ip-0-0-0-0"
        request.properties = {"periodic_task_name": "my_periodic_task"}
        request.ignore_result = False
        result = {"foo": "baz"}

        self.b.mark_as_done(tid1, result, request=request)

        chord_counter.refresh_from_db()
        assert chord_counter.count == 1

        self.b.mark_as_done(tid2, result, request=request)

        with pytest.raises(ChordCounter.DoesNotExist):
            ChordCounter.objects.using("secondary").get(group_id=gid)

        request.chord.delay.assert_called_once()
In t.proj.settings.py, add this to the DATABASES:
    'read-only': {
        'ENGINE': 'django.db.backends.postgresql',
        'HOST': 'localhost',
        'NAME': 'other',
        'USER': 'postgres',
        'PASSWORD': 'devpass',
        'OPTIONS': {
            'connect_timeout': 1000,
            'options': '-c default_transaction_read_only=on',
        },
        'TEST': {
            'MIRROR': 'default',
        },
    }
This will create a read-only "mirror" connection to the default database.
Then you can add the router to route reads and writes accordingly:
class DjangoCeleryResultRouter:
    route_app_labels = {"django_celery_results"}

    def db_for_read(self, model, **hints):
        """Route read access to the read-only database"""
        if model._meta.app_label in self.route_app_labels:
            return "read-only"
        return None

    def db_for_write(self, model, **hints):
        """Route write access to the default database"""
        if model._meta.app_label in self.route_app_labels:
            return "default"
        return None


class ChordPartReturnTestCase(TransactionTestCase):
    databases = {"default", "read-only"}

    def setUp(self):
        super().setUp()
        self.app.conf.result_serializer = 'json'
        self.app.conf.result_backend = (
            'django_celery_results.backends:DatabaseBackend')
        self.app.conf.result_extended = True
        self.b = DatabaseBackend(app=self.app)

    def test_on_chord_part_return_multiple_databases(self):
        """
        Test if the ChordCounter is properly decremented and the callback is
        triggered after all chord parts have returned with multiple databases
        """
        with self.settings(DATABASE_ROUTERS=[DjangoCeleryResultRouter()]):
            gid = uuid()
            tid1 = uuid()
            tid2 = uuid()
            subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
            group = GroupResult(id=gid, results=subtasks)

            assert ChordCounter.objects.count() == 0
            assert ChordCounter.objects.using("read-only").count() == 0
            assert ChordCounter.objects.using("default").count() == 0

            self.b.apply_chord(group, self.add.s())

            # Check if the ChordCounter was created in the correct database
            assert ChordCounter.objects.count() == 1
            assert ChordCounter.objects.using("read-only").count() == 1
            assert ChordCounter.objects.using("default").count() == 1

            chord_counter = ChordCounter.objects.get(group_id=gid)
            assert chord_counter.count == 2

            request = mock.MagicMock()
            request.id = subtasks[0].id
            request.group = gid
            request.task = "my_task"
            request.args = ["a", 1, "password"]
            request.kwargs = {"c": 3, "d": "e", "password": "password"}
            request.argsrepr = "argsrepr"
            request.kwargsrepr = "kwargsrepr"
            request.hostname = "celery@ip-0-0-0-0"
            request.properties = {"periodic_task_name": "my_periodic_task"}
            request.ignore_result = False
            result = {"foo": "baz"}

            self.b.mark_as_done(tid1, result, request=request)

            chord_counter.refresh_from_db()
            assert chord_counter.count == 1

            self.b.mark_as_done(tid2, result, request=request)

            with pytest.raises(ChordCounter.DoesNotExist):
                ChordCounter.objects.get(group_id=gid)

            request.chord.delay.assert_called_once()
With these changes, the test fails in the current state of this branch with:
django.db.transaction.TransactionManagementError: select_for_update cannot be used outside of a transaction
This is the error this PR is trying to solve.
Change to using=router.db_for_write(ChordCounter) in database.py and the test will pass.
Note that this test will only fail on Postgres, but since select_for_update does not work on SQLite and CI runs with Postgres, this should not be an issue.
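The failure mode can be reproduced in a toy model without a database. The sketch below is not Django code: `atomic`, `select_for_update`, the `in_atomic_block` dict, and the `run` helper are hypothetical stand-ins that only mimic the rule that a locking query must run inside a transaction opened on the same alias.

```python
from contextlib import contextmanager


class TransactionManagementError(Exception):
    """Stand-in for django.db.transaction.TransactionManagementError."""


# One flag per database alias: is a transaction currently open on it?
in_atomic_block = {"default": False, "read-only": False}


@contextmanager
def atomic(using="default"):
    """Toy version of transaction.atomic: opens a transaction on one alias."""
    previous = in_atomic_block[using]
    in_atomic_block[using] = True
    try:
        yield
    finally:
        in_atomic_block[using] = previous


def select_for_update(alias):
    """Toy locking query: only legal inside a transaction on the same alias."""
    if not in_atomic_block[alias]:
        raise TransactionManagementError(
            "select_for_update cannot be used outside of a transaction."
        )
    return f"locked rows on {alias}"


def run(atomic_alias, query_alias):
    """Open a transaction on one alias and run the locking query on another."""
    try:
        with atomic(using=atomic_alias):
            return select_for_update(query_alias)
    except TransactionManagementError as exc:
        return f"error: {exc}"


# Transaction on the read alias, locking query on the write alias: fails.
mismatched = run("read-only", "default")
# Transaction and locking query on the same (write) alias: succeeds.
matched = run("default", "default")
print(mismatched)
print(matched)
```

The point of the sketch is only that the alias passed to the transaction has to match the alias the locking query is routed to; a transaction open on any other alias does not help.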
Thank you @AllexVeldman
Hi @AllexVeldman,
thanks both of you
Hi @auvipy,
Yes
Hi,
when using a chord with the Database result backend and multiple databases with a Database Router, we encounter this error:
select_for_update cannot be used outside of a transaction.
This happens because in DatabaseBackend.on_chord_part_return a transaction.atomic is created on the default database and not on the one associated with ChordCounter.
We wrote this simple fix to handle this case as well.
Settings we configured:
Router class: