Remove most of the fd related FHC code

Stats were not removed, including management UI stats
relating to FDs.

Web-MQTT and Web-STOMP configuration relating to FHC
were not removed.

The file_handle_cache itself must be kept until we
remove CQv1.
This commit is contained in:
Loïc Hoguin 2024-06-12 12:26:01 +02:00
parent 5de87aad3d
commit 49bedfc17e
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
25 changed files with 60 additions and 394 deletions

View File

@ -118,7 +118,6 @@ do_connect({Addr, Family},
connection_timeout = Timeout,
socket_options = ExtraOpts},
SIF, State) ->
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
@ -134,7 +133,6 @@ do_connect({Addr, Family},
SIF, State) ->
{ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options),
app_utils:start_applications([asn1, crypto, public_key, ssl]),
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
@ -379,11 +377,5 @@ handshake_recv(Expecting) ->
end
end.
obtain() ->
case code:is_loaded(file_handle_cache) of
false -> ok;
_ -> file_handle_cache:obtain()
end.
get_reason(#'connection.close'{reply_code = ErrCode}) ->
?PROTOCOL:amqp_exception(ErrCode).

View File

@ -744,9 +744,6 @@ status() ->
get_disk_free_limit, []}},
{disk_free, {rabbit_disk_monitor,
get_disk_free, []}}]),
S3 = rabbit_misc:with_exit_handler(
fun () -> [] end,
fun () -> [{file_descriptors, file_handle_cache:info()}] end),
S4 = [{processes, [{limit, erlang:system_info(process_limit)},
{used, erlang:system_info(process_count)}]},
{run_queue, erlang:statistics(run_queue)},
@ -781,7 +778,7 @@ status() ->
(_) -> false
end,
maps:to_list(product_info())),
S1 ++ S2 ++ S3 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8.
S1 ++ S2 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8.
alarms() ->
Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]),
@ -1656,7 +1653,7 @@ config_files() ->
start_fhc() ->
ok = rabbit_sup:start_restartable_child(
file_handle_cache,
[fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]),
[fun(_) -> ok end, fun(_) -> ok end]),
ensure_working_fhc().
ensure_working_fhc() ->

View File

