CQ: Write large messages into their own files
This commit is contained in:
parent
18acc01a47
commit
d45fbc3da4
|
@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
|
|||
%% or the non-current files. If the message *is* in the
|
||||
%% current file then the cache entry will be removed by
|
||||
%% the normal logic for that in write_message/4 and
|
||||
%% maybe_roll_to_new_file/2.
|
||||
%% flush_or_roll_to_new_file/2.
|
||||
case index_lookup(MsgId, State) of
|
||||
#msg_location { file = File }
|
||||
when File == State #msstate.current_file ->
|
||||
|
@ -1208,26 +1208,123 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
|
|||
gc_candidate(File, State = #msstate{ gc_candidates = Candidates }) ->
|
||||
State#msstate{ gc_candidates = Candidates#{ File => true }}.
|
||||
|
||||
write_message(MsgId, Msg,
|
||||
State0 = #msstate { current_file_handle = CurHdl,
|
||||
%% This value must be smaller enough than ?SCAN_BLOCK_SIZE
|
||||
%% to ensure we only ever need 2 reads when scanning files.
|
||||
%% Hence the choice of 4MB here and 4MiB there, the difference
|
||||
%% in size being more than enough to ensure that property.
|
||||
-define(LARGE_MESSAGE_THRESHOLD, 4000000). %% 4MB.
|
||||
|
||||
write_message(MsgId, MsgBody, State) ->
|
||||
MsgBodyBin = term_to_binary(MsgBody),
|
||||
%% Large messages get written to their own files.
|
||||
if
|
||||
byte_size(MsgBodyBin) >= ?LARGE_MESSAGE_THRESHOLD ->
|
||||
write_large_message(MsgId, MsgBodyBin, State);
|
||||
true ->
|
||||
write_small_message(MsgId, MsgBodyBin, State)
|
||||
end.
|
||||
|
||||
write_small_message(MsgId, MsgBodyBin,
|
||||
State = #msstate { current_file_handle = CurHdl,
|
||||
current_file = CurFile,
|
||||
current_file_offset = CurOffset,
|
||||
file_summary_ets = FileSummaryEts }) ->
|
||||
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, Msg),
|
||||
State = case MaybeFlush of
|
||||
flush -> internal_sync(State0);
|
||||
ok -> State0
|
||||
end,
|
||||
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin),
|
||||
ok = index_insert(
|
||||
#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
|
||||
offset = CurOffset, total_size = TotalSize }, State),
|
||||
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
|
||||
[{#file_summary.valid_total_size, TotalSize},
|
||||
{#file_summary.file_size, TotalSize}]),
|
||||
maybe_roll_to_new_file(CurOffset + TotalSize,
|
||||
flush_or_roll_to_new_file(CurOffset + TotalSize, MaybeFlush,
|
||||
State #msstate {
|
||||
current_file_offset = CurOffset + TotalSize }).
|
||||
|
||||
flush_or_roll_to_new_file(
|
||||
Offset, _MaybeFlush,
|
||||
State = #msstate { dir = Dir,
|
||||
current_file_handle = CurHdl,
|
||||
current_file = CurFile,
|
||||
file_summary_ets = FileSummaryEts,
|
||||
cur_file_cache_ets = CurFileCacheEts,
|
||||
file_size_limit = FileSizeLimit })
|
||||
when Offset >= FileSizeLimit ->
|
||||
State1 = internal_sync(State),
|
||||
ok = writer_close(CurHdl),
|
||||
NextFile = CurFile + 1,
|
||||
{ok, NextHdl} = writer_open(Dir, NextFile),
|
||||
true = ets:insert_new(FileSummaryEts, #file_summary {
|
||||
file = NextFile,
|
||||
valid_total_size = 0,
|
||||
file_size = 0,
|
||||
locked = false }),
|
||||
%% Delete messages from the cache that were written to disk.
|
||||
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
|
||||
State1 #msstate { current_file_handle = NextHdl,
|
||||
current_file = NextFile,
|
||||
current_file_offset = 0 };
|
||||
%% If we need to flush, do so here.
|
||||
flush_or_roll_to_new_file(_, flush, State) ->
|
||||
internal_sync(State);
|
||||
flush_or_roll_to_new_file(_, _, State) ->
|
||||
State.
|
||||
|
||||
write_large_message(MsgId, MsgBodyBin,
|
||||
State0 = #msstate { dir = Dir,
|
||||
current_file_handle = CurHdl,
|
||||
current_file = CurFile,
|
||||
current_file_offset = CurOffset,
|
||||
file_summary_ets = FileSummaryEts,
|
||||
cur_file_cache_ets = CurFileCacheEts }) ->
|
||||
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
|
||||
%% We haven't written in the file yet. Use it.
|
||||
0 ->
|
||||
{CurFile, CurHdl};
|
||||
%% Flush the current file and close it. Open a new file.
|
||||
_ ->
|
||||
ok = writer_flush(CurHdl),
|
||||
ok = writer_close(CurHdl),
|
||||
LargeMsgFile0 = CurFile + 1,
|
||||
{ok, LargeMsgHdl0} = writer_open(Dir, LargeMsgFile0),
|
||||
{LargeMsgFile0, LargeMsgHdl0}
|
||||
end,
|
||||
%% Write the message directly and close the file.
|
||||
TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
|
||||
ok = writer_close(LargeMsgHdl),
|
||||
%% Update ets with the new information.
|
||||
ok = index_insert(
|
||||
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
|
||||
offset = 0, total_size = TotalSize }, State0),
|
||||
_ = case CurFile of
|
||||
%% We didn't open a new file. We must update the existing value.
|
||||
LargeMsgFile ->
|
||||
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
|
||||
[{#file_summary.valid_total_size, TotalSize},
|
||||
{#file_summary.file_size, TotalSize}]);
|
||||
%% We opened a new file. We can insert it all at once.
|
||||
_ ->
|
||||
true = ets:insert_new(FileSummaryEts, #file_summary {
|
||||
file = LargeMsgFile,
|
||||
valid_total_size = TotalSize,
|
||||
file_size = TotalSize,
|
||||
locked = false })
|
||||
end,
|
||||
%% Roll over to the next file.
|
||||
NextFile = LargeMsgFile + 1,
|
||||
{ok, NextHdl} = writer_open(Dir, NextFile),
|
||||
true = ets:insert_new(FileSummaryEts, #file_summary {
|
||||
file = NextFile,
|
||||
valid_total_size = 0,
|
||||
file_size = 0,
|
||||
locked = false }),
|
||||
%% Delete messages from the cache that were written to disk.
|
||||
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
|
||||
%% Process confirms (this won't flush; we already did) and continue.
|
||||
State = internal_sync(State0),
|
||||
State #msstate { current_file_handle = NextHdl,
|
||||
current_file = NextFile,
|
||||
current_file_offset = 0 }.
|
||||
|
||||
contains_message(MsgId, From, State) ->
|
||||
MsgLocation = index_lookup_positive_ref_count(MsgId, State),
|
||||
gen_server2:reply(From, MsgLocation =/= not_found),
|
||||
|
@ -1325,8 +1422,7 @@ writer_recover(Dir, Num, Offset) ->
|
|||
ok = file:truncate(Fd),
|
||||
{ok, #writer{fd = Fd, buffer = prim_buffer:new()}}.
|
||||
|
||||
writer_append(#writer{buffer = Buffer}, MsgId, MsgBody) ->
|
||||
MsgBodyBin = term_to_binary(MsgBody),
|
||||
writer_append(#writer{buffer = Buffer}, MsgId, MsgBodyBin) ->
|
||||
MsgBodyBinSize = byte_size(MsgBodyBin),
|
||||
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
|
||||
%% We send an iovec to the buffer instead of building a binary.
|
||||
|
@ -1354,6 +1450,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
|
|||
file:write(Fd, prim_buffer:read_iovec(Buffer, Size))
|
||||
end.
|
||||
|
||||
%% For large messages we don't buffer anything. Large messages
|
||||
%% are kept within their own files.
|
||||
%%
|
||||
%% This is basically the same as writer_append except no buffering.
|
||||
writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) ->
|
||||
MsgBodyBinSize = byte_size(MsgBodyBin),
|
||||
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
|
||||
ok = file:write(Fd, [
|
||||
<<EntrySize:64>>,
|
||||
MsgId,
|
||||
MsgBodyBin,
|
||||
<<255>> %% OK marker.
|
||||
]),
|
||||
EntrySize + 9.
|
||||
|
||||
writer_close(#writer{fd = Fd}) ->
|
||||
file:close(Fd).
|
||||
|
||||
|
@ -1700,33 +1811,6 @@ rebuild_index(Gatherer, Files, State) ->
|
|||
%% garbage collection / compaction / aggregation -- internal
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
maybe_roll_to_new_file(
|
||||
Offset,
|
||||
State = #msstate { dir = Dir,
|
||||
current_file_handle = CurHdl,
|
||||
current_file = CurFile,
|
||||
file_summary_ets = FileSummaryEts,
|
||||
cur_file_cache_ets = CurFileCacheEts,
|
||||
file_size_limit = FileSizeLimit })
|
||||
when Offset >= FileSizeLimit ->
|
||||
State1 = internal_sync(State),
|
||||
ok = writer_close(CurHdl),
|
||||
NextFile = CurFile + 1,
|
||||
{ok, NextHdl} = writer_open(Dir, NextFile),
|
||||
true = ets:insert_new(FileSummaryEts, #file_summary {
|
||||
file = NextFile,
|
||||
valid_total_size = 0,
|
||||
file_size = 0,
|
||||
locked = false }),
|
||||
%% We only delete messages from the cache that were written to disk
|
||||
%% in the previous file.
|
||||
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
|
||||
State1 #msstate { current_file_handle = NextHdl,
|
||||
current_file = NextFile,
|
||||
current_file_offset = 0 };
|
||||
maybe_roll_to_new_file(_, State) ->
|
||||
State.
|
||||
|
||||
%% We keep track of files that have seen removes and
|
||||
%% check those periodically for compaction. We only
|
||||
%% compact files that have less than half valid data.
|
||||
|
|
Loading…
Reference in New Issue