Skip to content

Commit

Permalink
QQ: ensure opened segments are closed after some time of inactivity
Browse files Browse the repository at this point in the history
Processes that have received messages that had to be read from disk
may keep a segment open indefinitely. This introduces a timer which,
after some time of inactivity, will close all opened segments to ensure
file descriptors are not kept open indefinitely.
  • Loading branch information
kjnilsson committed Dec 23, 2024
1 parent 2dc2cbd commit 01cc60f
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 11 deletions.
14 changes: 14 additions & 0 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
messages_uncommitted,
acks_uncommitted,
pending_raft_commands,
cached_segments,
prefetch_count,
state,
garbage_collection]).
Expand Down Expand Up @@ -2287,6 +2288,8 @@ i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(pending_raft_commands, #ch{queue_states = QS}) ->
pending_raft_commands(QS);
i(cached_segments, #ch{queue_states = QS}) ->
cached_segments(QS);
i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state();
i(state, #ch{cfg = #conf{state = State}}) -> State;
i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C;
Expand Down Expand Up @@ -2315,6 +2318,17 @@ pending_raft_commands(QStates) ->
end,
rabbit_queue_type:fold_state(Fun, 0, QStates).

%% Sums the `cached_segments` value reported by
%% rabbit_queue_type:state_info/1 over every queue state in QStates.
%% Queue states whose info does not contain that key leave the total
%% unchanged.
cached_segments(QStates) ->
    AddCached =
        fun(_QRef, QState, Total) ->
                case rabbit_queue_type:state_info(QState) of
                    #{cached_segments := Count} -> Total + Count;
                    _NoCachedInfo -> Total
                end
        end,
    rabbit_queue_type:fold_state(AddCached, 0, QStates).

%% Builds the channel's display name as a binary: the connection name
%% followed by the channel value in parentheses.
name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) ->
    Formatted = rabbit_misc:format("~ts (~tp)", [ConnName, Channel]),
    list_to_binary(Formatted).

Expand Down
91 changes: 81 additions & 10 deletions deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
purge/1,
update_machine_state/2,
pending_size/1,
num_cached_segments/1,
stat/1,
stat/2,
query_single_active_consumer/1,
Expand All @@ -40,8 +41,12 @@
-define(TIMER_TIME, 10000).
-define(COMMAND_TIMEOUT, 30000).
-define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
%% controls the timer for closing cached segments
-define(CACHE_SEG_TIMEOUT, 5000).

-type seq() :: non_neg_integer().
-type milliseconds() :: non_neg_integer().


-record(consumer, {key :: rabbit_fifo:consumer_key(),
% status = up :: up | cancelled,
Expand Down Expand Up @@ -70,7 +75,10 @@
{term(), rabbit_fifo:command()}},
consumers = #{} :: #{rabbit_types:ctag() => #consumer{}},
timer_state :: term(),
cached_segments :: undefined | ra_flru:state()
cached_segments :: undefined |
{undefined | reference(),
LastSeenMs :: milliseconds(),
ra_flr:state()}
}).

-opaque state() :: #state{}.
Expand Down Expand Up @@ -517,6 +525,15 @@ purge(Server) ->
%% Number of entries currently held in the client's `pending` map.
pending_size(#state{pending = PendingCmds}) ->
    map_size(PendingCmds).

%% Number of segments currently held in the client's segment cache.
%% Returns 0 when no cache has been created (field is `undefined`);
%% otherwise asks ra_flru for the size of the cache stored as the third
%% element of the `cached_segments` tuple.
-spec num_cached_segments(state()) -> non_neg_integer().
num_cached_segments(#state{cached_segments = undefined}) ->
    0;
num_cached_segments(#state{cached_segments = {_TRef, _LastSeenMs, Flru}}) ->
    ra_flru:size(Flru).

-spec stat(ra:server_id()) ->
{ok, non_neg_integer(), non_neg_integer()}
| {error | timeout, term()}.
Expand Down Expand Up @@ -651,24 +668,25 @@ handle_ra_event(_QName, _, {machine, {queue_status, Status}},
#state{} = State) ->
%% just set the queue status
{ok, State#state{queue_status = Status}, []};
handle_ra_event(_QName, Leader, {machine, leader_change},
handle_ra_event(QName, Leader, {machine, leader_change},
#state{leader = OldLeader} = State0) ->
%% we need to update leader
%% and resend any pending commands
rabbit_log:debug("~ts: Detected QQ leader change from ~w to ~w",
[?MODULE, OldLeader, Leader]),
rabbit_log:debug("~ts: ~s Detected QQ leader change from ~w to ~w",
[rabbit_misc:rs(QName), ?MODULE, OldLeader, Leader]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, State, []};
handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}},
#state{leader = Leader} = State) ->
{ok, State, []};
handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}},
handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
#state{leader = OldLeader} = State0) ->
rabbit_log:debug("~ts: Detected QQ leader change (rejection) from ~w to ~w",
[?MODULE, OldLeader, Leader]),
rabbit_log:debug("~ts: ~s Detected QQ leader change (rejection) from ~w to ~w",
[rabbit_misc:rs(QName), ?MODULE, OldLeader, Leader]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, cancel_timer(State), []};
handle_ra_event(_QName, _From, {rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) ->
handle_ra_event(_QName, _From,
{rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{ok, State0, []};
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
Expand All @@ -680,6 +698,30 @@ handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State
State = resend_all_pending(State0#state{leader = Leader}),
{ok, State, []}
end;
handle_ra_event(QName, Leader, close_cached_segments,
#state{cached_segments = CachedSegments} = State) ->
{ok,
case CachedSegments of
undefined ->
%% timer didn't get cancelled so just ignore this
State;
{_TRef, Last, Cache} ->
case now_ms() > Last + ?CACHE_SEG_TIMEOUT of
true ->
rabbit_log:debug("~ts: closing_cached_segments",
[rabbit_misc:rs(QName)]),
%% it's been long enough, evict all
_ = ra_flru:evict_all(Cache),
State#state{cached_segments = undefined};
false ->
%% set another timer
Ref = erlang:send_after(?CACHE_SEG_TIMEOUT, self(),
{'$gen_cast',
{queue_event, QName,
{Leader, close_cached_segments}}}),
State#state{cached_segments = {Ref, Last, Cache}}
end
end, []};
handle_ra_event(_QName, _Leader, {machine, eol}, State) ->
{eol, [{unblock, cluster_name(State)}]}.

Expand Down Expand Up @@ -845,8 +887,34 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
{State1, Deliveries} = return(Tag, MsgIntIds, State0),
{ok, State1, Deliveries};
handle_delivery(QName, Leader, {delivery, Tag, ReadPlan, Msgs},
#state{cached_segments = Cached0} = State) ->
{MsgIds, Cached} = rabbit_fifo:exec_read(Cached0, ReadPlan, Msgs),
#state{cached_segments = CachedSegments} = State) ->
{TRef, Cached0} = case CachedSegments of
undefined ->
{undefined, undefined};
{R, _, C} ->
{R, C}
end,
{MsgIds, Cached1} = rabbit_fifo:exec_read(Cached0, ReadPlan, Msgs),
%% if there are cached segments after a read and no timer is
%% currently set, set a timer that sends a message to evict the
%% cache after some time
Cached = case ra_flru:size(Cached1) > 0 of
true when TRef == undefined ->
Ref = erlang:send_after(?CACHE_SEG_TIMEOUT, self(),
{'$gen_cast',
{queue_event, QName,
{Leader, close_cached_segments}}}),
{Ref, now_ms(), Cached1};
true ->
{TRef, now_ms(), Cached1};
false ->
if TRef =/= undefined ->
erlang:cancel_timer(TRef, [{async, true}]);
true ->
ok
end,
undefined
end,
handle_delivery(QName, Leader, {delivery, Tag, MsgIds},
State#state{cached_segments = Cached}).

Expand Down Expand Up @@ -1017,3 +1085,6 @@ send_pending(Cid, #state{unsent_commands = Unsent} = State0) ->
normal, S0)
end, State0, Commands),
State1#state{unsent_commands = maps:remove(Cid, Unsent)}.

%% Returns a non-negative millisecond timestamp used to measure elapsed
%% time (e.g. how long the cached segments have been idle).
%%
%% The value is derived from the Erlang monotonic clock rather than the
%% wall-clock system time, so it never goes backwards when the OS clock
%% is adjusted. It is expressed as the time elapsed since the runtime
%% started, which keeps it >= 0; it is only meaningful when compared with
%% other now_ms() results taken on the same node.
now_ms() ->
    SinceStart = erlang:monotonic_time() - erlang:system_info(start_time),
    erlang:convert_time_unit(SinceStart, native, millisecond).
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,8 @@ deliver(QSs, Msg0, Options) ->


%% Builds the map of client-state metrics reported for a quorum queue
%% from the rabbit_fifo_client state S: the number of pending Raft
%% commands (rabbit_fifo_client:pending_size/1) and the number of cached
%% segments (rabbit_fifo_client:num_cached_segments/1).
state_info(S) ->
#{pending_raft_commands => rabbit_fifo_client:pending_size(S)}.
#{pending_raft_commands => rabbit_fifo_client:pending_size(S),
cached_segments => rabbit_fifo_client:num_cached_segments(S)}.

-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
infos(QName) ->
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_management/priv/www/js/tmpl/channel.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
<th>Pending Raft commands</th>
<td><%= channel.pending_raft_commands %></td>
</tr>
<tr>
<th>Cached segments</th>
<td><%= channel.cached_segments %></td>
</tr>
</table>

</div>
Expand Down

0 comments on commit 01cc60f

Please sign in to comment.