Skip to content

Commit

Permalink
Merge pull request #12944 from rabbitmq/mk-backport-rabbitmq-server-1…
Browse files Browse the repository at this point in the history
…2921-take-2

4.0.6: backport #12921 by @noxdafox, take 2
  • Loading branch information
michaelklishin authored Dec 19, 2024
2 parents adafa7e + 14ac18b commit 3548fd5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
23 changes: 18 additions & 5 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -725,13 +725,26 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State1 = State#q{backing_queue_state = BQS1},
case IsDuplicate of
true -> State1;
{true, drop} -> State1;
%% Drop publish and nack to publisher
{true, reject} ->
true ->
%% Publish to DLX
_ = with_dlx(
DLX,
fun (X) ->
rabbit_global_counters:messages_dead_lettered(maxlen,
rabbit_classic_queue,
at_most_once, 1),
QName = qname(State1),
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
end,
fun () ->
rabbit_global_counters:messages_dead_lettered(maxlen,
rabbit_classic_queue,
disabled, 1)
end),
%% Drop publish and nack to publisher
send_reject_publish(Delivery, State1);
%% Enqueue and maybe drop head later
false ->
%% Enqueue and maybe drop head later
deliver_or_enqueue(Delivery, Delivered, State1)
end
end.
Expand Down
5 changes: 2 additions & 3 deletions deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,8 @@

%% Called prior to a publish or publish_delivered call. Allows the BQ
%% to signal that it's already seen this message, (e.g. it was published
%% or discarded previously) specifying whether to drop the message or reject it.
-callback is_duplicate(mc:state(), state())
-> {{true, drop} | {true, reject} | boolean(), state()}.
%% or discarded previously).
-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}.

-callback set_queue_mode(queue_mode(), state()) -> state().

Expand Down

0 comments on commit 3548fd5

Please sign in to comment.