364 lines
17 KiB
Erlang
364 lines
17 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
|
|
-module(exchanges_SUITE).
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
|
|
-compile([nowarn_export_all, export_all]).
|
|
-compile(export_all).
|
|
|
|
suite() ->
|
|
[{timetrap, 5 * 60000}].
|
|
|
|
all() ->
|
|
[
|
|
{group, mnesia_store},
|
|
{group, khepri_store},
|
|
{group, khepri_migration}
|
|
].
|
|
|
|
groups() ->
|
|
[
|
|
{mnesia_store, [], all_tests()},
|
|
{khepri_store, [], all_tests()},
|
|
{khepri_migration, [], [
|
|
from_mnesia_to_khepri
|
|
]}
|
|
].
|
|
|
|
all_tests() ->
|
|
[
|
|
direct_exchange,
|
|
headers_exchange,
|
|
topic_exchange,
|
|
fanout_exchange,
|
|
invalid_exchange
|
|
].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
rabbit_ct_helpers:log_environment(),
|
|
rabbit_ct_helpers:run_setup_steps(Config, []).
|
|
|
|
end_per_suite(Config) ->
|
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
|
|
|
init_per_group(mnesia_store = Group, Config0) ->
|
|
Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]),
|
|
init_per_group_common(Group, Config, 1);
|
|
init_per_group(khepri_store = Group, Config0) ->
|
|
Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, khepri}]),
|
|
init_per_group_common(Group, Config, 1);
|
|
init_per_group(khepri_migration = Group, Config0) ->
|
|
Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]),
|
|
init_per_group_common(Group, Config, 1).
|
|
|
|
init_per_group_common(Group, Config, Size) ->
|
|
Config1 = rabbit_ct_helpers:set_config(Config,
|
|
[{rmq_nodes_count, Size},
|
|
{rmq_nodename_suffix, Group},
|
|
{tcp_ports_base}]),
|
|
rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps()).
|
|
|
|
end_per_group(_, Config) ->
|
|
rabbit_ct_helpers:run_steps(Config,
|
|
rabbit_ct_broker_helpers:teardown_steps()).
|
|
|
|
init_per_testcase(Testcase, Config) ->
|
|
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
|
|
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
|
|
Name = rabbit_data_coercion:to_binary(Testcase),
|
|
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]),
|
|
Config2 = rabbit_ct_helpers:set_config(Config1,
|
|
[{queue_name, Name},
|
|
{alt_queue_name, <<Name/binary, "_alt">>},
|
|
{exchange_name, Name}
|
|
]),
|
|
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
|
|
|
|
end_per_testcase(Testcase, Config) ->
|
|
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
|
|
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange,
|
|
[?config(exchange_name, Config)]),
|
|
Config1 = rabbit_ct_helpers:run_steps(
|
|
Config,
|
|
rabbit_ct_client_helpers:teardown_steps()),
|
|
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testcases.
|
|
%% -------------------------------------------------------------------
|
|
direct_exchange(Config) ->
|
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
|
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
|
Q = ?config(queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
|
|
AltQ = ?config(alt_queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', AltQ, 0, 0}, declare(Ch, AltQ, [])),
|
|
|
|
Direct = <<"amq.direct">>,
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Direct,
|
|
queue = Q,
|
|
routing_key = Q}),
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Direct,
|
|
queue = AltQ,
|
|
routing_key = AltQ}),
|
|
publish(Ch, Direct, Q, <<"msg1">>),
|
|
publish(Ch, Direct, <<"anyotherkey">>, <<"msg2">>),
|
|
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>],
|
|
[AltQ, <<"0">>, <<"0">>, <<"0">>]]),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = AltQ})),
|
|
ok.
|
|
|
|
topic_exchange(Config) ->
|
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
|
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
|
Q = ?config(queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
|
|
|
|
Topic = <<"amq.topic">>,
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Topic,
|
|
queue = Q,
|
|
routing_key = <<"this.*.rules">>}),
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Topic,
|
|
queue = Q,
|
|
routing_key = <<"*.for.*">>}),
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Topic,
|
|
queue = Q,
|
|
routing_key = <<"simply#carrots">>}),
|
|
|
|
publish(Ch, Topic, <<"this.queue.rules">>, <<"msg1">>),
|
|
publish(Ch, Topic, <<"this.exchange.rules">>, <<"msg2">>),
|
|
publish(Ch, Topic, <<"another.queue.rules">>, <<"msg3">>),
|
|
publish(Ch, Topic, <<"carrots.for.power">>, <<"msg4">>),
|
|
publish(Ch, Topic, <<"simplycarrots">>, <<"msg5">>),
|
|
publish(Ch, Topic, <<"*.queue.rules">>, <<"msg6">>),
|
|
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"3">>, <<"3">>, <<"0">>]]),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg4">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Topic,
|
|
queue = Q,
|
|
routing_key = <<"#.noclue">>}),
|
|
publish(Ch, Topic, <<"simplycarrots">>, <<"msg7">>),
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"3">>, <<"0">>, <<"3">>]]),
|
|
publish(Ch, Topic, <<"#.bla">>, <<"msg8">>),
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"3">>, <<"0">>, <<"3">>]]),
|
|
publish(Ch, Topic, <<"#.noclue">>, <<"msg9">>),
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"1">>, <<"3">>]]),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg9">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Topic,
|
|
queue = Q,
|
|
routing_key = <<"#">>}),
|
|
publish(Ch, Topic, <<"simplycarrots">>, <<"msg10">>),
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"5">>, <<"1">>, <<"4">>]]),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg10">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
ok.
|
|
|
|
fanout_exchange(Config) ->
|
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
|
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
|
Q = ?config(queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
|
|
AltQ = ?config(alt_queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', AltQ, 0, 0}, declare(Ch, AltQ, [])),
|
|
|
|
Fanout = <<"amq.fanout">>,
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Fanout,
|
|
queue = Q,
|
|
routing_key = Q}),
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Fanout,
|
|
queue = AltQ,
|
|
routing_key = AltQ}),
|
|
publish(Ch, Fanout, Q, <<"msg1">>),
|
|
publish(Ch, Fanout, <<"anyotherkey">>, <<"msg2">>),
|
|
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>],
|
|
[AltQ, <<"2">>, <<"2">>, <<"0">>]]),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = AltQ})),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = AltQ})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = AltQ})),
|
|
ok.
|
|
|
|
headers_exchange(Config) ->
|
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
|
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
|
Q = ?config(queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
|
|
AltQ = ?config(alt_queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', AltQ, 0, 0}, declare(Ch, AltQ, [])),
|
|
|
|
Headers = <<"amq.headers">>,
|
|
#'queue.bind_ok'{} =
|
|
amqp_channel:call(Ch,
|
|
#'queue.bind'{exchange = Headers,
|
|
queue = Q,
|
|
arguments = [{<<"x-match">>, longstr, <<"all">>},
|
|
{<<"foo">>, longstr, <<"bar">>},
|
|
{<<"fuu">>, longstr, <<"ber">>}]
|
|
}),
|
|
#'queue.bind_ok'{} =
|
|
amqp_channel:call(Ch,
|
|
#'queue.bind'{exchange = Headers,
|
|
queue = AltQ,
|
|
arguments = [{<<"x-match">>, longstr, <<"any">>},
|
|
{<<"foo">>, longstr, <<"bar">>},
|
|
{<<"fuu">>, longstr, <<"ber">>}]
|
|
}),
|
|
|
|
publish(Ch, Headers, <<>>, <<"msg1">>, [{<<"foo">>, longstr, <<"bar">>},
|
|
{<<"fuu">>, longstr, <<"ber">>}]),
|
|
publish(Ch, Headers, <<>>, <<"msg2">>, [{<<"foo">>, longstr, <<"bar">>}]),
|
|
publish(Ch, Headers, <<>>, <<"msg3">>),
|
|
|
|
queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>],
|
|
[AltQ, <<"2">>, <<"2">>, <<"0">>]]),
|
|
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = AltQ})),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = AltQ})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = AltQ})),
|
|
ok.
|
|
|
|
invalid_exchange(Config) ->
|
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
|
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
|
Q = ?config(queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
|
|
|
|
?assertExit(
|
|
{{shutdown, {server_initiated_close, 404, _}}, _},
|
|
amqp_channel:call(Ch, #'queue.bind'{exchange = <<"invalid">>,
|
|
queue = Q,
|
|
routing_key = Q})).
|
|
|
|
from_mnesia_to_khepri(Config) ->
|
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
|
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
|
Q = ?config(queue_name, Config),
|
|
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
|
|
|
|
%% Test transient exchanges
|
|
X = ?config(exchange_name, Config),
|
|
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X,
|
|
durable = false}),
|
|
|
|
%% Topic is the only exchange type that has its own mnesia/khepri tables.
|
|
%% Let's test that the exchange works as expected after migration
|
|
Topic = <<"amq.topic">>,
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Topic,
|
|
queue = Q,
|
|
routing_key = <<"this.queue.rules">>}),
|
|
|
|
Exchanges = lists:sort([rabbit_misc:r(<<"/">>, exchange, <<>>),
|
|
rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
|
|
rabbit_misc:r(<<"/">>, exchange, <<"amq.fanout">>),
|
|
rabbit_misc:r(<<"/">>, exchange, <<"amq.headers">>),
|
|
rabbit_misc:r(<<"/">>, exchange, <<"amq.match">>),
|
|
rabbit_misc:r(<<"/">>, exchange, <<"amq.rabbitmq.trace">>),
|
|
rabbit_misc:r(<<"/">>, exchange, <<"amq.topic">>),
|
|
rabbit_misc:r(<<"/">>, exchange, X)]),
|
|
?assertEqual(
|
|
Exchanges,
|
|
lists:sort([X0#exchange.name ||
|
|
X0 <- rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [])])),
|
|
|
|
case rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db) of
|
|
ok ->
|
|
rabbit_ct_helpers:await_condition(
|
|
fun() ->
|
|
RecoveredExchanges =
|
|
lists:sort([X0#exchange.name ||
|
|
X0 <- rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [])]),
|
|
Exchanges == RecoveredExchanges
|
|
end),
|
|
publish(Ch, Topic, <<"this.queue.rules">>, <<"msg1">>),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q})),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q}));
|
|
Skip ->
|
|
Skip
|
|
end.
|
|
|
|
%% Internal
|
|
|
|
delete_queues() ->
|
|
[{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|
|
|| Q <- rabbit_amqqueue:list()].
|
|
|
|
delete_exchange(Name) ->
|
|
ok = rabbit_exchange:ensure_deleted(
|
|
rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
|
|
|
|
declare(Ch, Q, Args) ->
|
|
declare(Ch, Q, Args, true).
|
|
|
|
declare(Ch, Q, Args, Durable) ->
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
|
|
durable = Durable,
|
|
auto_delete = false,
|
|
arguments = Args}).
|
|
|
|
publish(Ch, X, RoutingKey, Msg) ->
|
|
publish(Ch, X, RoutingKey, Msg, []).
|
|
|
|
publish(Ch, X, RoutingKey, Msg, Headers) ->
|
|
ok = amqp_channel:cast(Ch, #'basic.publish'{exchange = X,
|
|
routing_key = RoutingKey},
|
|
#amqp_msg{props = #'P_basic'{delivery_mode = 2,
|
|
headers = Headers},
|
|
payload = Msg}).
|