CQ: Retry opening write file when writing messages
Followup toff8ecf1cf7
only this time it's for the index. (cherry picked from commitaf8c0af408
)
This commit is contained in:
parent
d78f61ac6e
commit
de44109ba0
|
@ -591,7 +591,9 @@ publish(MsgId, SeqId, Location, Props, IsPersistent, ShouldConfirm, TargetRamCou
|
||||||
new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) ->
|
new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) ->
|
||||||
#qi{ fds = OpenFds } = reduce_fd_usage(Segment, State),
|
#qi{ fds = OpenFds } = reduce_fd_usage(Segment, State),
|
||||||
false = maps:is_key(Segment, OpenFds), %% assert
|
false = maps:is_key(Segment, OpenFds), %% assert
|
||||||
{ok, Fd} = file:open(segment_file(Segment, State), [read, write, raw, binary]),
|
{ok, Fd} = rabbit_file:open_eventually(
|
||||||
|
segment_file(Segment, State),
|
||||||
|
[read, write, raw, binary]),
|
||||||
%% We then write the segment file header. It contains
|
%% We then write the segment file header. It contains
|
||||||
%% some useful info and some reserved bytes for future use.
|
%% some useful info and some reserved bytes for future use.
|
||||||
%% We currently do not make use of this information. It is
|
%% We currently do not make use of this information. It is
|
||||||
|
|
|
@ -194,25 +194,6 @@ maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) ->
|
||||||
false -> State
|
false -> State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
open_eventually(File, Modes) ->
|
|
||||||
open_eventually(File, Modes, 3).
|
|
||||||
|
|
||||||
open_eventually(_, _, 0) ->
|
|
||||||
{error, eacces};
|
|
||||||
open_eventually(File, Modes, N) ->
|
|
||||||
case file:open(File, Modes) of
|
|
||||||
OK = {ok, _} ->
|
|
||||||
OK;
|
|
||||||
%% When the current write file was recently deleted it
|
|
||||||
%% is possible on Windows to get an {error,eacces}.
|
|
||||||
%% Sometimes Windows sets the files to "DELETE PENDING"
|
|
||||||
%% state and delays deletion a bit. So we wait 10ms and
|
|
||||||
%% try again up to 3 times.
|
|
||||||
{error, eacces} ->
|
|
||||||
timer:sleep(10),
|
|
||||||
open_eventually(File, Modes, N - 1)
|
|
||||||
end.
|
|
||||||
|
|
||||||
flush_buffer(State = #qs{ write_buffer_size = 0 }, _) ->
|
flush_buffer(State = #qs{ write_buffer_size = 0 }, _) ->
|
||||||
State;
|
State;
|
||||||
flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
|
flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
|
||||||
|
@ -223,7 +204,9 @@ flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
|
||||||
Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount),
|
Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount),
|
||||||
%% Then we do the writes for each segment.
|
%% Then we do the writes for each segment.
|
||||||
State = lists:foldl(fun({Segment, LocBytes}, FoldState) ->
|
State = lists:foldl(fun({Segment, LocBytes}, FoldState) ->
|
||||||
{ok, Fd} = open_eventually(segment_file(Segment, FoldState), [read, write, raw, binary]),
|
{ok, Fd} = rabbit_file:open_eventually(
|
||||||
|
segment_file(Segment, FoldState),
|
||||||
|
[read, write, raw, binary]),
|
||||||
case file:position(Fd, eof) of
|
case file:position(Fd, eof) of
|
||||||
{ok, 0} ->
|
{ok, 0} ->
|
||||||
%% We write the file header if it does not exist.
|
%% We write the file header if it does not exist.
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-export([read_file_info/1]).
|
-export([read_file_info/1]).
|
||||||
-export([filename_as_a_directory/1]).
|
-export([filename_as_a_directory/1]).
|
||||||
-export([filename_to_binary/1, binary_to_filename/1]).
|
-export([filename_to_binary/1, binary_to_filename/1]).
|
||||||
|
-export([open_eventually/2]).
|
||||||
|
|
||||||
-define(TMP_EXT, ".tmp").
|
-define(TMP_EXT, ".tmp").
|
||||||
|
|
||||||
|
@ -338,3 +339,34 @@ binary_to_filename(Bin) when is_binary(Bin) ->
|
||||||
Other ->
|
Other ->
|
||||||
erlang:error(Other)
|
erlang:error(Other)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% On Windows the file may be in "DELETE PENDING" state following
|
||||||
|
%% its deletion (when the last message was acked). A subsequent
|
||||||
|
%% open may fail with an {error,eacces}. In that case we wait 10ms
|
||||||
|
%% and retry up to 3 times.
|
||||||
|
|
||||||
|
-spec open_eventually(File, Modes) -> {ok, IoDevice} | {error, Reason} when
|
||||||
|
File :: Filename | iodata(),
|
||||||
|
Filename :: file:name_all(),
|
||||||
|
Modes :: [file:mode() | ram | directory],
|
||||||
|
IoDevice :: file:io_device(),
|
||||||
|
Reason :: file:posix() | badarg | system_limit.
|
||||||
|
|
||||||
|
open_eventually(File, Modes) ->
|
||||||
|
open_eventually(File, Modes, 3).
|
||||||
|
|
||||||
|
open_eventually(_, _, 0) ->
|
||||||
|
{error, eacces};
|
||||||
|
open_eventually(File, Modes, N) ->
|
||||||
|
case file:open(File, Modes) of
|
||||||
|
OK = {ok, _} ->
|
||||||
|
OK;
|
||||||
|
%% When the current write file was recently deleted it
|
||||||
|
%% is possible on Windows to get an {error,eacces}.
|
||||||
|
%% Sometimes Windows sets the files to "DELETE PENDING"
|
||||||
|
%% state and delays deletion a bit. So we wait 10ms and
|
||||||
|
%% try again up to 3 times.
|
||||||
|
{error, eacces} ->
|
||||||
|
timer:sleep(10),
|
||||||
|
open_eventually(File, Modes, N - 1)
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue