From a945b21709f114c5e994efd06b09467fb83b2914 Mon Sep 17 00:00:00 2001 From: Emil Stolarsky Date: Thu, 13 Apr 2023 19:58:13 -0400 Subject: [PATCH] Add support for deleting queues after flushing them In an environment where we're trying to reset a queue by flushing it (e.g. tests), it can be desirable to also have the ability for the queue to be deleted. For example, if you'd like to create a new queue with different parameters, but the same name. --- dramatiq/brokers/rabbitmq.py | 14 +++++++++++--- tests/conftest.py | 2 +- tests/test_rabbitmq.py | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/dramatiq/brokers/rabbitmq.py b/dramatiq/brokers/rabbitmq.py index 20f98eec..ce67f174 100644 --- a/dramatiq/brokers/rabbitmq.py +++ b/dramatiq/brokers/rabbitmq.py @@ -382,21 +382,29 @@ def get_queue_message_counts(self, queue_name): xq_queue_response.method.message_count, ) - def flush(self, queue_name): + def flush(self, queue_name: str, delete_queue: bool = False): """Drop all the messages from a queue. Parameters: queue_name(str): The queue to flush. + delete_queue(bool): If true, the queue will be deleted after it is purged. """ for name in (queue_name, dq_name(queue_name), xq_name(queue_name)): if queue_name not in self.queues_pending: + self.logger.info("Purging queue %s", queue_name) self.channel.queue_purge(name) - def flush_all(self): + if delete_queue: + self.channel.queue_delete(name) + + def flush_all(self, delete_queue: bool = False): """Drop all messages from all declared queues. + + Parameters: + delete_queue(bool): If true, the queues will be deleted after they're flushed. """ for queue_name in self.queues: - self.flush(queue_name) + self.flush(queue_name, delete_queue) def join(self, queue_name, min_successes=10, idle_time=100, *, timeout=None): """Wait for all the messages on the given queue to be diff --git a/tests/conftest.py b/tests/conftest.py index f0853f0a..f12ac67d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -70,7 +70,7 @@ def rabbitmq_broker(): broker.emit_after("process_boot") dramatiq.set_broker(broker) yield broker - broker.flush_all() + broker.flush_all(delete_queue=True) broker.close() diff --git a/tests/test_rabbitmq.py b/tests/test_rabbitmq.py index 7eff8ce0..e39b8dd0 100644 --- a/tests/test_rabbitmq.py +++ b/tests/test_rabbitmq.py @@ -503,3 +503,35 @@ def put(): assert len(rabbitmq_broker.queues) == 1 assert put.queue_name in rabbitmq_broker.queues + + +def test_rabbitmq_flush_true_deletes_the_queue(): + queue_name = f"flush_all_test_queue_{current_millis()}" + url = "amqp://%s:%s@127.0.0.1:5672" % (RABBITMQ_USERNAME, RABBITMQ_PASSWORD) + broker = RabbitmqBroker(url=url) + broker.declare_queue(queue_name, ensure=True) + + broker.flush(queue_name, delete_queue=False) + assert broker.channel.queue_declare(queue=queue_name, passive=True) + + broker.flush(queue_name, delete_queue=True) + with pytest.raises(pika.exceptions.ChannelClosedByBroker, match=r"NOT_FOUND - no queue"): + broker.channel.queue_declare(queue=queue_name, passive=True) + + +def test_rabbitmq_flush_all_true_deletes_the_queue(): + queues = set(f"flush_all_test_queue_{current_millis()}_{i}" for i in range(10)) + url = "amqp://%s:%s@127.0.0.1:5672" % (RABBITMQ_USERNAME, RABBITMQ_PASSWORD) + broker = RabbitmqBroker(url=url) + + for queue_name in queues: + broker.declare_queue(queue_name, ensure=True) + + broker.flush_all(delete_queue=False) + for queue_name in queues: + assert broker.channel.queue_declare(queue=queue_name, passive=True) + + broker.flush_all(delete_queue=True) + for queue_name in queues: + with pytest.raises(pika.exceptions.ChannelClosedByBroker, match=r"NOT_FOUND - no queue"): + broker.connection.channel().queue_declare(queue=queue_name, passive=True)