CQ: Fix rare eof crash of message store with fanout
(cherry picked from commit 7138e8a0cc
)
This commit is contained in:
parent
5139360cf7
commit
98ff98f7f3
|
@ -559,7 +559,19 @@ consolidate_reads([], Acc) ->
|
|||
read_many_file3(MsgIds, CState = #client_msstate{ file_handles_ets = FileHandlesEts,
|
||||
client_ref = Ref }, Acc, File) ->
|
||||
mark_handle_closed(FileHandlesEts, File, Ref),
|
||||
read_many_disk(MsgIds, CState, Acc).
|
||||
%% We go back to reading from the cache rather than from disk
|
||||
%% because it is possible that messages are not in a perfect
|
||||
%% order of cache->disk. For example, a fanout message written
|
||||
%% to a previous file by another queue, but then referenced by
|
||||
%% our main queue in between newly written messages: our main
|
||||
%% queue would write MsgA, MsgB, MsgFanout, MsgC, MsgD to the
|
||||
%% current file, then when trying to read from that same current
|
||||
%% file, it would get MsgA and MsgB from the cache; MsgFanout
|
||||
%% from the previous file; and MsgC and MsgD from the cache
|
||||
%% again. So the correct action here is not to continue reading
|
||||
%% from disk but instead to go back to the cache to get MsgC
|
||||
%% and MsgD.
|
||||
read_many_cache(MsgIds, CState, Acc).
|
||||
|
||||
-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ groups() ->
|
|||
[
|
||||
{backing_queue_tests, [], [
|
||||
msg_store,
|
||||
msg_store_read_many_fanout,
|
||||
msg_store_file_scan,
|
||||
{backing_queue_v2, [], Common ++ V2Only}
|
||||
]}
|
||||
|
@ -320,6 +321,68 @@ msg_store1(_Config) ->
|
|||
restart_msg_store_empty(),
|
||||
passed.
|
||||
|
||||
msg_store_read_many_fanout(Config) ->
|
||||
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
?MODULE, msg_store_read_many_fanout1, [Config]).
|
||||
|
||||
msg_store_read_many_fanout1(_Config) ->
|
||||
GenRefFun = fun(Key) -> V = case get(Key) of undefined -> 0; V0 -> V0 end, put(Key, V + 1), V end,
|
||||
GenRef = fun() -> GenRefFun(msc) end,
|
||||
%% We will fill the first message store file with random messages
|
||||
%% + 1 fanout message (written once for now). We will then write
|
||||
%% two messages from our queue, then the fanout message (to +1
|
||||
%% from our queue), and two more messages. We expect all messages
|
||||
%% from our queue to be in the current write file, except the
|
||||
%% fanout message. We then try to read the messages.
|
||||
restart_msg_store_empty(),
|
||||
CRef1 = rabbit_guid:gen(),
|
||||
CRef2 = rabbit_guid:gen(),
|
||||
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
|
||||
PayloadSizeBits = 65536,
|
||||
Payload = <<0:PayloadSizeBits>>,
|
||||
%% @todo -7 because -1 and -hd, fix better.
|
||||
NumRandomMsgs = (FileSize div (PayloadSizeBits div 8)) - 1,
|
||||
RandomMsgIds = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, NumRandomMsgs)],
|
||||
FanoutMsgId = {GenRef(), msg_id_bin(NumRandomMsgs + 1)},
|
||||
[Q1, Q2, Q3, Q4] = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(NumRandomMsgs + 2, NumRandomMsgs + 5)],
|
||||
QueueMsgIds0 = [Q1, Q2] ++ [FanoutMsgId] ++ [Q3, Q4],
|
||||
QueueMsgIds = [{GenRef(), M} || {_, M} <- QueueMsgIds0],
|
||||
BasicMsgFun = fun(MsgId) ->
|
||||
Ex = rabbit_misc:r(<<>>, exchange, <<>>),
|
||||
BasicMsg = rabbit_basic:message(Ex, <<>>,
|
||||
#'P_basic'{delivery_mode = 2},
|
||||
Payload),
|
||||
{ok, Msg0} = mc_amqpl:message(Ex, <<>>, BasicMsg#basic_message.content),
|
||||
mc:set_annotation(id, MsgId, Msg0)
|
||||
end,
|
||||
ok = with_msg_store_client(
|
||||
?PERSISTENT_MSG_STORE, CRef1,
|
||||
fun (MSCStateM) ->
|
||||
[begin
|
||||
Msg = BasicMsgFun(MsgId),
|
||||
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
|
||||
end || {SeqId, MsgId} <- [FanoutMsgId] ++ RandomMsgIds],
|
||||
MSCStateM
|
||||
end),
|
||||
ok = with_msg_store_client(
|
||||
?PERSISTENT_MSG_STORE, CRef2,
|
||||
fun (MSCStateM) ->
|
||||
[begin
|
||||
Msg = BasicMsgFun(MsgId),
|
||||
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
|
||||
end || {SeqId, MsgId} <- QueueMsgIds],
|
||||
MSCStateM
|
||||
end),
|
||||
ok = with_msg_store_client(
|
||||
?PERSISTENT_MSG_STORE, CRef2,
|
||||
fun (MSCStateM) ->
|
||||
QueueOnlyMsgIds = [M || {_, M} <- QueueMsgIds],
|
||||
{#{}, MSCStateN} = rabbit_msg_store:read_many(
|
||||
QueueOnlyMsgIds, MSCStateM),
|
||||
MSCStateN
|
||||
end),
|
||||
passed.
|
||||
|
||||
restart_msg_store_empty() ->
|
||||
ok = rabbit_variable_queue:stop_msg_store(?VHOST),
|
||||
ok = rabbit_variable_queue:start_msg_store(?VHOST,
|
||||
|
|
Loading…
Reference in New Issue