QQ/Streams: Ensure open file handles are closed when a queue is deleted.
If a stream or quorum queue has opened a file to read a consumer message
and the queue is deleted the file handle reference is lost and kept
open until the end of the channel lifetime.
(cherry picked from commit c688169f08
)
This commit is contained in:
parent
5a41a13f7b
commit
8b40a8e09e
|
@ -14,6 +14,7 @@
|
|||
-export([
|
||||
init/1,
|
||||
init/2,
|
||||
close/1,
|
||||
checkout/4,
|
||||
cancel_checkout/3,
|
||||
enqueue/3,
|
||||
|
@ -755,6 +756,13 @@ handle_ra_event(QName, Leader, close_cached_segments,
|
|||
handle_ra_event(_QName, _Leader, {machine, eol}, State) ->
|
||||
{eol, [{unblock, cluster_name(State)}]}.
|
||||
|
||||
-spec close(rabbit_fifo_client:state()) -> ok.
|
||||
close(#state{cached_segments = undefined}) ->
|
||||
ok;
|
||||
close(#state{cached_segments = {_, _, Flru}}) ->
|
||||
_ = ra_flru:evict_all(Flru),
|
||||
ok.
|
||||
|
||||
%% @doc Attempts to enqueue a message using cast semantics. This provides no
|
||||
%% guarantees or retries if the message fails to achieve consensus or if the
|
||||
%% servers sent to happens not to be available. If the message is sent to a
|
||||
|
|
|
@ -414,7 +414,9 @@ remove(QRef, #?STATE{ctxs = Ctxs0} = State) ->
|
|||
case maps:take(QRef, Ctxs0) of
|
||||
error ->
|
||||
State;
|
||||
{_, Ctxs} ->
|
||||
{#ctx{module = Mod,
|
||||
state = S}, Ctxs} ->
|
||||
ok = Mod:close(S),
|
||||
State#?STATE{ctxs = Ctxs}
|
||||
end.
|
||||
|
||||
|
@ -502,11 +504,10 @@ init() ->
|
|||
|
||||
-spec close(state()) -> ok.
|
||||
close(#?STATE{ctxs = Contexts}) ->
|
||||
maps:foreach(
|
||||
fun (_, #ctx{module = Mod,
|
||||
state = S}) ->
|
||||
ok = Mod:close(S)
|
||||
end, Contexts).
|
||||
maps:foreach(fun (_, #ctx{module = Mod,
|
||||
state = S}) ->
|
||||
ok = Mod:close(S)
|
||||
end, Contexts).
|
||||
|
||||
-spec new(amqqueue:amqqueue(), state()) -> state().
|
||||
new(Q, State) when ?is_amqqueue(Q) ->
|
||||
|
|
|
@ -143,7 +143,6 @@
|
|||
-define(RPC_TIMEOUT, 1000).
|
||||
-define(START_CLUSTER_TIMEOUT, 5000).
|
||||
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
|
||||
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
|
||||
-define(TICK_INTERVAL, 5000). %% the ra server tick time
|
||||
-define(DELETE_TIMEOUT, 5000).
|
||||
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
|
||||
|
@ -214,8 +213,8 @@ init(Q) when ?is_amqqueue(Q) ->
|
|||
{ok, rabbit_fifo_client:init(Servers, SoftLimit)}.
|
||||
|
||||
-spec close(rabbit_fifo_client:state()) -> ok.
|
||||
close(_State) ->
|
||||
ok.
|
||||
close(State) ->
|
||||
rabbit_fifo_client:close(State).
|
||||
|
||||
-spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) ->
|
||||
rabbit_fifo_client:state().
|
||||
|
|
|
@ -197,6 +197,7 @@ all_tests() ->
|
|||
requeue_multiple_true,
|
||||
requeue_multiple_false,
|
||||
subscribe_from_each,
|
||||
dont_leak_file_handles,
|
||||
leader_health_check
|
||||
].
|
||||
|
||||
|
@ -1641,6 +1642,54 @@ subscribe_from_each(Config) ->
|
|||
|
||||
ok.
|
||||
|
||||
dont_leak_file_handles(Config) ->
|
||||
|
||||
[Server0 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
|
||||
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
|
||||
QQ = ?config(queue_name, Config),
|
||||
?assertEqual({'queue.declare_ok', QQ, 0, 0},
|
||||
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||
[begin
|
||||
publish_confirm(Ch, QQ)
|
||||
end || _ <- Servers],
|
||||
timer:sleep(100),
|
||||
%% roll the wal to force consumer messages to be read from disk
|
||||
[begin
|
||||
ok = rpc:call(S, ra_log_wal, force_roll_over, [ra_log_wal])
|
||||
end || S <- Servers],
|
||||
timer:sleep(256),
|
||||
|
||||
C = rabbit_ct_client_helpers:open_channel(Config, Server0),
|
||||
[_, NCh1] = rpc:call(Server0, rabbit_channel, list, []),
|
||||
qos(C, 1, false),
|
||||
subscribe(C, QQ, false),
|
||||
[begin
|
||||
receive
|
||||
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
|
||||
amqp_channel:call(C, #'basic.ack'{delivery_tag = DeliveryTag})
|
||||
after 5000 ->
|
||||
flush(1),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end
|
||||
end || _ <- Servers],
|
||||
flush(1),
|
||||
[{_, MonBy2}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]),
|
||||
NumMonRefsBefore = length([M || M <- MonBy2, is_reference(M)]),
|
||||
%% delete queue
|
||||
?assertMatch(#'queue.delete_ok'{},
|
||||
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
|
||||
[{_, MonBy3}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]),
|
||||
NumMonRefsAfter = length([M || M <- MonBy3, is_reference(M)]),
|
||||
%% this isn't an ideal way to assert this but every file handle creates
|
||||
%% a monitor that (currenlty?) is a reference so we assert that we have
|
||||
%% fewer reference monitors after
|
||||
?assert(NumMonRefsAfter < NumMonRefsBefore),
|
||||
|
||||
rabbit_ct_client_helpers:close_channel(C),
|
||||
ok.
|
||||
|
||||
gh_12635(Config) ->
|
||||
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
|
||||
[Server0, _Server1, Server2] =
|
||||
|
@ -4949,3 +4998,7 @@ ensure_qq_proc_dead(Config, Server, RaName) ->
|
|||
ensure_qq_proc_dead(Config, Server, RaName)
|
||||
end.
|
||||
|
||||
lsof_rpc() ->
|
||||
Cmd = rabbit_misc:format(
|
||||
"lsof -p ~ts", [os:getpid()]),
|
||||
os:cmd(Cmd).
|
||||
|
|
Loading…
Reference in New Issue