rabbitmq-server/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUIT...

566 lines
23 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(unit_runtime_parameter_SUITE).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-define(EXCHANGE, <<"test_exchange">>).
-define(TO_SHOVEL, <<"to_the_shovel">>).
-define(FROM_SHOVEL, <<"from_the_shovel">>).
-define(UNSHOVELLED, <<"unshovelled">>).
-define(SHOVELLED, <<"shovelled">>).
-define(TIMEOUT, 1000).
all() ->
[
{group, tests}
].
groups() ->
[
{tests, [parallel], [
parse_amqp091_maps,
parse_amqp091_proplists,
parse_amqp091_empty_maps,
parse_amqp091_empty_proplists,
parse_amqp10,
parse_amqp10_minimal,
validate_amqp10,
validate_amqp10_with_a_map,
test_src_protocol_defaults,
test_src_protocol_explicit,
test_dest_protocol_defaults,
test_dest_protocol_explicit,
test_protocols_defaults,
test_protocols_explicit,
test_protocols_mixed,
test_protocols_with_maps
]}
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
Secret = crypto:strong_rand_bytes(128),
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodename_suffix, ?MODULE}]),
Config2 = rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:rpc(Config2, 0, credentials_obfuscation,
set_secret, [Secret]),
Config2.
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(_, Config) ->
Config.
end_per_group(_, Config) ->
Config.
init_per_testcase(_Testcase, Config) ->
Config.
end_per_testcase(_Testcase, Config) ->
Config.
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
parse_amqp091_maps(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_maps_0, []).
parse_amqp091_maps_0() ->
Params =
[{<<"src-uri">>, <<"amqp://localhost:5672">>},
{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
{<<"add-forward-headers">>, true},
{<<"add-timestamp-header">>, true},
{<<"publish-properties">>, #{<<"cluster_id">> => <<"x">>,
<<"delivery_mode">> => 2}},
{<<"ack-mode">>, <<"on-publish">>},
{<<"src-delete-after">>, <<"queue-length">>},
{<<"prefetch-count">>, 30},
{<<"reconnect-delay">>, 1001},
{<<"src-queue">>, <<"a-src-queue">>},
{<<"dest-queue">>, <<"a-dest-queue">>}
],
test_parse_amqp091(Params).
parse_amqp091_proplists(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_proplists_0, []).
parse_amqp091_proplists_0() ->
Params =
[{<<"src-uri">>, <<"amqp://localhost:5672">>},
{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
{<<"dest-add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"dest-publish-properties">>, [{<<"cluster_id">>, <<"x">>},
{<<"delivery_mode">>, 2}]},
{<<"ack-mode">>, <<"on-publish">>},
{<<"src-delete-after">>, <<"queue-length">>},
{<<"src-prefetch-count">>, 30},
{<<"reconnect-delay">>, 1001},
{<<"src-queue">>, <<"a-src-queue">>},
{<<"dest-queue">>, <<"a-dest-queue">>}
],
test_parse_amqp091(Params).
parse_amqp091_empty_maps(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_empty_maps_0, []).
parse_amqp091_empty_maps_0() ->
Params =
[{<<"src-uri">>, <<"amqp://localhost:5672">>},
{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
{<<"dest-add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"dest-publish-properties">>, #{}},
{<<"ack-mode">>, <<"on-publish">>},
{<<"src-delete-after">>, <<"queue-length">>},
{<<"src-prefetch-count">>, 30},
{<<"reconnect-delay">>, 1001},
{<<"src-queue">>, <<"a-src-queue">>},
{<<"dest-queue">>, <<"a-dest-queue">>}
],
test_parse_amqp091_with_blank_proprties(Params).
parse_amqp091_empty_proplists(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_empty_proplists_0, []).
parse_amqp091_empty_proplists_0() ->
Params =
[{<<"src-uri">>, <<"amqp://localhost:5672">>},
{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
{<<"dest-add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"dest-publish-properties">>, []},
{<<"ack-mode">>, <<"on-publish">>},
{<<"src-delete-after">>, <<"queue-length">>},
{<<"src-prefetch-count">>, 30},
{<<"reconnect-delay">>, 1001},
{<<"src-queue">>, <<"a-src-queue">>},
{<<"dest-queue">>, <<"a-dest-queue">>}
],
test_parse_amqp091_with_blank_proprties(Params).
test_parse_amqp091(Params) ->
ObfuscatedParams = rabbit_shovel_parameters:obfuscate_uris_in_definition(Params),
{ok, Result} = rabbit_shovel_parameters:parse({"vhost", "name"},
"my-cluster", ObfuscatedParams),
#{ack_mode := on_publish,
name := "name",
reconnect_delay := 1001,
dest := #{module := rabbit_amqp091_shovel,
uris := ["amqp://remotehost:5672"],
props_fun := {M, F, Args}
},
source := #{module := rabbit_amqp091_shovel,
uris := ["amqp://localhost:5672"],
prefetch_count := 30,
queue := <<"a-src-queue">>,
delete_after := 'queue-length'}
} = Result,
#'P_basic'{headers = ActualHeaders,
delivery_mode = 2,
cluster_id = <<"x">>} = apply(M, F, Args ++ ["amqp://localhost:5672",
"amqp://remotehost:5672",
#'P_basic'{headers = undefined}]),
assert_amqp901_headers(ActualHeaders),
ok.
test_parse_amqp091_with_blank_proprties(Params) ->
ObfuscatedParams = rabbit_shovel_parameters:obfuscate_uris_in_definition(Params),
{ok, Result} = rabbit_shovel_parameters:parse({"vhost", "name"},
"my-cluster", ObfuscatedParams),
#{ack_mode := on_publish,
name := "name",
reconnect_delay := 1001,
dest := #{module := rabbit_amqp091_shovel,
uris := ["amqp://remotehost:5672"],
props_fun := {M, F, Args}
},
source := #{module := rabbit_amqp091_shovel,
uris := ["amqp://localhost:5672"],
prefetch_count := 30,
queue := <<"a-src-queue">>,
delete_after := 'queue-length'}
} = Result,
#'P_basic'{headers = ActualHeaders} = apply(M, F, Args ++ ["amqp://localhost:5672",
"amqp://remotehost:5672",
#'P_basic'{headers = undefined}]),
assert_amqp901_headers(ActualHeaders),
ok.
assert_amqp901_headers(ActualHeaders) ->
{_, array, [{table, Shovelled}]} = lists:keyfind(<<"x-shovelled">>, 1, ActualHeaders),
{_, long, _} = lists:keyfind(<<"x-shovelled-timestamp">>, 1, ActualHeaders),
ExpectedHeaders =
[{<<"shovelled-by">>, "my-cluster"},
{<<"shovel-type">>, <<"dynamic">>},
{<<"shovel-name">>, "name"},
{<<"shovel-vhost">>, "vhost"},
{<<"src-uri">>,"amqp://localhost:5672"},
{<<"dest-uri">>,"amqp://remotehost:5672"},
{<<"src-queue">>,<<"a-src-queue">>},
{<<"dest-queue">>,<<"a-dest-queue">>}],
lists:foreach(fun({K, V}) ->
?assertMatch({K, _, V},
lists:keyfind(K, 1, Shovelled))
end, ExpectedHeaders),
ok.
parse_amqp10(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp10_0, []).
parse_amqp10_0() ->
Params =
[
{<<"ack-mode">>, <<"on-publish">>},
{<<"reconnect-delay">>, 1001},
{<<"src-protocol">>, <<"amqp10">>},
{<<"src-uri">>, <<"amqp://localhost:5672">>},
{<<"src-address">>, <<"a-src-queue">>},
{<<"src-delete-after">>, <<"never">>},
{<<"src-prefetch-count">>, 30},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
{<<"dest-address">>, <<"a-dest-queue">>},
{<<"dest-add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"dest-application-properties">>, [{<<"some-app-prop">>,
<<"app-prop-value">>}]},
{<<"dest-message-annotations">>, [{<<"some-message-ann">>,
<<"message-ann-value">>}]},
{<<"dest-properties">>, [{<<"user_id">>, <<"some-user">>}]}
],
ObfuscatedParams = rabbit_shovel_parameters:obfuscate_uris_in_definition(Params),
?assertMatch(
{ok, #{name := "my_shovel",
ack_mode := on_publish,
source := #{module := rabbit_amqp10_shovel,
uris := ["amqp://localhost:5672"],
delete_after := never,
prefetch_count := 30,
source_address := <<"a-src-queue">>
},
dest := #{module := rabbit_amqp10_shovel,
uris := ["amqp://remotehost:5672"],
target_address := <<"a-dest-queue">>,
message_annotations := #{<<"some-message-ann">> :=
<<"message-ann-value">>},
application_properties := #{<<"some-app-prop">> :=
<<"app-prop-value">>},
properties := #{user_id := <<"some-user">>},
add_timestamp_header := true,
add_forward_headers := true
}
}},
rabbit_shovel_parameters:parse({"vhost", "my_shovel"}, "my-cluster",
ObfuscatedParams)),
ok.
parse_amqp10_minimal(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp10_minimal_0, []).
parse_amqp10_minimal_0() ->
Params =
[
{<<"src-protocol">>, <<"amqp10">>},
{<<"src-uri">>, <<"amqp://localhost:5672">>},
{<<"src-address">>, <<"a-src-queue">>},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
{<<"dest-address">>, <<"a-dest-queue">>}
],
ObfuscatedParams = rabbit_shovel_parameters:obfuscate_uris_in_definition(Params),
?assertMatch(
{ok, #{name := "my_shovel",
ack_mode := on_confirm,
source := #{module := rabbit_amqp10_shovel,
uris := ["amqp://localhost:5672"],
delete_after := never,
source_address := <<"a-src-queue">>
},
dest := #{module := rabbit_amqp10_shovel,
uris := ["amqp://remotehost:5672"],
unacked := #{},
target_address := <<"a-dest-queue">>
}
}},
rabbit_shovel_parameters:parse({"vhost", "my_shovel"}, "my-cluster",
ObfuscatedParams)),
ok.
validate_amqp10(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, validate_amqp10_0, []).
validate_amqp10_0() ->
Params =
[
{<<"ack-mode">>, <<"on-publish">>},
{<<"reconnect-delay">>, 1001},
{<<"src-protocol">>, <<"amqp10">>},
{<<"src-uri">>, <<"amqp://localhost:5672">>},
{<<"src-address">>, <<"a-src-queue">>},
{<<"src-delete-after">>, <<"never">>},
{<<"src-prefetch-count">>, 30},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
{<<"dest-address">>, <<"a-dest-queue">>},
{<<"dest-add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"dest-application-properties">>, [{<<"some-app-prop">>,
<<"app-prop-value">>}]},
{<<"dest-message-annotations">>, [{<<"some-message-ann">>,
<<"message-ann-value">>}]},
{<<"dest-properties">>, [{<<"user_id">>, <<"some-user">>}]}
],
Res = rabbit_shovel_parameters:validate("my-vhost", <<"shovel">>,
"my-shovel", Params, none),
[] = validate_ok(Res),
ok.
validate_amqp10_with_a_map(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, validate_amqp10_with_a_map_0, []).
validate_amqp10_with_a_map_0() ->
Params =
#{
<<"ack-mode">> => <<"on-publish">>,
<<"reconnect-delay">> => 1001,
<<"src-protocol">> => <<"amqp10">>,
<<"src-uri">> => <<"amqp://localhost:5672">>,
<<"src-address">> => <<"a-src-queue">>,
<<"src-delete-after">> => <<"never">>,
<<"src-prefetch-count">> => 30,
<<"dest-protocol">> => <<"amqp10">>,
<<"dest-uri">> => <<"amqp://remotehost:5672">>,
<<"dest-address">> => <<"a-dest-queue">>,
<<"dest-add-forward-headers">> => true,
<<"dest-add-timestamp-header">> => true,
<<"dest-application-properties">> => [{<<"some-app-prop">>,
<<"app-prop-value">>}],
<<"dest-message-annotations">> => [{<<"some-message-ann">>, <<"message-ann-value">>}],
<<"dest-properties">> => #{<<"user_id">> => <<"some-user">>}
},
Res = rabbit_shovel_parameters:validate("my-vhost", <<"shovel">>,
"my-shovel", Params, none),
[] = validate_ok(Res),
ok.
validate_ok([ok | T]) ->
validate_ok(T);
validate_ok([[_|_] = L | T]) ->
validate_ok(L) ++ validate_ok(T);
validate_ok([]) -> [];
validate_ok(X) ->
exit({not_ok, X}).
%% -------------------------------------------------------------------
%% Protocol detection tests
%% -------------------------------------------------------------------
test_src_protocol_defaults(_Config) ->
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
{<<"dest-uri">>, <<"amqp://remote">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefProplist)),
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
<<"dest-uri">> => <<"amqp://remote">>},
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap)),
ok.
test_src_protocol_explicit(_Config) ->
Def091 = [{<<"src-protocol">>, <<"amqp091">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(Def091)),
Def10 = [{<<"src-protocol">>, <<"amqp10">>}],
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(Def10)),
DefLocal = [{<<"src-protocol">>, <<"local">>}],
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefLocal)),
DefMap091 = #{<<"src-protocol">> => <<"amqp091">>},
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap091)),
DefMap10 = #{<<"src-protocol">> => <<"amqp10">>},
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(DefMap10)),
DefMapLocal = #{<<"src-protocol">> => <<"local">>},
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefMapLocal)),
ok.
test_dest_protocol_defaults(_Config) ->
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
{<<"dest-uri">>, <<"amqp://remote">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefProplist)),
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
<<"dest-uri">> => <<"amqp://remote">>},
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap)),
ok.
test_dest_protocol_explicit(_Config) ->
Def091 = [{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(Def091)),
Def10 = [{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(Def10)),
DefLocal = [{<<"dest-protocol">>, <<"local">>}],
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefLocal)),
DefMap091 = #{<<"dest-protocol">> => <<"amqp091">>},
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap091)),
DefMap10 = #{<<"dest-protocol">> => <<"amqp10">>},
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(DefMap10)),
DefMapLocal = #{<<"dest-protocol">> => <<"local">>},
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefMapLocal)),
ok.
test_protocols_defaults(_Config) ->
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
{<<"dest-uri">>, <<"amqp://remote">>}],
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefProplist)),
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
<<"dest-uri">> => <<"amqp://remote">>},
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap)),
ok.
test_protocols_explicit(_Config) ->
Def091 = [{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(Def091)),
Def10 = [{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(Def10)),
DefLocal = [{<<"src-protocol">>, <<"local">>},
{<<"dest-protocol">>, <<"local">>}],
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefLocal)),
DefMap091 = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"amqp091">>},
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap091)),
DefMap10 = #{<<"src-protocol">> => <<"amqp10">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(DefMap10)),
DefMapLocal = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMapLocal)),
ok.
test_protocols_mixed(_Config) ->
Def091to10 = [{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(Def091to10)),
Def10to091 = [{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(Def10to091)),
DefLocalTo091 = [{<<"src-protocol">>, <<"local">>},
{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefLocalTo091)),
Def091ToLocal = [{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"local">>}],
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(Def091ToLocal)),
Def10ToLocal = [{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"local">>}],
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(Def10ToLocal)),
DefLocalTo10 = [{<<"src-protocol">>, <<"local">>},
{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefLocalTo10)),
DefMap091to10 = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap091to10)),
DefMap10to091 = #{<<"src-protocol">> => <<"amqp10">>,
<<"dest-protocol">> => <<"amqp091">>},
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap10to091)),
DefMapLocalTo091 = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"amqp091">>},
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefMapLocalTo091)),
DefMap091ToLocal = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap091ToLocal)),
DefMap10ToLocal = #{<<"src-protocol">> => <<"amqp10">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(DefMap10ToLocal)),
DefMapLocalTo10 = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefMapLocalTo10)),
ok.
test_protocols_with_maps(_Config) ->
DefMap1 = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap1)),
DefMap2 = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMap2)),
DefMap3 = #{<<"src-protocol">> => <<"amqp10">>},
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap3)),
DefMap4 = #{<<"dest-protocol">> => <<"local">>},
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap4)),
ok.