@ -7,7 +7,6 @@
-module(rabbit_amqqueue).
-export([warn_file_limit/0]).
-export([recover/1, stop/1, start/1, declare/6, declare/7,
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
@ -119,21 +118,6 @@
active, activity_status, arguments]).
-define(KILL_QUEUE_DELAY_INTERVAL, 100).
warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
L = length(DurableQueues),
%% if there are not enough file handles, the server might hang
%% when trying to recover queues, warn the user:
case file_handle_cache:get_limit() < L of
true ->
rabbit_log:warning(
"Recovering ~tp queues, available file handles: ~tp. Please increase max open file handles limit to at least ~tp!",
[L, file_handle_cache:get_limit(), L]);
false ->
ok
end.
-spec recover(rabbit_types:vhost()) ->
{Recovered :: [amqqueue:amqqueue()],
Failed :: [amqqueue:amqqueue()]}.
@ -183,11 +167,6 @@ find_local_durable_queues(VHostName) ->
rabbit_queue_type:is_recoverable(Q)
end).
find_recoverable_queues() ->
rabbit_db_queue:filter_all_durable(fun(Q) ->
rabbit_queue_type:is_recoverable(Q)
end).
-spec declare(name(),
boolean(),
boolean(),

View File

@ -210,8 +210,6 @@ init_it2(Recover, From, State = #q{q = Q,
(Res == created orelse Res == existing) ->
case matches(Recover, Q, Q1) of
true ->
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
BQ = backing_queue_module(),
BQS = bq_init(BQ, Q, TermsOrNew),
send_reply(From, {new, Q}),
@ -1189,7 +1187,6 @@ prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
@ -1499,10 +1496,6 @@ handle_cast({deactivate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(),
ChPid, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
handle_cast({credit, SessionPid, CTag, Credit, Drain},
#q{q = Q,
backing_queue = BQ,

View File

@ -41,15 +41,6 @@
-define(HEADER_SIZE, 64). %% bytes
-define(ENTRY_SIZE, 32). %% bytes
%% The file_handle_cache module tracks reservations at
%% the level of the process. This means we cannot
%% handle them independently in the store and index.
%% Because the index may reserve more FDs than the
%% store the index becomes responsible for this and
%% will always reserve at least 2 FDs, and release
%% everything when terminating.
-define(STORE_FD_RESERVATIONS, 2).
-include_lib("rabbit_common/include/rabbit.hrl").
%% Set to true to get an awful lot of debug logs.
-if(false).
@ -538,7 +529,6 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
ok = file:sync(Fd),
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
_ = rabbit_recovery_terms:store(VHost,
filename:basename(rabbit_file:binary_to_filename(Dir)),
@ -555,7 +545,6 @@ delete_and_terminate(State = #qi { dir = Dir,
_ = maps:map(fun(_, Fd) ->
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Erase the data on disk.
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
State#qi{ segments = #{},
@ -626,18 +615,9 @@ new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments })
%% using too many FDs when the consumer lags a lot. We
%% limit at 4 because we try to keep up to 2 for reading
%% and 2 for writing.
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds })
reduce_fd_usage(_SegmentToOpen, State = #qi{ fds = OpenFds })
when map_size(OpenFds) < 4 ->
%% The only case where we need to update reservations is
%% when we are opening a segment that wasn't already open,
%% and we are not closing another segment at the same time.
case OpenFds of
#{SegmentToOpen := _} ->
State;
_ ->
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds) + 1),
State
end;
State;
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
case OpenFds0 of
#{SegmentToOpen := _} ->
@ -868,7 +848,6 @@ delete_segment(Segment, State0 = #qi{ fds = OpenFds0 }) ->
State = case maps:take(Segment, OpenFds0) of
{Fd, OpenFds} ->
ok = file:close(Fd),
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds)),
State0#qi{ fds = OpenFds };
error ->
State0

View File

@ -41,11 +41,6 @@
%% need to look into the store to discard them. Messages on disk
%% will be dropped at the same time as the index deletes the
%% corresponding segment file.
%%
%% The file_handle_cache reservations are done by the v2 index
%% because they are handled at a pid level. Since we are using
%% up to 2 FDs in this module we make the index reserve 2 extra
%% FDs.
-module(rabbit_classic_queue_store_v2).

View File

@ -894,10 +894,8 @@ state_enter0(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue,
file_handle_leader_reservation, [Resource]}],
NotifyDecs = notify_decorators_startup(Resource),
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
case BLH of
undefined ->
Effects;
@ -914,12 +912,7 @@ state_enter0(eol, #?MODULE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []} | Effects];
state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}, Effects)
when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation | Effects];
[{aux, eol} | Effects];
state_enter0(_, _, Effects) ->
%% catch all as not handling all states
Effects.

View File

@ -548,7 +548,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
@ -559,8 +558,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
@ -575,11 +573,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
#{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))];
state_enter(_, _) ->
%% catch all as not handling all states
[].

View File

@ -676,7 +676,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
@ -687,8 +686,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
@ -704,11 +702,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
[{aux, eol}];
state_enter(_, _) ->
%% catch all as not handling all states
[].

View File

