Add file_handle_cache FD reservations
This commit is contained in:
parent
d00e97d6d8
commit
cf080b9937
|
|
@ -393,6 +393,7 @@ terminate(VHost, Terms, State0 = #mqistate { dir = Dir,
|
||||||
ok = file:sync(Fd),
|
ok = file:sync(Fd),
|
||||||
ok = file:close(Fd)
|
ok = file:close(Fd)
|
||||||
end, OpenFds),
|
end, OpenFds),
|
||||||
|
file_handle_cache:release_reservation(),
|
||||||
%% Write recovery terms for faster recovery.
|
%% Write recovery terms for faster recovery.
|
||||||
rabbit_recovery_terms:store(VHost, filename:basename(Dir),
|
rabbit_recovery_terms:store(VHost, filename:basename(Dir),
|
||||||
[{mqi_state, {?VERSION, Segments}} | Terms]),
|
[{mqi_state, {?VERSION, Segments}} | Terms]),
|
||||||
|
|
@ -408,6 +409,7 @@ delete_and_terminate(State = #mqistate { dir = Dir,
|
||||||
_ = maps:map(fun(_, Fd) ->
|
_ = maps:map(fun(_, Fd) ->
|
||||||
ok = file:close(Fd)
|
ok = file:close(Fd)
|
||||||
end, OpenFds),
|
end, OpenFds),
|
||||||
|
file_handle_cache:release_reservation(),
|
||||||
%% Erase the data on disk.
|
%% Erase the data on disk.
|
||||||
ok = erase_index_dir(Dir),
|
ok = erase_index_dir(Dir),
|
||||||
State#mqistate{ segments = #{},
|
State#mqistate{ segments = #{},
|
||||||
|
|
@ -484,9 +486,18 @@ new_segment_file(Segment, State = #mqistate{ segments = Segments }) ->
|
||||||
%% using too many FDs when the consumer lags a lot. We
|
%% using too many FDs when the consumer lags a lot. We
|
||||||
%% limit at 4 because we try to keep up to 2 for reading
|
%% limit at 4 because we try to keep up to 2 for reading
|
||||||
%% and 2 for writing.
|
%% and 2 for writing.
|
||||||
reduce_fd_usage(_, State = #mqistate{ fds = OpenFds })
|
reduce_fd_usage(SegmentToOpen, State = #mqistate{ fds = OpenFds })
|
||||||
when map_size(OpenFds) < 4 ->
|
when map_size(OpenFds) < 4 ->
|
||||||
|
%% The only case where we need to update reservations is
|
||||||
|
%% when we are opening a segment that wasn't already open,
|
||||||
|
%% and we are not closing another segment at the same time.
|
||||||
|
case OpenFds of
|
||||||
|
#{SegmentToOpen := _} ->
|
||||||
State;
|
State;
|
||||||
|
_ ->
|
||||||
|
file_handle_cache:set_reservation(map_size(OpenFds) + 1),
|
||||||
|
State
|
||||||
|
end;
|
||||||
reduce_fd_usage(SegmentToOpen, State = #mqistate{ fds = OpenFds0 }) ->
|
reduce_fd_usage(SegmentToOpen, State = #mqistate{ fds = OpenFds0 }) ->
|
||||||
case OpenFds0 of
|
case OpenFds0 of
|
||||||
#{SegmentToOpen := _} ->
|
#{SegmentToOpen := _} ->
|
||||||
|
|
@ -719,6 +730,10 @@ delete_segment(Segment, State0 = #mqistate{ fds = OpenFds0 }) ->
|
||||||
State = case maps:take(Segment, OpenFds0) of
|
State = case maps:take(Segment, OpenFds0) of
|
||||||
{Fd, OpenFds} ->
|
{Fd, OpenFds} ->
|
||||||
ok = file:close(Fd),
|
ok = file:close(Fd),
|
||||||
|
case map_size(OpenFds) of
|
||||||
|
0 -> file_handle_cache:release_reservation();
|
||||||
|
N -> file_handle_cache:set_reservation(N)
|
||||||
|
end,
|
||||||
State0#mqistate{ fds = OpenFds };
|
State0#mqistate{ fds = OpenFds };
|
||||||
error ->
|
error ->
|
||||||
State0
|
State0
|
||||||
|
|
|
||||||
|
|
@ -124,8 +124,8 @@ setup_file_handle_cache(Config) ->
|
||||||
|
|
||||||
setup_file_handle_cache1() ->
|
setup_file_handle_cache1() ->
|
||||||
%% FIXME: Why are we doing this?
|
%% FIXME: Why are we doing this?
|
||||||
application:set_env(rabbit, file_handles_high_watermark, 10),
|
application:set_env(rabbit, file_handles_high_watermark, 100),
|
||||||
ok = file_handle_cache:set_limit(10),
|
ok = file_handle_cache:set_limit(100),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
end_per_group(Group, Config) ->
|
end_per_group(Group, Config) ->
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue