Require feature flag message_containers
as it is required for Native AMQP 1.0 in 4.0. Remove compatibility code.
This commit is contained in:
parent
34862063d0
commit
dda1c500da
|
@ -542,14 +542,6 @@ rabbitmq_integration_suite(
|
|||
size = "medium",
|
||||
)
|
||||
|
||||
rabbitmq_integration_suite(
|
||||
name = "message_containers_SUITE",
|
||||
size = "medium",
|
||||
additional_beam = [
|
||||
":test_queue_utils_beam",
|
||||
],
|
||||
)
|
||||
|
||||
rabbitmq_integration_suite(
|
||||
name = "metrics_SUITE",
|
||||
size = "medium",
|
||||
|
|
|
@ -2092,15 +2092,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
|
|||
erlc_opts = "//:test_erlc_opts",
|
||||
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "message_containers_SUITE_beam_files",
|
||||
testonly = True,
|
||||
srcs = ["test/message_containers_SUITE.erl"],
|
||||
outs = ["test/message_containers_SUITE.beam"],
|
||||
app_name = "rabbit",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
deps = ["//deps/amqp_client:erlang_app"],
|
||||
)
|
||||
|
||||
erlang_bytecode(
|
||||
name = "metadata_store_clustering_SUITE_beam_files",
|
||||
testonly = True,
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
-export([
|
||||
message/3,
|
||||
message/4,
|
||||
message/5,
|
||||
from_basic_message/1,
|
||||
to_091/2,
|
||||
from_091/2
|
||||
|
@ -457,16 +456,13 @@ protocol_state(Content0, Anns) ->
|
|||
message(ExchangeName, RoutingKey, Content) ->
|
||||
message(ExchangeName, RoutingKey, Content, #{}).
|
||||
|
||||
%% helper for creating message container from messages received from AMQP legacy
|
||||
-spec message(rabbit_types:exchange_name(), rabbit_types:routing_key(), #content{}, map()) ->
|
||||
{ok, mc:state()} | {error, Reason :: any()}.
|
||||
message(XName, RoutingKey, Content, Anns) ->
|
||||
message(XName, RoutingKey, Content, Anns,
|
||||
rabbit_feature_flags:is_enabled(message_containers)).
|
||||
|
||||
%% helper for creating message container from messages received from
|
||||
%% AMQP legacy
|
||||
message(#resource{name = ExchangeNameBin}, RoutingKey,
|
||||
#content{properties = Props} = Content, Anns, true)
|
||||
message(#resource{name = ExchangeNameBin},
|
||||
RoutingKey,
|
||||
#content{properties = Props} = Content,
|
||||
Anns)
|
||||
when is_binary(RoutingKey) andalso
|
||||
is_map(Anns) ->
|
||||
case rabbit_basic:header_routes(Props#'P_basic'.headers) of
|
||||
|
@ -477,19 +473,6 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
|
|||
rabbit_basic:strip_bcc_header(Content),
|
||||
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
|
||||
?ANN_EXCHANGE => ExchangeNameBin})}
|
||||
end;
|
||||
message(#resource{} = XName, RoutingKey,
|
||||
#content{} = Content, Anns, false) ->
|
||||
case rabbit_basic:message(XName, RoutingKey, Content) of
|
||||
{ok, Msg} ->
|
||||
case Anns of
|
||||
#{id := Id} ->
|
||||
{ok, Msg#basic_message{id = Id}};
|
||||
_ ->
|
||||
{ok, Msg}
|
||||
end;
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
from_basic_message(#basic_message{content = Content,
|
||||
|
@ -502,7 +485,7 @@ from_basic_message(#basic_message{content = Content,
|
|||
_ ->
|
||||
#{id => Id}
|
||||
end,
|
||||
{ok, Msg} = message(Ex, RKey, prepare(read, Content), Anns, true),
|
||||
{ok, Msg} = message(Ex, RKey, prepare(read, Content), Anns),
|
||||
Msg.
|
||||
|
||||
%% Internal
|
||||
|
|
|
@ -123,9 +123,7 @@
|
|||
-rabbit_feature_flag(
|
||||
{message_containers,
|
||||
#{desc => "Message containers.",
|
||||
%%TODO Once lower version node in mixed versions is bumped to 3.13,
|
||||
%% make 'required' for upgrading AMQP 1.0 from 3.13 to 4.0
|
||||
stability => stable,
|
||||
stability => required,
|
||||
depends_on => [feature_flags_v2]
|
||||
}}).
|
||||
|
||||
|
|
|
@ -2216,12 +2216,7 @@ target_classic_queue_down(Config) ->
|
|||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
async_notify_settled_classic_queue(Config) ->
|
||||
%% TODO Bump old version in mixed version tests to 3.13.x,
|
||||
%% require ff message_containers and always run this test case.
|
||||
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers) of
|
||||
ok -> async_notify(settled, <<"classic">>, Config);
|
||||
{skip, _} = Skip -> Skip
|
||||
end.
|
||||
async_notify(settled, <<"classic">>, Config).
|
||||
|
||||
async_notify_settled_quorum_queue(Config) ->
|
||||
async_notify(settled, <<"quorum">>, Config).
|
||||
|
@ -2402,12 +2397,7 @@ link_flow_control(Config) ->
|
|||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
classic_queue_on_old_node(Config) ->
|
||||
%% TODO Bump old version in mixed version tests to 3.13.x,
|
||||
%% require ff message_containers and always run this test case.
|
||||
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers) of
|
||||
ok -> queue_and_client_different_nodes(1, 0, <<"classic">>, Config);
|
||||
{skip, _} = Skip -> Skip
|
||||
end.
|
||||
queue_and_client_different_nodes(1, 0, <<"classic">>, Config).
|
||||
|
||||
classic_queue_on_new_node(Config) ->
|
||||
queue_and_client_different_nodes(0, 1, <<"classic">>, Config).
|
||||
|
|
|
@ -155,22 +155,15 @@ init_per_group(Group, Config) ->
|
|||
case lists:member({group, Group}, all()) of
|
||||
true ->
|
||||
ClusterSize = 3,
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodename_suffix, Group},
|
||||
{rmq_nodes_count, ClusterSize}
|
||||
]),
|
||||
Config2 = rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps()),
|
||||
case Config2 of
|
||||
{skip, _} ->
|
||||
Config2;
|
||||
_ ->
|
||||
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config2,
|
||||
message_containers),
|
||||
Config2
|
||||
end;
|
||||
Config1 = rabbit_ct_helpers:set_config(
|
||||
Config, [
|
||||
{rmq_nodename_suffix, Group},
|
||||
{rmq_nodes_count, ClusterSize}
|
||||
]),
|
||||
rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps());
|
||||
false ->
|
||||
rabbit_ct_helpers:run_steps(Config, [])
|
||||
end.
|
||||
|
|
|
@ -108,28 +108,14 @@ init_per_testcase(Testcase, Config) ->
|
|||
rabbit_ct_helpers:testcase_started(Config, Testcase),
|
||||
ClusterSize = ?config(rmq_nodes_count, Config),
|
||||
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodename_suffix, Testcase},
|
||||
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
|
||||
]),
|
||||
Config2 = rabbit_ct_helpers:run_steps(Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps()),
|
||||
case Config2 of
|
||||
{skip, _} ->
|
||||
Config2;
|
||||
_ ->
|
||||
case Testcase of
|
||||
change_cluster ->
|
||||
%% do not enable message_containers feature flag as it will
|
||||
%% stop nodes in mixed versions joining later
|
||||
ok;
|
||||
_ ->
|
||||
_ = rabbit_ct_broker_helpers:enable_feature_flag(
|
||||
Config2, message_containers)
|
||||
end,
|
||||
Config2
|
||||
end.
|
||||
Config1 = rabbit_ct_helpers:set_config(
|
||||
Config, [
|
||||
{rmq_nodename_suffix, Testcase},
|
||||
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
|
||||
]),
|
||||
rabbit_ct_helpers:run_steps(Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps()).
|
||||
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
Config1 = rabbit_ct_helpers:run_steps(Config,
|
||||
|
|
|
@ -81,17 +81,10 @@ init_per_testcase(Testcase, Config) ->
|
|||
{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>},
|
||||
{<<"x-quorum-initial-group-size">>, long, 3}]}
|
||||
]),
|
||||
Config2 = rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps()),
|
||||
case Config2 of
|
||||
{skip, _} ->
|
||||
Config2;
|
||||
_ ->
|
||||
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config2, message_containers),
|
||||
Config2
|
||||
end
|
||||
rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps())
|
||||
end.
|
||||
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
|
|
|
@ -64,21 +64,21 @@ init_per_testcase(Testcase, Config) ->
|
|||
rabbit_ct_helpers:testcase_started(Config, Testcase),
|
||||
ClusterSize = 3,
|
||||
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodes_count, ClusterSize},
|
||||
{rmq_nodes_clustered, true},
|
||||
{rmq_nodename_suffix, Testcase},
|
||||
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
|
||||
]),
|
||||
Config2 = rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps() ++ [
|
||||
fun rabbit_ct_broker_helpers:set_ha_policy_two_pos/1,
|
||||
fun rabbit_ct_broker_helpers:set_ha_policy_two_pos_batch_sync/1
|
||||
]),
|
||||
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config2, message_containers),
|
||||
Config2.
|
||||
Config1 = rabbit_ct_helpers:set_config(
|
||||
Config, [
|
||||
{rmq_nodes_count, ClusterSize},
|
||||
{rmq_nodes_clustered, true},
|
||||
{rmq_nodename_suffix, Testcase},
|
||||
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
|
||||
]),
|
||||
rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps() ++
|
||||
[
|
||||
fun rabbit_ct_broker_helpers:set_ha_policy_two_pos/1,
|
||||
fun rabbit_ct_broker_helpers:set_ha_policy_two_pos_batch_sync/1
|
||||
]).
|
||||
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
Config1 = rabbit_ct_helpers:run_steps(Config,
|
||||
|
|
|
@ -371,7 +371,7 @@ amqpl_cc_amqp_bin_amqpl(_Config) ->
|
|||
Content = #content{properties = Props,
|
||||
payload_fragments_rev = [<<"data">>]},
|
||||
X = rabbit_misc:r(<<"/">>, exchange, <<"exch">>),
|
||||
{ok, Msg} = mc_amqpl:message(X, <<"apple">>, Content, #{}, true),
|
||||
{ok, Msg} = mc_amqpl:message(X, <<"apple">>, Content, #{}),
|
||||
|
||||
RoutingKeys = [<<"apple">>, <<"q1">>, <<"q2">>],
|
||||
?assertEqual(RoutingKeys, mc:routing_keys(Msg)),
|
||||
|
@ -663,11 +663,12 @@ amqp_amqpl_amqp_bodies(_Config) ->
|
|||
Ex = #resource{virtual_host = <<"/">>,
|
||||
kind = exchange,
|
||||
name = <<"ex">>},
|
||||
{ok, LegacyMsg} = mc_amqpl:message(Ex, <<"rkey">>,
|
||||
{ok, LegacyMsg} = mc_amqpl:message(Ex,
|
||||
<<"rkey">>,
|
||||
#content{payload_fragments_rev =
|
||||
lists:reverse(EncodedPayload),
|
||||
properties = Props},
|
||||
#{}, true),
|
||||
#{}),
|
||||
|
||||
AmqpMsg = mc:convert(mc_amqp, LegacyMsg),
|
||||
%% drop any non body sections
|
||||
|
|
|
@ -1,257 +0,0 @@
|
|||
-module(message_containers_SUITE).
|
||||
|
||||
-compile([export_all, nowarn_export_all]).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
-define(FEATURE_FLAG, message_containers).
|
||||
|
||||
%%%===================================================================
|
||||
%%% Common Test callbacks
|
||||
%%%===================================================================
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, classic},
|
||||
{group, quorum},
|
||||
{group, stream}
|
||||
].
|
||||
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{classic, [], all_tests()},
|
||||
{quorum, [], all_tests()},
|
||||
{stream, [], all_tests()}
|
||||
].
|
||||
|
||||
all_tests() ->
|
||||
[
|
||||
enable_ff
|
||||
].
|
||||
|
||||
init_per_suite(Config0) ->
|
||||
rabbit_ct_helpers:log_environment(),
|
||||
Config = rabbit_ct_helpers:merge_app_env(
|
||||
Config0, {rabbit, [{quorum_tick_interval, 1000}]}),
|
||||
rabbit_ct_helpers:run_setup_steps(Config).
|
||||
|
||||
end_per_suite(Config) ->
|
||||
rabbit_ct_helpers:run_teardown_steps(Config),
|
||||
ok.
|
||||
|
||||
init_per_group(Group, Config) ->
|
||||
ct:pal("init per group ~p", [Group]),
|
||||
ClusterSize = 3,
|
||||
Config1 = rabbit_ct_helpers:set_config(Config,
|
||||
[{rmq_nodes_count, ClusterSize},
|
||||
{rmq_nodename_suffix, Group},
|
||||
{tcp_ports_base}]),
|
||||
Config1b = rabbit_ct_helpers:set_config(Config1,
|
||||
[{queue_type, atom_to_binary(Group, utf8)},
|
||||
{net_ticktime, 10}]),
|
||||
|
||||
Config1c = rabbit_ct_helpers:merge_app_env(
|
||||
Config1b, {rabbit, [{forced_feature_flags_on_init, []}]}),
|
||||
Config2 = rabbit_ct_helpers:run_steps(Config1c,
|
||||
[fun merge_app_env/1 ] ++
|
||||
rabbit_ct_broker_helpers:setup_steps()),
|
||||
case Config2 of
|
||||
{skip, _} ->
|
||||
Config2;
|
||||
_ ->
|
||||
ok = rabbit_ct_broker_helpers:rpc(
|
||||
Config2, 0, application, set_env,
|
||||
[rabbit, channel_tick_interval, 100]),
|
||||
|
||||
AllFFs = rabbit_ct_broker_helpers:rpc(Config2, rabbit_feature_flags, list, [all, stable]),
|
||||
FFs = maps:keys(maps:remove(?FEATURE_FLAG, AllFFs)),
|
||||
ct:pal("FFs ~p", [FFs]),
|
||||
case Group of
|
||||
classic ->
|
||||
try
|
||||
rabbit_ct_broker_helpers:set_policy(
|
||||
Config2, 0,
|
||||
<<"ha-policy">>, <<".*">>, <<"queues">>,
|
||||
[{<<"ha-mode">>, <<"all">>}]),
|
||||
Config2
|
||||
catch
|
||||
_:{badmatch, {error_string, Reason}} ->
|
||||
rabbit_ct_helpers:run_steps(
|
||||
Config2,
|
||||
rabbit_ct_broker_helpers:teardown_steps()),
|
||||
{skip, Reason}
|
||||
end;
|
||||
_ ->
|
||||
Config2
|
||||
end
|
||||
end.
|
||||
|
||||
merge_app_env(Config) ->
|
||||
rabbit_ct_helpers:merge_app_env(
|
||||
rabbit_ct_helpers:merge_app_env(Config,
|
||||
{rabbit,
|
||||
[{core_metrics_gc_interval, 100},
|
||||
{log, [{file, [{level, debug}]}]}]}),
|
||||
{ra, [{min_wal_roll_over_interval, 30000}]}).
|
||||
|
||||
end_per_group(_Group, Config) ->
|
||||
rabbit_ct_helpers:run_steps(Config,
|
||||
rabbit_ct_broker_helpers:teardown_steps()).
|
||||
|
||||
init_per_testcase(Testcase, Config) ->
|
||||
case rabbit_ct_broker_helpers:is_feature_flag_supported(Config, ?FEATURE_FLAG) of
|
||||
false ->
|
||||
{skip, "feature flag message_containers is unsupported"};
|
||||
true ->
|
||||
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
|
||||
?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, ?FEATURE_FLAG)),
|
||||
Q = rabbit_data_coercion:to_binary(Testcase),
|
||||
Config2 = rabbit_ct_helpers:set_config(Config1,
|
||||
[{queue_name, Q},
|
||||
{alt_queue_name, <<Q/binary, "_alt">>}
|
||||
]),
|
||||
rabbit_ct_helpers:run_steps(Config2,
|
||||
rabbit_ct_client_helpers:setup_steps())
|
||||
end.
|
||||
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
|
||||
Config1 = rabbit_ct_helpers:run_steps(
|
||||
Config,
|
||||
rabbit_ct_client_helpers:teardown_steps()),
|
||||
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
|
||||
|
||||
%%%===================================================================
|
||||
%%% Test cases
|
||||
%%%===================================================================
|
||||
|
||||
enable_ff(Config) ->
|
||||
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
||||
QueueType = ?config(queue_type, Config),
|
||||
QName = ?config(queue_name, Config),
|
||||
?assertEqual({'queue.declare_ok', QName, 0, 0},
|
||||
declare(Ch, QName,
|
||||
[{<<"x-queue-type">>, longstr, QueueType}])),
|
||||
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
|
||||
amqp_channel:register_confirm_handler(Ch, self()),
|
||||
|
||||
case QueueType of
|
||||
<<"stream">> ->
|
||||
%% if it is a stream we need to wait until there is a local member
|
||||
%% on the node we want to subscibe from before proceeding
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun() ->
|
||||
queue_utils:has_local_stream_member(Config, 2, QName, <<"/">>)
|
||||
end, 60000),
|
||||
ok;
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
|
||||
ConsumerTag1 = <<"ctag1">>,
|
||||
Ch2 = rabbit_ct_client_helpers:open_channel(Config, 2),
|
||||
qos(Ch2, 2),
|
||||
ok = subscribe(Ch2, QName, ConsumerTag1),
|
||||
publish_and_confirm(Ch, QName, <<"msg1">>),
|
||||
|
||||
receive_and_ack(Ch2),
|
||||
%% consume
|
||||
publish(Ch, QName, <<"msg2">>),
|
||||
|
||||
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FEATURE_FLAG),
|
||||
|
||||
confirm(),
|
||||
publish_and_confirm(Ch, QName, <<"msg3">>),
|
||||
receive_and_ack(Ch2),
|
||||
receive_and_ack(Ch2).
|
||||
|
||||
receive_and_ack(Ch) ->
|
||||
receive
|
||||
{#'basic.deliver'{delivery_tag = DeliveryTag,
|
||||
redelivered = false},
|
||||
#amqp_msg{}} ->
|
||||
basic_ack(Ch, DeliveryTag)
|
||||
after 5000 ->
|
||||
flush(),
|
||||
exit(basic_deliver_timeout)
|
||||
end.
|
||||
|
||||
%% Utility
|
||||
|
||||
delete_queues() ->
|
||||
[{ok, 0} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|
||||
|| Q <- rabbit_amqqueue:list()].
|
||||
|
||||
declare(Ch, Q, Args) ->
|
||||
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
|
||||
durable = true,
|
||||
auto_delete = false,
|
||||
arguments = Args}).
|
||||
|
||||
delete(Ch, Q) ->
|
||||
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
|
||||
|
||||
publish(Ch, Queue, Msg) ->
|
||||
ok = amqp_channel:cast(Ch,
|
||||
#'basic.publish'{routing_key = Queue},
|
||||
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
|
||||
payload = Msg}).
|
||||
|
||||
publish_and_confirm(Ch, Queue, Msg) ->
|
||||
publish(Ch, Queue, Msg),
|
||||
ct:pal("waiting for ~ts message confirmation from ~ts", [Msg, Queue]),
|
||||
confirm().
|
||||
|
||||
confirm() ->
|
||||
ok = receive
|
||||
#'basic.ack'{} -> ok;
|
||||
#'basic.nack'{} -> fail
|
||||
after 2500 ->
|
||||
flush(),
|
||||
exit(confirm_timeout)
|
||||
end.
|
||||
|
||||
subscribe(Ch, Queue, CTag) ->
|
||||
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
|
||||
no_ack = false,
|
||||
consumer_tag = CTag},
|
||||
self()),
|
||||
receive
|
||||
#'basic.consume_ok'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
exit(basic_consume_timeout)
|
||||
end.
|
||||
|
||||
basic_ack(Ch, DTag) ->
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag,
|
||||
multiple = false}).
|
||||
|
||||
basic_cancel(Ch, CTag) ->
|
||||
#'basic.cancel_ok'{} =
|
||||
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}).
|
||||
|
||||
basic_nack(Ch, DTag) ->
|
||||
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
|
||||
requeue = true,
|
||||
multiple = false}).
|
||||
|
||||
flush() ->
|
||||
receive
|
||||
Any ->
|
||||
ct:pal("flush ~tp", [Any]),
|
||||
flush()
|
||||
after 0 ->
|
||||
ok
|
||||
end.
|
||||
|
||||
get_global_counters(Config) ->
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []).
|
||||
|
||||
qos(Ch, Prefetch) ->
|
||||
?assertMatch(#'basic.qos_ok'{},
|
||||
amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch})).
|
|
@ -57,16 +57,13 @@ end_per_suite(Config) ->
|
|||
|
||||
init_per_group(single_node, Config) ->
|
||||
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodes_count, 1},
|
||||
{rmq_nodename_suffix, Suffix}
|
||||
]),
|
||||
Config2 = rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps()),
|
||||
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config2, message_containers),
|
||||
Config2;
|
||||
Config1 = rabbit_ct_helpers:set_config(
|
||||
Config, [{rmq_nodes_count, 1},
|
||||
{rmq_nodename_suffix, Suffix}]),
|
||||
rabbit_ct_helpers:run_steps(
|
||||
Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps());
|
||||
init_per_group(overflow_reject_publish, Config) ->
|
||||
rabbit_ct_helpers:set_config(Config, [
|
||||
{overflow, <<"reject-publish">>}
|
||||
|
|
|
@ -90,9 +90,6 @@ init_per_group0(Group, Config) ->
|
|||
_ ->
|
||||
ok
|
||||
end,
|
||||
EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(Config2,
|
||||
message_containers),
|
||||
ct:pal("message_containers ff ~p", [EnableFF]),
|
||||
Config2
|
||||
end.
|
||||
|
||||
|
|
|
@ -232,7 +232,6 @@ init_per_group(Group, Config) ->
|
|||
{skip, _} ->
|
||||
Ret;
|
||||
Config2 ->
|
||||
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config2, message_containers),
|
||||
ok = rabbit_ct_broker_helpers:rpc(
|
||||
Config2, 0, application, set_env,
|
||||
[rabbit, channel_tick_interval, 100]),
|
||||
|
|
|
@ -101,7 +101,6 @@ init_per_group(Group, Config, NodesCount) ->
|
|||
{skip, _Reason} = Skip ->
|
||||
Skip;
|
||||
_ ->
|
||||
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config2, message_containers),
|
||||
ok = rpc(Config2, 0, application, set_env,
|
||||
[rabbit, channel_tick_interval, 100]),
|
||||
Config2
|
||||
|
|
|
@ -66,7 +66,7 @@ append_to_acc(_Config) ->
|
|||
payload_fragments_rev = [[<<"1234567890">>]] %% 10 bytes
|
||||
},
|
||||
ExName = rabbit_misc:r(<<>>, exchange, <<>>),
|
||||
{ok, Msg} = mc_amqpl:message(ExName, <<>>, Content, #{id => 1}, true),
|
||||
{ok, Msg} = mc_amqpl:message(ExName, <<>>, Content, #{id => 1}),
|
||||
BQDepth = 10,
|
||||
SyncThroughput_0 = 0,
|
||||
FoldAcc1 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput_0}, {0, BQDepth}, erlang:monotonic_time()},
|
||||
|
|
|
@ -1600,14 +1600,13 @@ drop_local(QNames, #state{subscriptions = Subs,
|
|||
drop_local(QNames, _) ->
|
||||
QNames.
|
||||
|
||||
deliver_to_queues(Message0,
|
||||
deliver_to_queues(Message,
|
||||
Options,
|
||||
RoutedToQNames,
|
||||
State0 = #state{queue_states = QStates0,
|
||||
cfg = #cfg{proto_ver = ProtoVer}}) ->
|
||||
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQNames),
|
||||
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
|
||||
Message = compat(Message0, State0),
|
||||
case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of
|
||||
{ok, QStates, Actions} ->
|
||||
rabbit_global_counters:messages_routed(ProtoVer, length(Qs)),
|
||||
|
@ -2545,17 +2544,3 @@ mc_env() ->
|
|||
MqttX ->
|
||||
#{mqtt_x => MqttX}
|
||||
end.
|
||||
|
||||
-spec compat(mc:state(), state()) -> mc:state().
|
||||
compat(McMqtt, #state{cfg = #cfg{exchange = XName}}) ->
|
||||
case rabbit_feature_flags:is_enabled(message_containers) of
|
||||
true ->
|
||||
McMqtt;
|
||||
false = FFState ->
|
||||
#mqtt_msg{qos = Qos} = mc:protocol_state(McMqtt),
|
||||
[RoutingKey] = mc:routing_keys(McMqtt),
|
||||
McLegacy = mc:convert(mc_amqpl, McMqtt),
|
||||
Content = mc:protocol_state(McLegacy),
|
||||
{ok, BasicMsg} = mc_amqpl:message(XName, RoutingKey, Content, #{}, FFState),
|
||||
rabbit_basic:add_header(<<"x-mqtt-publish-qos">>, byte, Qos, BasicMsg)
|
||||
end.
|
||||
|
|
|
@ -143,25 +143,7 @@ mqtt_v5(Config) ->
|
|||
unlink(C1),
|
||||
?assertEqual({error, {unsupported_protocol_version, #{}}}, Connect(C1)),
|
||||
|
||||
%% Send message from node 0.
|
||||
%% Message is stored in old AMQP 0.9.1 format on node 1.
|
||||
Topic = <<"my/topic">>,
|
||||
C2 = connect(<<"sub-v4">>, Config, 1, util:non_clean_sess_opts()),
|
||||
{ok, _, [1]} = emqtt:subscribe(C2, Topic, qos1),
|
||||
ok = emqtt:disconnect(C2),
|
||||
C3 = connect(<<"pub-v4">>, Config),
|
||||
{ok, _} = emqtt:publish(C3, Topic, <<"msg">>, qos1),
|
||||
ok = emqtt:disconnect(C3),
|
||||
|
||||
DependantFF = message_containers,
|
||||
?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, DependantFF)),
|
||||
?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)),
|
||||
?assert(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, DependantFF)),
|
||||
|
||||
%% Translate from old AMQP 0.9.1 message format consuming from node 2.
|
||||
C4 = connect(<<"sub-v4">>, Config, 2, [{clean_start, false}]),
|
||||
ok = expect_publishes(C4, Topic, [<<"msg">>]),
|
||||
ok = emqtt:disconnect(C4),
|
||||
|
||||
%% MQTT 5.0 is now supported.
|
||||
{C5, Connect} = util:start_client(?FUNCTION_NAME, Config, 0, [{proto_ver, v5}]),
|
||||
|
|
|
@ -277,9 +277,11 @@ def rabbitmq_integration_suite(
|
|||
"quorum_queue,implicit_default_bindings,virtual_host_metadata,maintenance_mode_status,user_limits," +
|
||||
# required starting from 3.12.0 in rabbit:
|
||||
"feature_flags_v2,stream_queue,classic_queue_type_delivery_support,classic_mirrored_queue_version," +
|
||||
"stream_single_active_consumer,direct_exchange_routing_v2,listener_records_in_ets,tracking_records_in_ets",
|
||||
"stream_single_active_consumer,direct_exchange_routing_v2,listener_records_in_ets,tracking_records_in_ets," +
|
||||
# required starting from 3.12.0 in rabbitmq_management_agent:
|
||||
# empty_basic_get_metric, drop_unroutable_metric
|
||||
# required starting from 4.0 in rabbit:
|
||||
"message_containers",
|
||||
"RABBITMQ_RUN": "$(location :rabbitmq-for-tests-run)",
|
||||
"RABBITMQCTL": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmqctl".format(package),
|
||||
"RABBITMQ_PLUGINS": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmq-plugins".format(package),
|
||||
|
|
Loading…
Reference in New Issue