From c28018ee57293d73666bb9fdc8eb6aba5d996de3 Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Tue, 10 Dec 2024 23:37:59 +0200 Subject: [PATCH 1/3] backing_queue: simplify `is_duplicate` callback signature `is_duplicate` callback signature was changed in order to support both the mirroring queues as well as the de-duplication ones. As the mirroring queues are now deprecated and removed, we can fall back to a simpler boolean as return value. Signed-off-by: Matteo Cafasso (cherry picked from commit c927446e17c825208db7c734be160b91b6090c67) (cherry picked from commit 6a6e76010709d63883791905dfe76675fe6c8e8f) --- deps/rabbit/src/rabbit_backing_queue.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl index ffa0a791f1b5..3169cbf87bfd 100644 --- a/deps/rabbit/src/rabbit_backing_queue.erl +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -220,9 +220,8 @@ %% Called prior to a publish or publish_delivered call. Allows the BQ %% to signal that it's already seen this message, (e.g. it was published -%% or discarded previously) specifying whether to drop the message or reject it. --callback is_duplicate(mc:state(), state()) - -> {{true, drop} | {true, reject} | boolean(), state()}. +%% or discarded previously). +-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}. -callback set_queue_mode(queue_mode(), state()) -> state(). From 6e056e9545de231f3a787928bc627c81696a2a6f Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Tue, 10 Dec 2024 23:57:44 +0200 Subject: [PATCH 2/3] amqqueue_process: adopt new `is_duplicate` backing queue callback As the de-duplication plugin is the only adopter of the `is_duplicate` callback, we now use a simpler signature. When a message is deemed duplicated, we discard it and re-route it to dead letter exchange. Signed-off-by: Matteo Cafasso (cherry picked from commit f93baa35cbdcb1ad56dd52f414abf3376371d12a) (cherry picked from commit 8d7535e0b12c6512e582ab201feaa9218c1f776d) --- deps/rabbit/src/rabbit_amqqueue_process.erl | 23 ++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 63f886bd3763..097cf6dde67b 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -725,13 +725,26 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State1 = State#q{backing_queue_state = BQS1}, case IsDuplicate of - true -> State1; - {true, drop} -> State1; - %% Drop publish and nack to publisher - {true, reject} -> + true -> + %% Publish to DLX + _ = with_dlx( + DLX, + fun (X) -> + rabbit_global_counters:messages_dead_lettered(maxlen, + rabbit_classic_queue, + at_most_once, 1), + QName = qname(State1), + rabbit_dead_letter:publish(Message, maxlen, X, RK, QName) + end, + fun () -> + rabbit_global_counters:messages_dead_lettered(maxlen, + rabbit_classic_queue, + disabled, 1) + end), + %% Drop publish and nack to publisher send_reject_publish(Delivery, State1); - %% Enqueue and maybe drop head later false -> + %% Enqueue and maybe drop head later deliver_or_enqueue(Delivery, Delivered, State1) end end. From 14ac18bea05b341789ce291874cd87693828156b Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sat, 14 Dec 2024 20:03:21 -0500 Subject: [PATCH 3/3] Revert "Try running plugin tests on Ubuntu 22.04" This reverts commit 78602ec8c3c4fca2ec34f29d87f95e8819e682b7. --- .github/workflows/test-make-tests.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/test-make-tests.yaml b/.github/workflows/test-make-tests.yaml index 4fca4ab383ab..8cde4de768d0 100644 --- a/.github/workflows/test-make-tests.yaml +++ b/.github/workflows/test-make-tests.yaml @@ -68,7 +68,6 @@ jobs: test-plugin: name: Test plugins - runs-on: ubuntu-22.04 strategy: fail-fast: false matrix: