diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index fdd09b1d29..482e9cfa4f 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -559,7 +559,19 @@ consolidate_reads([], Acc) -> read_many_file3(MsgIds, CState = #client_msstate{ file_handles_ets = FileHandlesEts, client_ref = Ref }, Acc, File) -> mark_handle_closed(FileHandlesEts, File, Ref), - read_many_disk(MsgIds, CState, Acc). + %% We go back to reading from the cache rather than from disk + %% because it is possible that messages are not in a perfect + %% order of cache->disk. For example, a fanout message written + %% to a previous file by another queue, but then referenced by + %% our main queue in between newly written messages: our main + %% queue would write MsgA, MsgB, MsgFanout, MsgC, MsgD to the + %% current file, then when trying to read from that same current + %% file, it would get MsgA and MsgB from the cache; MsgFanout + %% from the previous file; and MsgC and MsgD from the cache + %% again. So the correct action here is not to continue reading + %% from disk but instead to go back to the cache to get MsgC + %% and MsgD. + read_many_cache(MsgIds, CState, Acc). -spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean(). diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 06f807a297..0356442967 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -62,6 +62,7 @@ groups() -> [ {backing_queue_tests, [], [ msg_store, + msg_store_read_many_fanout, msg_store_file_scan, {backing_queue_v2, [], Common ++ V2Only} ]} @@ -320,6 +321,68 @@ msg_store1(_Config) -> restart_msg_store_empty(), passed. +msg_store_read_many_fanout(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, msg_store_read_many_fanout1, [Config]). + +msg_store_read_many_fanout1(_Config) -> + GenRefFun = fun(Key) -> V = case get(Key) of undefined -> 0; V0 -> V0 end, put(Key, V + 1), V end, + GenRef = fun() -> GenRefFun(msc) end, + %% We will fill the first message store file with random messages + %% + 1 fanout message (written once for now). We will then write + %% two messages from our queue, then the fanout message (to +1 + %% from our queue), and two more messages. We expect all messages + %% from our queue to be in the current write file, except the + %% fanout message. We then try to read the messages. + restart_msg_store_empty(), + CRef1 = rabbit_guid:gen(), + CRef2 = rabbit_guid:gen(), + {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit), + PayloadSizeBits = 65536, + Payload = <<0:PayloadSizeBits>>, + %% @todo -7 because -1 and -hd, fix better. + NumRandomMsgs = (FileSize div (PayloadSizeBits div 8)) - 1, + RandomMsgIds = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, NumRandomMsgs)], + FanoutMsgId = {GenRef(), msg_id_bin(NumRandomMsgs + 1)}, + [Q1, Q2, Q3, Q4] = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(NumRandomMsgs + 2, NumRandomMsgs + 5)], + QueueMsgIds0 = [Q1, Q2] ++ [FanoutMsgId] ++ [Q3, Q4], + QueueMsgIds = [{GenRef(), M} || {_, M} <- QueueMsgIds0], + BasicMsgFun = fun(MsgId) -> + Ex = rabbit_misc:r(<<>>, exchange, <<>>), + BasicMsg = rabbit_basic:message(Ex, <<>>, + #'P_basic'{delivery_mode = 2}, + Payload), + {ok, Msg0} = mc_amqpl:message(Ex, <<>>, BasicMsg#basic_message.content), + mc:set_annotation(id, MsgId, Msg0) + end, + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, CRef1, + fun (MSCStateM) -> + [begin + Msg = BasicMsgFun(MsgId), + ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM) + end || {SeqId, MsgId} <- [FanoutMsgId] ++ RandomMsgIds], + MSCStateM + end), + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, CRef2, + fun (MSCStateM) -> + [begin + Msg = BasicMsgFun(MsgId), + ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM) + end || {SeqId, MsgId} <- QueueMsgIds], + MSCStateM + end), + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, CRef2, + fun (MSCStateM) -> + QueueOnlyMsgIds = [M || {_, M} <- QueueMsgIds], + {#{}, MSCStateN} = rabbit_msg_store:read_many( + QueueOnlyMsgIds, MSCStateM), + MSCStateN + end), + passed. + restart_msg_store_empty() -> ok = rabbit_variable_queue:stop_msg_store(?VHOST), ok = rabbit_variable_queue:start_msg_store(?VHOST,