CQ: Retry opening write file when flushing buffers

On Windows the file may be in "DELETE PENDING" state following
its deletion (when the last message was acked). A subsequent
message leads us to writing to that file again but we can't
and get an {error,eacces}. In that case we wait 10ms and retry
up to 3 times.

(cherry picked from commit ff8ecf1cf7)
This commit is contained in:
Loïc Hoguin 2025-06-23 14:17:38 +02:00 committed by Mergify
parent f5bf2405d5
commit 5d2c09c96b
1 changed files with 20 additions and 1 deletions

View File

@ -194,6 +194,25 @@ maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) ->
false -> State
end.
open_eventually(File, Modes) ->
open_eventually(File, Modes, 3).
open_eventually(_, _, 0) ->
{error, eacces};
open_eventually(File, Modes, N) ->
case file:open(File, Modes) of
OK = {ok, _} ->
OK;
%% When the current write file was recently deleted it
%% is possible on Windows to get an {error,eacces}.
%% Sometimes Windows sets the files to "DELETE PENDING"
%% state and delays deletion a bit. So we wait 10ms and
%% try again up to 3 times.
{error, eacces} ->
timer:sleep(10),
open_eventually(File, Modes, N - 1)
end.
flush_buffer(State = #qs{ write_buffer_size = 0 }, _) ->
State;
flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
@ -204,7 +223,7 @@ flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount),
%% Then we do the writes for each segment.
State = lists:foldl(fun({Segment, LocBytes}, FoldState) ->
{ok, Fd} = file:open(segment_file(Segment, FoldState), [read, write, raw, binary]),
{ok, Fd} = open_eventually(segment_file(Segment, FoldState), [read, write, raw, binary]),
case file:position(Fd, eof) of
{ok, 0} ->
%% We write the file header if it does not exist.