Skip to content

Commit

Permalink
Merge pull request #11685 from rabbitmq/md/khepri-minority-errors/rab…
Browse files Browse the repository at this point in the history
…bit_db_binding

Handle timeouts possible in Khepri minority in `rabbit_db_binding`
  • Loading branch information
the-mikedavis authored Jul 22, 2024
2 parents bd2d6d9 + f1be7ba commit 3a0808c
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 13 deletions.
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error({'binding_invalid', string(), [any()]}) |
%% inner_fun() result
rabbit_types:error(rabbit_types:amqp_error()).
rabbit_types:error(rabbit_types:amqp_error()) |
rabbit_khepri:timeout_error().
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
fun((rabbit_types:exchange(),
Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,9 @@ binding_action(Action, Binding, Username, ConnPid) ->
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error, "Could not ~s binding due to timeout", [Action]);
ok ->
ok
end.
Expand Down
15 changes: 10 additions & 5 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ exists_in_khepri(#binding{source = SrcName,
Errs ->
Errs
end
end) of
end, ro) of
{ok, not_found} -> false;
{ok, Set} -> sets:is_element(Binding, Set);
Errs -> not_found_errs_in_khepri(not_found(Errs, SrcName, DstName))
Expand Down Expand Up @@ -150,8 +150,9 @@ not_found({[], []}, SrcName, DstName) ->
Binding :: rabbit_types:binding(),
Src :: rabbit_types:binding_source(),
Dst :: rabbit_types:binding_destination(),
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
Ret :: ok | {error, Reason :: any()}.
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
ChecksErrReason :: any(),
Ret :: ok | {error, ChecksErrReason} | rabbit_khepri:timeout_error().
%% @doc Writes a binding if it doesn't exist already and passes the validation in
%% `ChecksFun' i.e. exclusive access
%%
Expand Down Expand Up @@ -255,8 +256,12 @@ serial_in_khepri(true, X) ->
Binding :: rabbit_types:binding(),
Src :: rabbit_types:binding_source(),
Dst :: rabbit_types:binding_destination(),
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
Ret :: ok | {ok, rabbit_binding:deletions()} | {error, Reason :: any()}.
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
ChecksErrReason :: any(),
Ret :: ok |
{ok, rabbit_binding:deletions()} |
{error, ChecksErrReason} |
rabbit_khepri:timeout_error().
%% @doc Deletes a binding record from the database if it passes the validation in
%% `ChecksFun'. It also triggers the deletion of auto-delete exchanges if needed.
%%
Expand Down
17 changes: 11 additions & 6 deletions deps/rabbit/src/rabbit_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -883,12 +883,17 @@ add_binding(VHost, Binding, ActingUser) ->
rv(VHost, DestType, destination, Binding), ActingUser).

add_binding_int(Binding, Source, Destination, ActingUser) ->
rabbit_binding:add(
#binding{source = Source,
destination = Destination,
key = maps:get(routing_key, Binding, undefined),
args = args(maps:get(arguments, Binding, undefined))},
ActingUser).
case rabbit_binding:add(
#binding{source = Source,
destination = Destination,
key = maps:get(routing_key, Binding, undefined),
args = args(maps:get(arguments, Binding, undefined))},
ActingUser) of
ok ->
ok;
{error, _} = Err ->
throw(Err)
end.

dest_type(Binding) ->
rabbit_data_coercion:to_atom(maps:get(destination_type, Binding, undefined)).
Expand Down
11 changes: 11 additions & 0 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@
clear_forced_metadata_store/0]).
-endif.

-type timeout_error() :: khepri:error(timeout).
%% Commands like 'put'/'delete' etc. might time out in Khepri. It might take
%% the leader longer to apply the command and reply to the caller than the
%% configured timeout. This error is easy to reproduce - a cluster which is
%% only running a minority of nodes will consistently return `{error, timeout}`
%% for commands until the cluster majority can be re-established. Commands
%% returning `{error, timeout}` are a likely (but not certain) indicator that
%% the node which submitted the command is running in a minority.

-export_type([timeout_error/0]).

-compile({no_auto_import, [get/1, get/2, nodes/0]}).

-define(RA_SYSTEM, coordination).
Expand Down
15 changes: 14 additions & 1 deletion deps/rabbit/test/cluster_minority_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ groups() ->
open_channel,
declare_exchange,
declare_binding,
delete_binding,
declare_queue,
publish_to_exchange,
publish_and_consume_to_local_classic_queue,
Expand Down Expand Up @@ -85,7 +86,7 @@ init_per_group(Group, Config0) when Group == client_operations;
{skip, _} ->
Config1;
_ ->
%% Before partitioning the cluster, create a policy and queue that can be used in
%% Before partitioning the cluster, create resources that can be used in
%% the test cases. They're needed for delete and consume operations, which can list
%% them but fail to operate anything else.
%%
Expand All @@ -95,6 +96,10 @@ init_per_group(Group, Config0) when Group == client_operations;
%% To be used in consume_from_queue
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue">>,
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
%% To be used in delete_binding
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>,
source = <<"amq.direct">>,
routing_key = <<"binding-to-be-deleted">>}),

%% Lower the default Khepri command timeout. By default this is set
%% to 30s in `rabbit_khepri:setup/1' which makes the cases in this
Expand Down Expand Up @@ -160,6 +165,14 @@ declare_binding(Config) ->
source = <<"amq.direct">>,
routing_key = <<"key">>})).

delete_binding(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'exchange.unbind'{destination = <<"amq.fanout">>,
source = <<"amq.direct">>,
routing_key = <<"binding-to-be-deleted">>})).

declare_queue(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,8 @@ add_super_stream_binding(VirtualHost,
{error, {binding_invalid, rabbit_misc:format(Fmt, Args)}};
{error, #amqp_error{} = Error} ->
{error, {internal_error, rabbit_misc:format("~tp", [Error])}};
{error, timeout} ->
{error, {internal_error, "failed to add binding due to a timeout"}};
ok ->
ok
end.
Expand Down

0 comments on commit 3a0808c

Please sign in to comment.