%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(protocol_interop_SUITE).
-compile([export_all,
nowarn_export_all]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_client/include/amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
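%% This suite checks protocol interoperability: messages are published to a
%% stream via the RabbitMQ Stream protocol and consumed via AMQP 0-9-1 and
%% AMQP 1.0, covering link credit (flow control), uncompressed and compressed
%% sub-batches, and AMQP filter expressions (property and SQL filters).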
all() ->
[{group, tests}].
groups() ->
[{tests, [shuffle],
[
amqpl,
amqp_credit_multiple_grants,
amqp_credit_single_grant,
amqp_attach_sub_batch,
amqp_property_filter,
amqp_sql_filter
]
}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
%% Wait for exclusive or auto-delete queues to be deleted.
timer:sleep(800),
rabbit_ct_broker_helpers:rpc(Config, ?MODULE, delete_queues, []),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
%% Testsuite cases
%% -------------------------------------------------------------------
amqpl(Config) ->
[Server] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Ctag = Stream = atom_to_binary(?FUNCTION_NAME),
publish_via_stream_protocol(Stream, Config),
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 2}),
amqp_channel:subscribe(Ch,
#'basic.consume'{queue = Stream,
consumer_tag = Ctag,
arguments = [{<<"x-stream-offset">>, long, 0}]},
self()),
receive #'basic.consume_ok'{consumer_tag = Ctag} -> ok
after 5000 -> ct:fail(consume_timeout)
end,
%% Since prefetch is 2, we expect to receive exactly 2 messages.
%% Once we ack both messages, we should receive exactly the next 2 messages.
ExpectedPayloads = [{<<"m1">>, <<"m2">>},
{<<"m3">>, <<"m4">>},
{<<"m5">>, <<"m6">>},
%% The broker skips delivery of compressed sub-batches to non-Stream-protocol
%% consumers, i.e. it skips delivery of m7, m8 and m9.
{<<"m10">>, <<"m11">>}],
lists:foreach(
fun({P1, P2}) ->
ok = process_2_amqpl_messages(Ch, P1, P2)
end, ExpectedPayloads),
ok = amqp_channel:close(Ch).
process_2_amqpl_messages(Ch, P1, P2) ->
%% We expect to receive exactly 2 messages.
receive {#'basic.deliver'{},
#amqp_msg{payload = P1}} -> ok
after 5000 -> ct:fail({missing_delivery, P1})
end,
DTag = receive {#'basic.deliver'{delivery_tag = Tag},
#amqp_msg{payload = P2}} -> Tag
after 5000 -> ct:fail({missing_delivery, P2})
end,
receive Msg -> ct:fail({unexpected_message, Msg})
after 10 -> ok
end,
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag,
multiple = true}).
amqp_credit_single_grant(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
publish_via_stream_protocol(Stream, Config),
%% Consume from the stream via AMQP 1.0.
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = <<"/queue/", Stream/binary>>,
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"test-receiver">>, Address, settled,
configuration, #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}),
%% There are 8 uncompressed messages in the stream.
ok = amqp10_client:flow_link_credit(Receiver, 8, never),
Msgs = receive_amqp_messages(Receiver, 8),
?assertEqual([<<"m1">>], amqp10_msg:body(hd(Msgs))),
?assertEqual([<<"m11">>], amqp10_msg:body(lists:last(Msgs))),
ok = amqp10_client:close_connection(Connection).
amqp_credit_multiple_grants(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
publish_via_stream_protocol(Stream, Config),
%% Consume from the stream via AMQP 1.0.
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = <<"/queue/", Stream/binary>>,
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"test-receiver">>, Address, unsettled,
configuration, #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}),
%% Granting 1 credit should deliver us exactly 1 message.
{ok, M1} = amqp10_client:get_msg(Receiver),
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 5000 -> ct:fail("expected credit_exhausted")
end,
receive {amqp10_msg, _, _} = Unexp1 -> ct:fail({unexpected_message, Unexp1})
after 10 -> ok
end,
ok = amqp10_client:flow_link_credit(Receiver, 3, never),
%% We expect to receive exactly 3 more messages.
receive {amqp10_msg, Receiver, Msg2} ->
?assertEqual([<<"m2">>], amqp10_msg:body(Msg2))
after 5000 -> ct:fail("missing m2")
end,
receive {amqp10_msg, Receiver, Msg3} ->
?assertEqual([<<"m3">>], amqp10_msg:body(Msg3))
after 5000 -> ct:fail("missing m3")
end,
%% Messages in an uncompressed sub-batch should be delivered individually.
M4 = receive {amqp10_msg, Receiver, Msg4} ->
?assertEqual([<<"m4">>], amqp10_msg:body(Msg4)),
Msg4
after 5000 -> ct:fail("missing m4")
end,
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 5000 -> ct:fail("expected credit_exhausted")
end,
%% Let's ack all of them.
ok = amqp10_client_session:disposition(
Receiver,
amqp10_msg:delivery_id(M1),
amqp10_msg:delivery_id(M4),
true,
accepted),
%% Acking shouldn't grant more credit.
receive {amqp10_msg, _, _} = Unexp2 -> ct:fail({unexpected_message, Unexp2})
after 10 -> ok
end,
ok = amqp10_client:flow_link_credit(Receiver, 3, never),
M5 = receive {amqp10_msg, Receiver, Msg5} ->
?assertEqual([<<"m5">>], amqp10_msg:body(Msg5)),
Msg5
after 5000 -> ct:fail("missing m5")
end,
receive {amqp10_msg, Receiver, Msg6} ->
?assertEqual([<<"m6">>], amqp10_msg:body(Msg6))
after 5000 -> ct:fail("missing m6")
end,
%% The broker skips delivery of compressed sub-batches to non-Stream-protocol
%% consumers, i.e. it skips delivery of m7, m8 and m9.
receive {amqp10_msg, Receiver, Msg10} ->
?assertEqual([<<"m10">>], amqp10_msg:body(Msg10))
after 5000 -> ct:fail("missing m10")
end,
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 5000 -> ct:fail("expected credit_exhausted")
end,
receive {amqp10_msg, _, _} = Unexp3 -> ct:fail({unexpected_message, Unexp3})
after 10 -> ok
end,
%% 1 message should be left in the stream.
%% Let's drain the stream.
ok = amqp10_client:flow_link_credit(Receiver, 1000, never, true),
M11 = receive {amqp10_msg, Receiver, Msg11} ->
?assertEqual([<<"m11">>], amqp10_msg:body(Msg11)),
Msg11
after 5000 -> ct:fail("missing m11")
end,
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 5000 -> ct:fail("expected credit_exhausted")
end,
%% Let's ack them all.
ok = amqp10_client_session:disposition(
Receiver,
amqp10_msg:delivery_id(M5),
amqp10_msg:delivery_id(M11),
true,
accepted),
receive {amqp10_msg, _, _} = Unexp4 -> ct:fail({unexpected_message, Unexp4})
after 10 -> ok
end,
ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:close_connection(Connection).
amqp_attach_sub_batch(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
publish_via_stream_protocol(Stream, Config),
%% Consume from the stream via AMQP 1.0.
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = <<"/queue/", Stream/binary>>,
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"test-receiver">>, Address, settled, configuration,
%% Attach in the middle of an uncompressed sub-batch (offset 4 points at m5).
#{<<"rabbitmq:stream-offset-spec">> => 4}),
{ok, M5} = amqp10_client:get_msg(Receiver),
?assertEqual([<<"m5">>], amqp10_msg:body(M5)),
{ok, M6} = amqp10_client:get_msg(Receiver),
?assertEqual([<<"m6">>], amqp10_msg:body(M6)),
%% The broker skips delivery of compressed sub-batches to non-Stream-protocol
%% consumers, i.e. it skips delivery of m7, m8 and m9.
{ok, M10} = amqp10_client:get_msg(Receiver),
?assertEqual([<<"m10">>], amqp10_msg:body(M10)),
{ok, M11} = amqp10_client:get_msg(Receiver),
?assertEqual([<<"m11">>], amqp10_msg:body(M11)),
ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:close_connection(Connection).
%% Test that the AMQP property filter works when messages
%% are published via the Stream protocol and consumed via AMQP 1.0.
amqp_property_filter(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
publish_via_stream_protocol(Stream, Config),
%% Consume from the stream via AMQP 1.0.
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = <<"/queue/", Stream/binary>>,
AppPropsFilter = [{{utf8, <<"my key">>},
{utf8, <<"my value">>}}],
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"test-receiver">>, Address, settled, configuration,
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter}
}),
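%% Of the messages published by publish_via_stream_protocol/2, only m2 and the
%% uncompressed sub-batch m4, m5, m6 carry the application property
%% "my key" => "my value", so exactly those four messages should pass the filter.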
ok = amqp10_client:flow_link_credit(Receiver, 100, never),
receive {amqp10_msg, Receiver, M2} ->
?assertEqual([<<"m2">>], amqp10_msg:body(M2))
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver, M4} ->
?assertEqual([<<"m4">>], amqp10_msg:body(M4))
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver, M5} ->
?assertEqual([<<"m5">>], amqp10_msg:body(M5))
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver, M6} ->
?assertEqual([<<"m6">>], amqp10_msg:body(M6))
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, _, _} = Msg ->
ct:fail({received_unexpected_msg, Msg})
after 10 -> ok
end,
ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:close_connection(Connection).
amqp_sql_filter(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = <<"/queue/", Stream/binary>>,
AppProps1 = #'v1_0.application_properties'{content = [{{utf8, <<"key">>}, {byte, 1}}]},
AppProps2 = #'v1_0.application_properties'{content = [{{utf8, <<"key">>}, {byte, 2}}]},
{ok, S, C0} = stream_test_utils:connect(Config, 0),
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
PublisherId = 55,
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
Bodies = lists:duplicate(2000, <<"middle">>),
UncompressedSubbatch1 = stream_test_utils:sub_batch_entry_uncompressed(1, AppProps1, [<<"first">>]),
UncompressedSubbatch2 = stream_test_utils:sub_batch_entry_uncompressed(2, AppProps2, Bodies),
UncompressedSubbatch3 = stream_test_utils:sub_batch_entry_uncompressed(3, AppProps2, Bodies),
UncompressedSubbatch4 = stream_test_utils:sub_batch_entry_uncompressed(4, AppProps1, [<<"last">>]),
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, 1, UncompressedSubbatch1),
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch2),
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, UncompressedSubbatch3),
{ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, 1, UncompressedSubbatch4),
{ok, _} = stream_test_utils:close(S, C6),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
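%% The SQL expression matches messages whose application-properties key "key"
%% holds an odd value. Only the single-message sub-batches published with
%% key = 1 (bodies <<"first">> and <<"last">>) match; the 4000 <<"middle">>
%% messages are published with key = 2 and are filtered out by the broker.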
SQL = <<"a.key % 2 = 1">>,
Filter = #{<<"from start">> => #filter{descriptor = <<"rabbitmq:stream-offset-spec">>,
value = {symbol, <<"first">>}},
?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_NAME_SQL_FILTER,
value = {utf8, SQL}}},
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
settled, configuration, Filter),
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
settled, configuration, Filter),
receive {amqp10_event, {link, Receiver1, attached}} -> ok
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_event, {link, Receiver2, attached}} -> ok
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver1, 3, never, true),
ok = amqp10_client:flow_link_credit(Receiver2, 3, never, true),
%% For two links filtering on the same session, we expect that RabbitMQ
%% delivers messages concurrently (instead of scanning the entire stream
%% for the 1st receiver before scanning the entire stream for the 2nd receiver).
receive {amqp10_msg, _, First1} ->
?assertEqual([<<"first">>], amqp10_msg:body(First1))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, _, First2} ->
?assertEqual([<<"first">>], amqp10_msg:body(First2))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, _, Last1} ->
?assertEqual([<<"last">>], amqp10_msg:body(Last1))
after 60_000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, _, Last2} ->
?assertEqual([<<"last">>], amqp10_msg:body(Last2))
after 60_000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_event, {link, Receiver1, credit_exhausted}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
receive {amqp10_event, {link, Receiver2, credit_exhausted}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:detach_link(Receiver1),
ok = amqp10_client:detach_link(Receiver2),
ok = amqp10_client:close_connection(Connection),
receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end.
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
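%% Publishes 11 messages (m1 .. m11) to the given stream via the Stream protocol:
%%   * m1, m2, m3 as individual entries (m2 carries the application property
%%     "my key" => "my value"),
%%   * m4, m5, m6 as one uncompressed sub-batch, each message carrying that
%%     same application property,
%%   * m7, m8, m9 as one compressed sub-batch,
%%   * m10, m11 as individual entries.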
publish_via_stream_protocol(Stream, Config) ->
{ok, S, C0} = stream_test_utils:connect(Config, 0),
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
PublisherId = 99,
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
M1 = stream_test_utils:simple_entry(1, <<"m1">>),
AppProps = #'v1_0.application_properties'{content = [{{utf8, <<"my key">>},
{utf8, <<"my value">>}}]},
M2 = stream_test_utils:simple_entry(2, <<"m2">>, AppProps),
M3 = stream_test_utils:simple_entry(3, <<"m3">>),
Messages1 = [M1, M2, M3],
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(
4, AppProps, [<<"m4">>, <<"m5">>, <<"m6">>]),
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch),
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, CompressedSubbatch),
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
Messages2 = [M10, M11],
{ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2),
{ok, _} = stream_test_utils:close(S, C6).
connection_config(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
#{address => Host,
port => Port,
container_id => <<"my container">>,
sasl => {plain, <<"guest">>, <<"guest">>}}.
receive_amqp_messages(Receiver, N) ->
receive_amqp_messages0(Receiver, N, []).
receive_amqp_messages0(_Receiver, 0, Acc) ->
lists:reverse(Acc);
receive_amqp_messages0(Receiver, N, Acc) ->
receive
{amqp10_msg, Receiver, Msg} ->
receive_amqp_messages0(Receiver, N - 1, [Msg | Acc])
after 5000 ->
exit({timeout, {num_received, length(Acc)}, {num_missing, N}})
end.
delete_queues() ->
[{ok, 0} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()].