@ -18,8 +18,6 @@
-export([filename_as_a_directory/1]).
-export([filename_to_binary/1, binary_to_filename/1]).
-import(file_handle_cache, [with_handle/1, with_handle/2]).
-define(TMP_EXT, ".tmp").
%%----------------------------------------------------------------------------
@ -56,7 +54,7 @@ file_size(File) ->
-spec ensure_dir((file:filename())) -> ok_or_error().
ensure_dir(File) -> with_handle(fun () -> ensure_dir_internal(File) end).
ensure_dir(File) -> ensure_dir_internal(File).
ensure_dir_internal("/") ->
ok;
@ -81,16 +79,17 @@ wildcard(Pattern, Dir) ->
-spec list_dir(file:filename()) ->
rabbit_types:ok_or_error2([file:filename()], any()).
list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end).
list_dir(Dir) -> prim_file:list_dir(Dir).
read_file_info(File) ->
with_handle(fun () -> file:read_file_info(File, [raw]) end).
file:read_file_info(File, [raw]).
-spec read_term_file
(file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()).
read_term_file(File) ->
try
%% @todo OTP-27+ has file:read_file(File, [raw]).
F = fun() ->
{ok, FInfo} = file:read_file_info(File, [raw]),
{ok, Fd} = file:open(File, [read, raw, binary]),
@ -100,7 +99,7 @@ read_term_file(File) ->
file:close(Fd)
end
end,
{ok, Data} = with_handle(F),
{ok, Data} = F(),
{ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
TokenGroups = group_tokens(Tokens),
{ok, [begin
@ -166,22 +165,19 @@ with_synced_copy(Path, Modes, Fun) ->
true ->
{error, append_not_supported, Path};
false ->
with_handle(
fun () ->
Bak = Path ++ ?TMP_EXT,
case prim_file:open(Bak, Modes) of
{ok, Hdl} ->
try
Result = Fun(Hdl),
ok = prim_file:sync(Hdl),
ok = prim_file:rename(Bak, Path),
Result
after
prim_file:close(Hdl)
end;
{error, _} = E -> E
end
end)
Bak = Path ++ ?TMP_EXT,
case prim_file:open(Bak, Modes) of
{ok, Hdl} ->
try
Result = Fun(Hdl),
ok = prim_file:sync(Hdl),
ok = prim_file:rename(Bak, Path),
Result
after
prim_file:close(Hdl)
end;
{error, _} = E -> E
end
end.
%% TODO the semantics of this function are rather odd. But see bug 25021.
@ -198,16 +194,12 @@ append_file(File, Suffix) ->
append_file(_, _, "") ->
ok;
append_file(File, 0, Suffix) ->
with_handle(fun () ->
case prim_file:open([File, Suffix], [append]) of
{ok, Fd} -> prim_file:close(Fd);
Error -> Error
end
end);
case prim_file:open([File, Suffix], [append]) of
{ok, Fd} -> prim_file:close(Fd);
Error -> Error
end;
append_file(File, _, Suffix) ->
case with_handle(2, fun () ->
file:copy(File, {[File, Suffix], [append]})
end) of
case file:copy(File, {[File, Suffix], [append]}) of
{ok, _BytesCopied} -> ok;
Error -> Error
end.
@ -223,21 +215,19 @@ ensure_parent_dirs_exist(Filename) ->
-spec rename(file:filename(), file:filename()) -> ok_or_error().
rename(Old, New) -> with_handle(fun () -> prim_file:rename(Old, New) end).
rename(Old, New) -> prim_file:rename(Old, New).
-spec delete([file:filename()]) -> ok_or_error().
delete(File) -> with_handle(fun () -> prim_file:delete(File) end).
delete(File) -> prim_file:delete(File).
-spec recursive_delete([file:filename()]) ->
rabbit_types:ok_or_error({file:filename(), any()}).
recursive_delete(Files) ->
with_handle(
fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path);
(_Path, {error, _Err} = Error) -> Error
end, ok, Files)
end).
lists:foldl(fun (Path, ok) -> recursive_delete1(Path);
(_Path, {error, _Err} = Error) -> Error
end, ok, Files).
recursive_delete1(Path) ->
case is_dir_no_handle(Path) and not(is_symlink_no_handle(Path)) of
@ -315,10 +305,8 @@ recursive_copy(Src, Dest) ->
lock_file(Path) ->
case is_file(Path) of
true -> {error, eexist};
false -> with_handle(
fun () -> {ok, Lock} = prim_file:open(Path, [write]),
ok = prim_file:close(Lock)
end)
false -> {ok, Lock} = prim_file:open(Path, [write]),
ok = prim_file:close(Lock)
end.
-spec filename_as_a_directory(file:filename()) -> file:filename().

View File

@ -576,7 +576,7 @@ handshake(Ref, ProxyProtocolEnabled) ->
{'EXIT', normal} ->
{error, handshake_failed};
{ok, Sock} ->
setup_socket(Sock),
ok = tune_buffer_size(Sock),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
end
end;
@ -585,15 +585,11 @@ handshake(Ref, ProxyProtocolEnabled) ->
{'EXIT', normal} ->
{error, handshake_failed};
{ok, Sock} ->
setup_socket(Sock),
ok = tune_buffer_size(Sock),
{ok, Sock}
end
end.
setup_socket(Sock) ->
ok = tune_buffer_size(Sock),
ok = file_handle_cache:obtain().
tune_buffer_size(Sock) ->
case tune_buffer_size1(Sock) of
ok -> ok;

View File

@ -51,9 +51,6 @@
grow/4,
grow/5]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0]).
-export([file_handle_release_reservation/0]).
-export([list_with_minimum_quorum/0,
list_with_local_promotable/0,
list_with_local_promotable_for_cli/0,
@ -1415,24 +1412,6 @@ matches_strategy(even, Members) ->
is_match(Subj, E) ->
nomatch /= re:run(Subj, E).
file_handle_leader_reservation(QName) ->
try
{ok, Q} = rabbit_amqqueue:lookup(QName),
ClusterSize = length(get_nodes(Q)),
file_handle_cache:set_reservation(2 + ClusterSize)
catch Class:Err ->
rabbit_log:warning("~s:~s/~b failed with ~w ~w",
[?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY,
Class, Err])
end.
file_handle_other_reservation() ->
file_handle_cache:set_reservation(2).
file_handle_release_reservation() ->
file_handle_cache:release_reservation().
-spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}.
reclaim_memory(Vhost, QueueName) ->
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},

View File

@ -374,11 +374,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
after
%% We don't call gen_tcp:close/1 here since it waits for
%% pending output to be sent, which results in unnecessary
%% delays. We could just terminate - the reader is the
%% controlling process and hence its termination will close
%% the socket. However, to keep the file_handle_cache
%% accounting as accurate as possible we ought to close the
%% socket w/o delay before termination.
%% delays.
rabbit_net:fast_close(RealSocket),
rabbit_networking:unregister_connection(self()),
rabbit_core_metrics:connection_closed(self()),

View File

@ -36,8 +36,6 @@ recover() ->
%% faster than other nodes handled DOWN messages from us.
rabbit_amqqueue:on_node_down(node()),
rabbit_amqqueue:warn_file_limit(),
%% Prepare rabbit_semi_durable_route table
{Time, _} = timer:tc(fun() ->
rabbit_binding:recover()

View File

@ -97,8 +97,7 @@ init_per_group(Group, Config) ->
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps() ++ [
fun(C) -> init_per_group1(Group, C) end,
fun setup_file_handle_cache/1
fun(C) -> init_per_group1(Group, C) end
]);
false ->
rabbit_ct_helpers:run_steps(Config, [
@ -137,17 +136,6 @@ init_per_group1(from_cluster_node2, Config) ->
init_per_group1(_, Config) ->
Config.
setup_file_handle_cache(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, setup_file_handle_cache1, []),
Config.
setup_file_handle_cache1() ->
%% FIXME: Why are we doing this?
application:set_env(rabbit, file_handles_high_watermark, 100),
ok = file_handle_cache:set_limit(100),
ok.
end_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->

View File

@ -80,8 +80,6 @@ groups() ->
reject_after_leader_transfer,
shrink_all,
rebalance,
file_handle_reservations,
file_handle_reservations_above_limit,
node_removal_is_not_quorum_critical,
leader_locator_client_local,
leader_locator_balanced,
@ -2053,61 +2051,6 @@ node_removal_is_not_quorum_critical(Config) ->
?assertEqual([], Qs).
file_handle_reservations(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "file_handle_reservations tests isn't mixed version compatible"};
false ->
file_handle_reservations0(Config)
end.
file_handle_reservations0(Config) ->
Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
{ok, _, {_, Leader}} = ra:members({RaName, Server1}),
[Follower1, Follower2] = Servers -- [Leader],
?assertEqual([{files_reserved, 5}],
rpc:call(Leader, file_handle_cache, info, [[files_reserved]])),
?assertEqual([{files_reserved, 2}],
rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])),
?assertEqual([{files_reserved, 2}],
rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])),
force_leader_change(Servers, QQ),
{ok, _, {_, Leader0}} = ra:members({RaName, Server1}),
[Follower01, Follower02] = Servers -- [Leader0],
?assertEqual([{files_reserved, 5}],
rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])),
?assertEqual([{files_reserved, 2}],
rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])),
?assertEqual([{files_reserved, 2}],
rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])).
file_handle_reservations_above_limit(Config) ->
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
QQ = ?config(queue_name, Config),
QQ2 = ?config(alt_queue_name, Config),
Limit = rpc:call(S1, file_handle_cache, get_limit, []),
ok = rpc:call(S1, file_handle_cache, set_limit, [3]),
ok = rpc:call(S2, file_handle_cache, set_limit, [3]),
ok = rpc:call(S3, file_handle_cache, set_limit, [3]),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QQ2, 0, 0},
declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]),
ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]),
ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]).
cleanup_data_dir(Config) ->
%% With Khepri this test needs to run in a 3-node cluster, otherwise the queue can't
%% be deleted in minority

View File

@ -725,29 +725,6 @@ duplicate_delivery_test(C) ->
?assertEqual(1, lqueue:len(Messages)),
ok.
state_enter_file_handle_leader_reservation_test(_) ->
S0 = init(#{name => the_name,
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
become_leader_handler => {m, f, [a]}}),
Resource = {resource, <<"/">>, queue, <<"test">>},
Effects = rabbit_fifo:state_enter(leader, S0),
?assertMatch([{mod_call, m, f, [a, the_name]},
_Timer,
{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
| _], Effects),
ok.
state_enter_file_handle_other_reservation_test(_) ->
S0 = init(#{name => the_name,
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
Effects = rabbit_fifo:state_enter(other, S0),
?assertEqual([
{mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}
],
Effects),
ok.
state_enter_monitors_and_notifications_test(C) ->
Oth = spawn(fun () -> ok end),
{State0, _} = enq(C, 1, 1, first, test_init(test)),
@ -1251,8 +1228,7 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(C) ->
Effects = rabbit_fifo:state_enter(leader, State1),
%% 2 effects for each consumer process (channel process), 1 effect for the node,
%% 1 effect for file handle reservation
?assertEqual(2 * 3 + 1 + 1 + 1 + 1, length(Effects)).
?assertEqual(2 * 3 + 1 + 1 + 1, length(Effects)).
single_active_consumer_state_enter_eol_include_waiting_consumers_test(C) ->
Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
@ -1282,9 +1258,8 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(C) ->
Effects = rabbit_fifo:state_enter(eol, State1),
%% 1 effect for each consumer process (channel process),
%% 1 effect for file handle reservation
%% 1 effect for eol to handle rabbit_fifo_usage entries
?assertEqual(5, length(Effects)).
?assertEqual(4, length(Effects)).
query_consumers_test(C) ->
State0 = init(#{name => ?FUNCTION_NAME,

View File

@ -63,8 +63,6 @@ end_per_group(_, Config) ->
init_per_testcase(TestCase, Config) ->
meck:new(rabbit_quorum_queue, [passthrough]),
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
meck:expect(rabbit_quorum_queue, file_handle_leader_reservation, fun (_) -> ok end),
meck:expect(rabbit_quorum_queue, file_handle_other_reservation, fun () -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
ra_server_sup_sup:remove_all(?RA_SYSTEM),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),

View File

@ -541,29 +541,6 @@ duplicate_delivery_test(_) ->
?assertEqual(1, maps:size(Messages)),
ok.
state_enter_file_handle_leader_reservation_test(_) ->
S0 = init(#{name => the_name,
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
become_leader_handler => {m, f, [a]}}),
Resource = {resource, <<"/">>, queue, <<"test">>},
Effects = rabbit_fifo_v0:state_enter(leader, S0),
?assertEqual([
{mod_call, m, f, [a, the_name]},
{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
], Effects),
ok.
state_enter_file_handle_other_reservation_test(_) ->
S0 = init(#{name => the_name,
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
Effects = rabbit_fifo_v0:state_enter(other, S0),
?assertEqual([
{mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}
],
Effects),
ok.
state_enter_monitors_and_notifications_test(_) ->
Oth = spawn(fun () -> ok end),
{State0, _} = enq(1, 1, first, test_init(test)),
@ -998,8 +975,7 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
Effects = rabbit_fifo_v0:state_enter(leader, State1),
%% 2 effects for each consumer process (channel process), 1 effect for the node,
%% 1 effect for file handle reservation
?assertEqual(2 * 3 + 1 + 1, length(Effects)).
?assertEqual(2 * 3 + 1, length(Effects)).
single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
@ -1029,8 +1005,7 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
Effects = rabbit_fifo_v0:state_enter(eol, State1),
%% 1 effect for each consumer process (channel process),
%% 1 effect for file handle reservation
?assertEqual(4, length(Effects)).
?assertEqual(3, length(Effects)).
query_consumers_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,

View File

@ -27,7 +27,6 @@ groups() ->
file_handle_cache, %% Change FHC limit.
file_handle_cache_reserve,
file_handle_cache_reserve_release,
file_handle_cache_reserve_above_limit,
file_handle_cache_reserve_monitor,
file_handle_cache_reserve_open_file_above_limit
]}
@ -189,27 +188,6 @@ file_handle_cache_reserve_release1(_Config) ->
?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])),
passed.
file_handle_cache_reserve_above_limit(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, file_handle_cache_reserve_above_limit1, [Config]).
file_handle_cache_reserve_above_limit1(_Config) ->
Limit = file_handle_cache:get_limit(),
ok = file_handle_cache:set_limit(5),
%% Reserves are always accepted, even if above the limit
%% These are for special processes such as quorum queues
ok = file_handle_cache:obtain(5),
?assertEqual([{file_descriptor_limit, []}], rabbit_alarm:get_alarms()),
ok = file_handle_cache:set_reservation(7),
Props = file_handle_cache:info([files_reserved, sockets_used]),
?assertEqual(7, proplists:get_value(files_reserved, Props)),
?assertEqual(5, proplists:get_value(sockets_used, Props)),
ok = file_handle_cache:set_limit(Limit),
passed.
file_handle_cache_reserve_open_file_above_limit(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, file_handle_cache_reserve_open_file_above_limit1, [Config]).

