Skip to content

Commit

Permalink
Merge pull request #9647 from rabbitmq/mergify/bp/v3.11.x/pr-9645
Browse files Browse the repository at this point in the history
Improve rabbit_stream_queue:get_local_pid/1 (backport #9640) (backport #9645)
  • Loading branch information
michaelklishin authored Oct 5, 2023
2 parents 1c8a087 + abe8da2 commit 665a5a5
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,24 @@ parse_offset_arg({_, V}) ->
parse_offset_arg(V) ->
{error, {invalid_offset_arg, V}}.


get_local_pid(#stream_client{local_pid = Pid} = State)
when is_pid(Pid) ->
{Pid, State};
when is_pid(Pid) ->
case erlang:is_process_alive(Pid) of
true ->
{Pid, State};
false ->
query_local_pid(State)
end;
get_local_pid(#stream_client{leader = Pid} = State)
when is_pid(Pid) andalso node(Pid) == node() ->
{Pid, State#stream_client{local_pid = Pid}};
get_local_pid(#stream_client{stream_id = StreamId,
local_pid = undefined} = State) ->
when is_pid(Pid) andalso node(Pid) == node() ->
get_local_pid(State#stream_client{local_pid = Pid});
get_local_pid(#stream_client{} = State) ->
%% query local coordinator to get pid
query_local_pid(State).


query_local_pid(#stream_client{stream_id = StreamId} = State) ->
case rabbit_stream_coordinator:local_pid(StreamId) of
{ok, Pid} ->
{Pid, State#stream_client{local_pid = Pid}};
Expand Down

0 comments on commit 665a5a5

Please sign in to comment.