Merge pull request #13772 from rabbitmq/mergify/bp/v4.1.x/pr-13771
CQ: Fix rare eof crash of message store with fanout (backport #13771)
This commit is contained in:
commit
4639e9daa7
|
@ -559,7 +559,19 @@ consolidate_reads([], Acc) ->
|
||||||
read_many_file3(MsgIds, CState = #client_msstate{ file_handles_ets = FileHandlesEts,
|
read_many_file3(MsgIds, CState = #client_msstate{ file_handles_ets = FileHandlesEts,
|
||||||
client_ref = Ref }, Acc, File) ->
|
client_ref = Ref }, Acc, File) ->
|
||||||
mark_handle_closed(FileHandlesEts, File, Ref),
|
mark_handle_closed(FileHandlesEts, File, Ref),
|
||||||
read_many_disk(MsgIds, CState, Acc).
|
%% We go back to reading from the cache rather than from disk
|
||||||
|
%% because it is possible that messages are not in a perfect
|
||||||
|
%% order of cache->disk. For example, a fanout message written
|
||||||
|
%% to a previous file by another queue, but then referenced by
|
||||||
|
%% our main queue in between newly written messages: our main
|
||||||
|
%% queue would write MsgA, MsgB, MsgFanout, MsgC, MsgD to the
|
||||||
|
%% current file, then when trying to read from that same current
|
||||||
|
%% file, it would get MsgA and MsgB from the cache; MsgFanout
|
||||||
|
%% from the previous file; and MsgC and MsgD from the cache
|
||||||
|
%% again. So the correct action here is not to continue reading
|
||||||
|
%% from disk but instead to go back to the cache to get MsgC
|
||||||
|
%% and MsgD.
|
||||||
|
read_many_cache(MsgIds, CState, Acc).
|
||||||
|
|
||||||
-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().
|
-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,7 @@ groups() ->
|
||||||
[
|
[
|
||||||
{backing_queue_tests, [], [
|
{backing_queue_tests, [], [
|
||||||
msg_store,
|
msg_store,
|
||||||
|
msg_store_read_many_fanout,
|
||||||
msg_store_file_scan,
|
msg_store_file_scan,
|
||||||
{backing_queue_v2, [], Common ++ V2Only}
|
{backing_queue_v2, [], Common ++ V2Only}
|
||||||
]}
|
]}
|
||||||
|
@ -320,6 +321,68 @@ msg_store1(_Config) ->
|
||||||
restart_msg_store_empty(),
|
restart_msg_store_empty(),
|
||||||
passed.
|
passed.
|
||||||
|
|
||||||
|
msg_store_read_many_fanout(Config) ->
|
||||||
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
|
?MODULE, msg_store_read_many_fanout1, [Config]).
|
||||||
|
|
||||||
|
msg_store_read_many_fanout1(_Config) ->
|
||||||
|
GenRefFun = fun(Key) -> V = case get(Key) of undefined -> 0; V0 -> V0 end, put(Key, V + 1), V end,
|
||||||
|
GenRef = fun() -> GenRefFun(msc) end,
|
||||||
|
%% We will fill the first message store file with random messages
|
||||||
|
%% + 1 fanout message (written once for now). We will then write
|
||||||
|
%% two messages from our queue, then the fanout message (to +1
|
||||||
|
%% from our queue), and two more messages. We expect all messages
|
||||||
|
%% from our queue to be in the current write file, except the
|
||||||
|
%% fanout message. We then try to read the messages.
|
||||||
|
restart_msg_store_empty(),
|
||||||
|
CRef1 = rabbit_guid:gen(),
|
||||||
|
CRef2 = rabbit_guid:gen(),
|
||||||
|
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
|
||||||
|
PayloadSizeBits = 65536,
|
||||||
|
Payload = <<0:PayloadSizeBits>>,
|
||||||
|
%% @todo -7 because -1 and -hd, fix better.
|
||||||
|
NumRandomMsgs = (FileSize div (PayloadSizeBits div 8)) - 1,
|
||||||
|
RandomMsgIds = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, NumRandomMsgs)],
|
||||||
|
FanoutMsgId = {GenRef(), msg_id_bin(NumRandomMsgs + 1)},
|
||||||
|
[Q1, Q2, Q3, Q4] = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(NumRandomMsgs + 2, NumRandomMsgs + 5)],
|
||||||
|
QueueMsgIds0 = [Q1, Q2] ++ [FanoutMsgId] ++ [Q3, Q4],
|
||||||
|
QueueMsgIds = [{GenRef(), M} || {_, M} <- QueueMsgIds0],
|
||||||
|
BasicMsgFun = fun(MsgId) ->
|
||||||
|
Ex = rabbit_misc:r(<<>>, exchange, <<>>),
|
||||||
|
BasicMsg = rabbit_basic:message(Ex, <<>>,
|
||||||
|
#'P_basic'{delivery_mode = 2},
|
||||||
|
Payload),
|
||||||
|
{ok, Msg0} = mc_amqpl:message(Ex, <<>>, BasicMsg#basic_message.content),
|
||||||
|
mc:set_annotation(id, MsgId, Msg0)
|
||||||
|
end,
|
||||||
|
ok = with_msg_store_client(
|
||||||
|
?PERSISTENT_MSG_STORE, CRef1,
|
||||||
|
fun (MSCStateM) ->
|
||||||
|
[begin
|
||||||
|
Msg = BasicMsgFun(MsgId),
|
||||||
|
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
|
||||||
|
end || {SeqId, MsgId} <- [FanoutMsgId] ++ RandomMsgIds],
|
||||||
|
MSCStateM
|
||||||
|
end),
|
||||||
|
ok = with_msg_store_client(
|
||||||
|
?PERSISTENT_MSG_STORE, CRef2,
|
||||||
|
fun (MSCStateM) ->
|
||||||
|
[begin
|
||||||
|
Msg = BasicMsgFun(MsgId),
|
||||||
|
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
|
||||||
|
end || {SeqId, MsgId} <- QueueMsgIds],
|
||||||
|
MSCStateM
|
||||||
|
end),
|
||||||
|
ok = with_msg_store_client(
|
||||||
|
?PERSISTENT_MSG_STORE, CRef2,
|
||||||
|
fun (MSCStateM) ->
|
||||||
|
QueueOnlyMsgIds = [M || {_, M} <- QueueMsgIds],
|
||||||
|
{#{}, MSCStateN} = rabbit_msg_store:read_many(
|
||||||
|
QueueOnlyMsgIds, MSCStateM),
|
||||||
|
MSCStateN
|
||||||
|
end),
|
||||||
|
passed.
|
||||||
|
|
||||||
restart_msg_store_empty() ->
|
restart_msg_store_empty() ->
|
||||||
ok = rabbit_variable_queue:stop_msg_store(?VHOST),
|
ok = rabbit_variable_queue:stop_msg_store(?VHOST),
|
||||||
ok = rabbit_variable_queue:start_msg_store(?VHOST,
|
ok = rabbit_variable_queue:start_msg_store(?VHOST,
|
||||||
|
|
Loading…
Reference in New Issue