View File

@ -65,8 +65,8 @@ gen_server2_with_state(Config) ->
?MODULE, gen_server2_with_state1, [Config]).
gen_server2_with_state1(_Config) ->
fhc_state = gen_server2:with_state(file_handle_cache,
fun (S) -> element(1, S) end),
state = gen_server2:with_state(background_gc,
fun (S) -> element(1, S) end),
passed.

View File

@ -17,7 +17,6 @@
-export([start_link/1, next_job_from/2, submit/3, submit_async/2,
run/1]).
-export([set_maximum_since_use/2]).
-export([set_timeout/2, set_timeout/3, clear_timeout/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -32,7 +31,6 @@
-spec submit(pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A.
-spec submit_async(pid(), fun (() -> any()) | mfargs()) -> 'ok'.
-spec run(fun (() -> A)) -> A; (mfargs()) -> any().
-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
%%----------------------------------------------------------------------------
@ -53,9 +51,6 @@ submit(Pid, Fun, ProcessModel) ->
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun, self()}).
set_maximum_since_use(Pid, Age) ->
gen_server2:cast(Pid, {set_maximum_since_use, Age}).
run({M, F, A}) -> apply(M, F, A);
run(Fun) -> Fun().
@ -76,15 +71,12 @@ run(Fun, single) ->
%%----------------------------------------------------------------------------
init([PoolName]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
ok = worker_pool:ready(PoolName, self()),
put(worker_pool_worker, true),
put(worker_pool_name, PoolName),
{ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
@ -120,10 +112,6 @@ handle_cast({submit_async, Fun, CPid}, {from, CPid, MRef}) ->
ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
{noreply, State, hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.

View File

@ -164,12 +164,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.StatusCommand do
"#{category}: #{IU.convert(val[:bytes], unit)} #{unit} (#{val[:percentage]} %)"
end)
file_descriptors = [
"\n#{bright("File Descriptors")}\n",
"Total: #{m[:file_descriptors][:total_used]}, limit: #{m[:file_descriptors][:total_limit]}",
"Sockets: #{m[:file_descriptors][:sockets_used]}, limit: #{m[:file_descriptors][:sockets_limit]}"
]
disk_space_section = [
"\n#{bright("Free Disk Space")}\n",
"Low free disk space watermark: #{IU.convert(m[:disk_free_limit], unit)} #{unit}",
@ -200,7 +194,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.StatusCommand do
log_section ++
alarms_section ++
memory_section ++
file_descriptors ++ disk_space_section ++ totals_section ++ listeners_section
disk_space_section ++ totals_section ++ listeners_section
{:ok, Enum.join(lines, line_separator())}
end
@ -264,7 +258,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.StatusCommand do
vm_memory_high_watermark_limit: Keyword.get(result, :vm_memory_limit),
disk_free_limit: Keyword.get(result, :disk_free_limit),
disk_free: Keyword.get(result, :disk_free),
file_descriptors: Enum.into(Keyword.get(result, :file_descriptors), %{}),
alarms: Keyword.get(result, :alarms),
listeners: listener_maps(Keyword.get(result, :listeners, [])),
memory: Keyword.get(result, :memory) |> Enum.into(%{}),

View File

@ -41,8 +41,7 @@
conserve = false :: boolean(),
stats_timer :: option(rabbit_event:state()),
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
conn_name :: option(binary()),
should_use_fhc :: rabbit_types:option(boolean())
conn_name :: option(binary())
}).
-type state() :: #state{}.
@ -82,15 +81,10 @@ init(Req, Opts) ->
true ->
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
ShouldUseFHC = application:get_env(?APP, use_file_handle_cache, true),
case ShouldUseFHC of
true -> ?LOG_INFO("Web MQTT: file handle cache use is enabled");
false -> ?LOG_INFO("Web MQTT: file handle cache use is disabled")
end,
{?MODULE,
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req),
#state{socket = maps:get(proxy_header, Req, undefined), should_use_fhc = ShouldUseFHC},
#state{socket = maps:get(proxy_header, Req, undefined)},
WsOpts}
end
end.
@ -111,15 +105,8 @@ info(Pid, Items) ->
-spec websocket_init(state()) ->
{cowboy_websocket:commands(), state()} |
{cowboy_websocket:commands(), state(), hibernate}.
websocket_init(State0 = #state{socket = Sock, should_use_fhc = ShouldUseFHC}) ->
websocket_init(State0 = #state{socket = Sock}) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [web_mqtt]}),
case ShouldUseFHC of
true ->
ok = file_handle_cache:obtain();
false -> ok;
undefined ->
ok = file_handle_cache:obtain()
end,
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
@ -271,18 +258,10 @@ terminate(Reason, Request, #state{} = State) ->
terminate(_Reason, _Request,
{SendWill, #state{conn_name = ConnName,
proc_state = PState,
keepalive = KState,
should_use_fhc = ShouldUseFHC} = State}) ->
keepalive = KState} = State}) ->
?LOG_INFO("Web MQTT closing connection ~ts", [ConnName]),
maybe_emit_stats(State),
_ = rabbit_mqtt_keepalive:cancel_timer(KState),
case ShouldUseFHC of
true ->
ok = file_handle_cache:release();
false -> ok;
undefined ->
ok = file_handle_cache:release()
end,
case PState of
connect_packet_unprocessed ->
ok;
@ -296,12 +275,9 @@ terminate(_Reason, _Request,
no_supported_sub_protocol(Protocol, Req) ->
%% The client MUST include mqtt in the list of WebSocket Sub Protocols it offers [MQTT-6.0.0-3].
?LOG_ERROR("Web MQTT: 'mqtt' not included in client offered subprotocols: ~tp", [Protocol]),
%% Set should_use_fhc to false, because at this early stage of init no fhc
%% obtain was called, so terminate/3 should not call fhc release
%% (even if use_file_handle_cache is true)
{ok,
cowboy_req:reply(400, #{<<"connection">> => <<"close">>}, Req),
#state{should_use_fhc = false}}.
#state{}}.
handle_data(Data, State0 = #state{}) ->
case handle_data1(Data, State0) of

View File

@ -42,8 +42,7 @@
peername,
auth_hd,
stats_timer,
connection,
should_use_fhc :: rabbit_types:option(boolean())
connection
}).
-define(APP, rabbitmq_web_stomp).
@ -84,11 +83,6 @@ init(Req0, Opts) ->
end,
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
ShouldUseFHC = application:get_env(?APP, use_file_handle_cache, true),
case ShouldUseFHC of
true -> ?LOG_INFO("Web STOMP: file handle cache use is enabled");
false -> ?LOG_INFO("Web STOMP: file handle cache use is disabled")
end,
{?MODULE, Req, #state{
frame_type = proplists:get_value(type, Opts, text),
heartbeat_sup = KeepaliveSup,
@ -98,18 +92,10 @@ init(Req0, Opts) ->
conserve_resources = false,
socket = SockInfo,
peername = PeerAddr,
auth_hd = cowboy_req:header(<<"authorization">>, Req),
should_use_fhc = ShouldUseFHC
auth_hd = cowboy_req:header(<<"authorization">>, Req)
}, WsOpts}.
websocket_init(State = #state{should_use_fhc = ShouldUseFHC}) ->
case ShouldUseFHC of
true ->
ok = file_handle_cache:obtain();
false -> ok;
undefined ->
ok = file_handle_cache:obtain()
end,
websocket_init(State) ->
process_flag(trap_exit, true),
{ok, ProcessorState} = init_processor_state(State),
{ok, rabbit_event:init_stats_timer(
@ -330,15 +316,8 @@ maybe_block(State, _) ->
stop(State) ->
stop(State, 1000, "STOMP died").
stop(State = #state{proc_state = ProcState, should_use_fhc = ShouldUseFHC}, CloseCode, Error0) ->
stop(State = #state{proc_state = ProcState}, CloseCode, Error0) ->
maybe_emit_stats(State),
case ShouldUseFHC of
true ->
ok = file_handle_cache:release();
false -> ok;
undefined ->
ok = file_handle_cache:release()
end,
_ = rabbit_stomp_processor:flush_and_die(ProcState),
Error1 = rabbit_data_coercion:to_binary(Error0),
{[{close, CloseCode, Error1}], State}.