Merge pull request #13798 from rabbitmq/amqp091-to-amqp10-shovel-bug
Fix amqp091->amqp10 shovel with complex headers
This commit is contained in:
commit
1806a45461
|
|
@ -23,7 +23,7 @@ dep_amqp10_client = git https://github.com/rabbitmq/rabbitmq-amqp1.0-client.git
|
||||||
|
|
||||||
LOCAL_DEPS = crypto
|
LOCAL_DEPS = crypto
|
||||||
|
|
||||||
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_amqp1_0 meck
|
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_amqp1_0 rabbitmq_amqp_client meck
|
||||||
|
|
||||||
PLT_APPS += rabbitmq_cli
|
PLT_APPS += rabbitmq_cli
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -445,6 +445,4 @@ bin_to_hex(Bin) ->
|
||||||
is_amqp10_compat(T) ->
|
is_amqp10_compat(T) ->
|
||||||
is_binary(T) orelse
|
is_binary(T) orelse
|
||||||
is_number(T) orelse
|
is_number(T) orelse
|
||||||
%% TODO: not all lists are compatible
|
|
||||||
is_list(T) orelse
|
|
||||||
is_boolean(T).
|
is_boolean(T).
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,8 @@ groups() ->
|
||||||
autodelete_amqp091_dest_on_confirm,
|
autodelete_amqp091_dest_on_confirm,
|
||||||
autodelete_amqp091_dest_on_publish,
|
autodelete_amqp091_dest_on_publish,
|
||||||
simple_amqp10_dest,
|
simple_amqp10_dest,
|
||||||
simple_amqp10_src
|
simple_amqp10_src,
|
||||||
|
amqp091_to_amqp10_with_dead_lettering
|
||||||
]},
|
]},
|
||||||
{with_map_config, [], [
|
{with_map_config, [], [
|
||||||
simple,
|
simple,
|
||||||
|
|
@ -96,6 +97,29 @@ simple_amqp10_dest(Config) ->
|
||||||
<<"src-queue">>)
|
<<"src-queue">>)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
amqp091_to_amqp10_with_dead_lettering(Config) ->
|
||||||
|
Dest = ?config(destq, Config),
|
||||||
|
Src = ?config(srcq, Config),
|
||||||
|
TmpQ = <<"tmp">>,
|
||||||
|
with_session(Config,
|
||||||
|
fun (Sess) ->
|
||||||
|
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
|
||||||
|
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ,
|
||||||
|
#{arguments =>#{<<"x-max-length">> => {uint, 0},
|
||||||
|
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
|
||||||
|
<<"x-dead-letter-routing-key">> => {utf8, Src}}}),
|
||||||
|
{ok, Sender} = amqp10_client:attach_sender_link(Sess,
|
||||||
|
<<"sender-tmp">>,
|
||||||
|
<<"/queues/", TmpQ/binary>>,
|
||||||
|
unsettled,
|
||||||
|
unsettled_state),
|
||||||
|
ok = await_amqp10_event(link, Sender, attached),
|
||||||
|
expect_empty(Sess, TmpQ),
|
||||||
|
test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp091">>, <<"src-queue">>),
|
||||||
|
%% publish to tmp, it should be dead-lettered to src and then shovelled to dest
|
||||||
|
_ = publish_expect(Sess, TmpQ, Dest, <<"tag1">>, <<"hello">>)
|
||||||
|
end).
|
||||||
|
|
||||||
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
|
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
|
||||||
MapConfig = ?config(map_config, Config),
|
MapConfig = ?config(map_config, Config),
|
||||||
shovel_test_utils:set_param(Config, <<"test">>,
|
shovel_test_utils:set_param(Config, <<"test">>,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue