Skip to content

Commit

Permalink
Merge pull request #11846 from rabbitmq/credit-reply-crash
Browse files Browse the repository at this point in the history
Fix quorum queue credit reply crash in AMQP session
  • Loading branch information
michaelklishin authored Jul 28, 2024
2 parents d3109e9 + ce915ae commit 777fd6f
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 75 deletions.
158 changes: 84 additions & 74 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,9 @@
-record(queue_flow_ctl, {
delivery_count :: sequence_no(),
%% We cap the actual credit we grant to the sending queue.
%% If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX,
%% we will top up in batches to the sending queue.
credit :: 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX,
%% Credit as desired by the receiving client. If larger than
%% LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue.
desired_credit :: rabbit_queue_type:credit(),
drain :: boolean()
}).

Expand All @@ -197,10 +196,18 @@
%% client and for the link to the sending queue.
client_flow_ctl :: #client_flow_ctl{} | credit_api_v1,
queue_flow_ctl :: #queue_flow_ctl{} | credit_api_v1,
%% True if we sent a credit request to the sending queue
%% but haven't processed the corresponding credit reply yet.
credit_req_in_flight :: boolean() | credit_api_v1,
%% While credit_req_in_flight is true, we stash the
%% 'true' means:
%% * we haven't processed a credit reply yet since we last sent
%% a credit request to the sending queue.
%% * a credit request is certainly in flight
%% * possibly multiple credit requests are in flight (e.g. rabbit_fifo_client
%% will re-send credit requests on our behalf on quorum queue leader changes)
%% 'false' means:
%% * we processed a credit reply since we last sent a credit request to the sending queue
%% * probably no credit request is in flight, but there might be
%% (we aren't sure since we don't use correlations for credit requests)
at_least_one_credit_req_in_flight :: boolean() | credit_api_v1,
%% While at_least_one_credit_req_in_flight is true, we stash the
%% latest credit request from the receiving client.
stashed_credit_req :: none | #credit_req{} | credit_api_v1
}).
Expand Down Expand Up @@ -1066,7 +1073,6 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
echo = false},
#queue_flow_ctl{delivery_count = ?INITIAL_DELIVERY_COUNT,
credit = 0,
desired_credit = 0,
drain = false},
false,
none};
Expand Down Expand Up @@ -1116,7 +1122,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
delivery_count = DeliveryCount,
client_flow_ctl = ClientFlowCtl,
queue_flow_ctl = QueueFlowCtl,
credit_req_in_flight = CreditReqInFlight,
at_least_one_credit_req_in_flight = CreditReqInFlight,
stashed_credit_req = StashedCreditReq},
OutgoingLinks = OutgoingLinks0#{HandleInt => Link},
State1 = State0#state{queue_states = QStates,
Expand Down Expand Up @@ -1392,16 +1398,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
end
end.

handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, Drain},
handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, _Drain},
State = #state{outgoing_links = OutgoingLinks}) ->
Handle = ctag_to_handle(Ctag),
case OutgoingLinks of
#{Handle := Link = #outgoing_link{queue_flow_ctl = QFC,
credit_req_in_flight = CreditReqInFlight}} ->
%% Assert that we expect a credit reply for this consumer.
true = CreditReqInFlight,
%% Assert that "The sender's value is always the last known value indicated by the receiver."
Drain = QFC#queue_flow_ctl.drain,
#{Handle := Link} ->
handle_credit_reply0(Action, Handle, Link, State);
_ ->
%% Ignore credit reply for a detached link.
Expand All @@ -1418,18 +1419,16 @@ handle_credit_reply0(
echo = CEcho
},
queue_flow_ctl = #queue_flow_ctl{
delivery_count = QDeliveryCount,
credit = QCredit,
desired_credit = DesiredCredit
} = QFC,
delivery_count = QDeliveryCount
} = QFC0,
stashed_credit_req = StashedCreditReq
} = Link0,
#state{outgoing_links = OutgoingLinks,
queue_states = QStates0
} = S0) ->

%% Assert that flow control state between us and the queue is in sync.
QCredit = Credit,
%% Assertion: Our (receiver) delivery-count should be always
%% in sync with the delivery-count of the sending queue.
QDeliveryCount = DeliveryCount,

case StashedCreditReq of
Expand All @@ -1439,24 +1438,32 @@ handle_credit_reply0(
S = pop_credit_req(Handle, Ctag, Link0, S0),
echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S),
S;
none when QCredit =:= 0 andalso
DesiredCredit > 0 ->
none when Credit =:= 0 andalso
CCredit > 0 ->
QName = Link0#outgoing_link.queue_name,
%% Provide queue next batch of credits.
CappedCredit = cap_credit(DesiredCredit),
CappedCredit = cap_credit(CCredit),
{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, DeliveryCount, CappedCredit, false, QStates0),
Link = Link0#outgoing_link{
queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}
},
queue_flow_ctl = QFC0#queue_flow_ctl{credit = CappedCredit},
at_least_one_credit_req_in_flight = true},
S = S0#state{queue_states = QStates,
outgoing_links = OutgoingLinks#{Handle := Link}},
handle_queue_actions(Actions, S);
none ->
Link = Link0#outgoing_link{credit_req_in_flight = false},
%% Although we (the receiver) usually determine link credit, we set here
%% our link credit to what the queue says our link credit is (which is safer
%% in case credit requests got applied out of order in quorum queues).
%% This should be fine given that we asserted earlier that our delivery-count is
%% in sync with the delivery-count of the sending queue.
QFC = QFC0#queue_flow_ctl{credit = Credit},
Link = Link0#outgoing_link{
queue_flow_ctl = QFC,
at_least_one_credit_req_in_flight = false},
S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}},
echo(CEcho, Handle, CDeliveryCount, DesiredCredit, Available, S),
echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S),
S
end;
handle_credit_reply0(
Expand All @@ -1465,10 +1472,11 @@ handle_credit_reply0(
Link0 = #outgoing_link{
queue_name = QName,
client_flow_ctl = #client_flow_ctl{
delivery_count = CDeliveryCount0 } = CFC,
delivery_count = CDeliveryCount0,
credit = CCredit
} = CFC,
queue_flow_ctl = #queue_flow_ctl{
delivery_count = QDeliveryCount0,
desired_credit = DesiredCredit
delivery_count = QDeliveryCount0
} = QFC,
stashed_credit_req = StashedCreditReq},
S0 = #state{cfg = #cfg{writer_pid = Writer,
Expand All @@ -1480,31 +1488,38 @@ handle_credit_reply0(
0 = Credit,

case DeliveryCount =:= QDeliveryCount0 andalso
DesiredCredit > 0 of
CCredit > 0 of
true ->
%% We're in drain mode. The queue did not advance its delivery-count which means
%% it might still have messages available for us. We also desire more messages.
%% it might still have messages available for us. The client also desires more messages.
%% Therefore, we do the next round of credit top-up. We prioritise finishing
%% the current drain credit top-up rounds over a stashed credit request because
%% this is easier to reason about and the queue will reply promptly meaning
%% the stashed request will be processed soon enough.
CappedCredit = cap_credit(DesiredCredit),
Link = Link0#outgoing_link{queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}},

{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, DeliveryCount, CappedCredit, true, QStates0),
CappedCredit = cap_credit(CCredit),
{ok, QStates, Actions} = rabbit_queue_type:credit(
QName, Ctag, DeliveryCount,
CappedCredit, true, QStates0),
Link = Link0#outgoing_link{
queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit},
at_least_one_credit_req_in_flight = true},
S = S0#state{queue_states = QStates,
outgoing_links = OutgoingLinks#{Handle := Link}},
handle_queue_actions(Actions, S);
false ->
case compare(DeliveryCount, QDeliveryCount0) of
equal -> ok;
greater -> ok; %% the sending queue advanced its delivery-count
less -> error({unexpected_delivery_count, DeliveryCount, QDeliveryCount0})
end,

%% We're in drain mode.
%% The queue either advanced its delivery-count which means it has
%% no more messages available for us, or we do not desire more messages.
%% no more messages available for us, or the client does not desire more messages.
%% Therefore, we're done with draining and we "the sender will (after sending
%% all available messages) advance the delivery-count as much as possible,
%% consuming all link-credit, and send the flow state to the receiver."
CDeliveryCount = add(CDeliveryCount0, DesiredCredit),
CDeliveryCount = add(CDeliveryCount0, CCredit),
Flow0 = #'v1_0.flow'{handle = ?UINT(Handle),
delivery_count = ?UINT(CDeliveryCount),
link_credit = ?UINT(0),
Expand All @@ -1519,9 +1534,8 @@ handle_credit_reply0(
queue_flow_ctl = QFC#queue_flow_ctl{
delivery_count = DeliveryCount,
credit = 0,
desired_credit = 0,
drain = false},
credit_req_in_flight = false
at_least_one_credit_req_in_flight = false
},
S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}},
case StashedCreditReq of
Expand Down Expand Up @@ -1553,19 +1567,17 @@ pop_credit_req(
LinkCreditSnd = amqp10_util:link_credit_snd(
DeliveryCountRcv, LinkCreditRcv, CDeliveryCount),
CappedCredit = cap_credit(LinkCreditSnd),
{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, QDeliveryCount, CappedCredit, Drain, QStates0),
{ok, QStates, Actions} = rabbit_queue_type:credit(
QName, Ctag, QDeliveryCount,
CappedCredit, Drain, QStates0),
Link = Link0#outgoing_link{
client_flow_ctl = CFC#client_flow_ctl{
credit = LinkCreditSnd,
echo = Echo},
queue_flow_ctl = QFC#queue_flow_ctl{
credit = CappedCredit,
desired_credit = LinkCreditSnd,
drain = Drain
},
credit_req_in_flight = true,
drain = Drain},
at_least_one_credit_req_in_flight = true,
stashed_credit_req = none
},
S = S0#state{queue_states = QStates,
Expand Down Expand Up @@ -1685,19 +1697,20 @@ sent_pending_delivery(
credit_api_version = CreditApiVsn,
client_flow_ctl = CFC0,
queue_flow_ctl = QFC0,
credit_req_in_flight = CreditReqInFlight0
at_least_one_credit_req_in_flight = CreditReqInFlight0
} = Link0 = maps:get(Handle, OutgoingLinks0),

S = case CreditApiVsn of
1 ->
S0;
2 ->
#client_flow_ctl{
delivery_count = CDeliveryCount0,
credit = CCredit0
} = CFC0,
#queue_flow_ctl{
delivery_count = QDeliveryCount0,
credit = QCredit0,
desired_credit = DesiredCredit0
credit = QCredit0
} = QFC0,

CDeliveryCount = add(CDeliveryCount0, 1),
Expand All @@ -1715,17 +1728,16 @@ sent_pending_delivery(

QDeliveryCount = add(QDeliveryCount0, 1),
QCredit1 = max(0, QCredit0 - 1),
DesiredCredit = max(0, DesiredCredit0 - 1),

{QCredit, CreditReqInFlight, QStates, Actions} =
case QCredit1 =:= 0 andalso
DesiredCredit > 0 andalso
CCredit > 0 andalso
not CreditReqInFlight0 of
true ->
%% assertion
none = Link0#outgoing_link.stashed_credit_req,
%% Provide queue next batch of credits.
CappedCredit = cap_credit(DesiredCredit),
CappedCredit = cap_credit(CCredit),
{ok, QStates1, Actions0} =
rabbit_queue_type:credit(
QName, Ctag, QDeliveryCount, CappedCredit,
Expand All @@ -1740,17 +1752,15 @@ sent_pending_delivery(
credit = CCredit},
QFC = QFC0#queue_flow_ctl{
delivery_count = QDeliveryCount,
credit = QCredit,
desired_credit = DesiredCredit},
Link = Link0#outgoing_link{client_flow_ctl = CFC,
queue_flow_ctl = QFC,
credit_req_in_flight = CreditReqInFlight},
credit = QCredit},
Link = Link0#outgoing_link{
client_flow_ctl = CFC,
queue_flow_ctl = QFC,
at_least_one_credit_req_in_flight = CreditReqInFlight},
OutgoingLinks = OutgoingLinks0#{Handle := Link},
S1 = S0#state{outgoing_links = OutgoingLinks,
queue_states = QStates},
handle_queue_actions(Actions, S1);
1 ->
S0
handle_queue_actions(Actions, S1)
end,
record_outgoing_unsettled(Pending, S).

Expand Down Expand Up @@ -2677,7 +2687,7 @@ handle_outgoing_link_flow_control(
credit_api_version = CreditApiVsn,
client_flow_ctl = CFC,
queue_flow_ctl = QFC,
credit_req_in_flight = CreditReqInFlight
at_least_one_credit_req_in_flight = CreditReqInFlight
} = Link0,
#'v1_0.flow'{handle = ?UINT(HandleInt),
delivery_count = MaybeDeliveryCountRcv,
Expand All @@ -2695,26 +2705,26 @@ handle_outgoing_link_flow_control(
2 ->
case CreditReqInFlight of
false ->
DesiredCredit = amqp10_util:link_credit_snd(
LinkCreditSnd = amqp10_util:link_credit_snd(
DeliveryCountRcv,
LinkCreditRcv,
CFC#client_flow_ctl.delivery_count),
CappedCredit = cap_credit(DesiredCredit),
CappedCredit = cap_credit(LinkCreditSnd),
Link = Link0#outgoing_link{
credit_req_in_flight = true,
client_flow_ctl = CFC#client_flow_ctl{
credit = DesiredCredit,
credit = LinkCreditSnd,
echo = Echo},
queue_flow_ctl = QFC#queue_flow_ctl{
credit = CappedCredit,
desired_credit = DesiredCredit,
drain = Drain}},
drain = Drain},
at_least_one_credit_req_in_flight = true},
{ok, QStates, Actions} = rabbit_queue_type:credit(
QName, Ctag,
QFC#queue_flow_ctl.delivery_count,
CappedCredit, Drain, QStates0),
State = State0#state{queue_states = QStates,
outgoing_links = OutgoingLinks#{HandleInt := Link}},
State = State0#state{
queue_states = QStates,
outgoing_links = OutgoingLinks#{HandleInt := Link}},
handle_queue_actions(Actions, State);
true ->
%% A credit request is currently in-flight. Let's first process its reply
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ seq_applied({Seq, Response},
when Response /= not_enqueued ->
{[Corr | Corrs], Actions, State#state{pending = Pending}};
_ ->
{Corrs, Actions, State#state{}}
{Corrs, Actions, State}
end;
seq_applied(_Seq, Acc) ->
Acc.
Expand Down
Loading

0 comments on commit 777fd6f

Please sign in to comment.