Merge pull request #12938 from rabbitmq/mergify/bp/v4.0.x/pr-12935
Fix dead lettering crash (backport #12935)
This commit is contained in:
commit
92275ff40e
|
|
@ -407,9 +407,9 @@ record_death(Reason, SourceQueue,
|
|||
[{Key, NewDeath} | Deaths0]
|
||||
end
|
||||
end,
|
||||
Anns0#{<<"x-last-death-reason">> := ReasonBin,
|
||||
<<"x-last-death-queue">> := SourceQueue,
|
||||
<<"x-last-death-exchange">> := Exchange,
|
||||
Anns0#{<<"x-last-death-reason">> => ReasonBin,
|
||||
<<"x-last-death-queue">> => SourceQueue,
|
||||
<<"x-last-death-exchange">> => Exchange,
|
||||
deaths := Deaths};
|
||||
_ ->
|
||||
Deaths = case Env of
|
||||
|
|
|
|||
|
|
@ -1331,7 +1331,8 @@ dead_letter_headers_should_be_appended_for_each_event(Config) ->
|
|||
|
||||
dead_letter_headers_should_not_be_appended_for_republish(Config) ->
|
||||
%% here we (re-)publish a message with the DL headers already set
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
{Conn0, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
|
||||
Args = ?config(queue_args, Config),
|
||||
Durable = ?config(queue_durable, Config),
|
||||
QName = ?config(queue_name, Config),
|
||||
|
|
@ -1339,44 +1340,45 @@ dead_letter_headers_should_not_be_appended_for_republish(Config) ->
|
|||
|
||||
DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>},
|
||||
{<<"x-dead-letter-routing-key">>, longstr, DlxName}],
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),
|
||||
|
||||
P = <<"msg1">>,
|
||||
|
||||
%% Publish message
|
||||
publish(Ch, QName, [P]),
|
||||
publish(Ch0, QName, [P]),
|
||||
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
||||
[DTag] = consume(Ch, QName, [P]),
|
||||
[DTag] = consume(Ch0, QName, [P]),
|
||||
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
|
||||
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
|
||||
multiple = false,
|
||||
requeue = false}),
|
||||
amqp_channel:cast(Ch0, #'basic.nack'{delivery_tag = DTag,
|
||||
multiple = false,
|
||||
requeue = false}),
|
||||
wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
|
||||
{#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P,
|
||||
props = #'P_basic'{headers = Headers1}}} =
|
||||
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
|
||||
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),
|
||||
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
|
||||
?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death1, <<"reason">>)),
|
||||
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
|
||||
amqp_channel:cast(Ch0, #'basic.ack'{delivery_tag = DTag1}),
|
||||
|
||||
wait_for_messages(Config, [[DlxName, <<"0">>, <<"0">>, <<"0">>]]),
|
||||
|
||||
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
|
||||
#'queue.delete_ok'{} = amqp_channel:call(Ch0, #'queue.delete'{queue = QName}),
|
||||
DeadLetterArgs1 = DeadLetterArgs ++ [{<<"x-message-ttl">>, long, 1}],
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),
|
||||
|
||||
publish(Ch, QName, [P], Headers1),
|
||||
publish(Ch1, QName, [P], Headers1),
|
||||
|
||||
wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
|
||||
{#'basic.get_ok'{}, #amqp_msg{payload = P,
|
||||
props = #'P_basic'{headers = Headers2}}} =
|
||||
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
|
||||
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),
|
||||
|
||||
{array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
|
||||
?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)),
|
||||
ok = rabbit_ct_client_helpers:close_connection(Conn).
|
||||
ok = rabbit_ct_client_helpers:close_connection(Conn0),
|
||||
ok = rabbit_ct_client_helpers:close_connection(Conn1).
|
||||
|
||||
%% Dead-lettering a message modifies its headers:
|
||||
%% the exchange name is replaced with that of the latest dead-letter exchange,
|
||||
|
|
|
|||
Loading…
Reference in New Issue