Track requeue history

Support tracking the requeue history as described in
https://github.com/rabbitmq/rabbitmq-website/pull/2095

This commit:
1. adds a test case tracing the requeue history via AMQP 1.0
   using the modified outcome and
2. fixes bugs in the broker which crashed if a modified message
   annotation value is an AMQP 1.0 list, map, or array.

Complex modified annotation values (list, map, array) are stored as tagged values from now on.
This means AMQP 0.9.1 consumers will not receive modified annotations of
type list, map, or array (which is okay).
This commit is contained in:
David Ansari 2024-10-11 09:35:46 +02:00
parent d9ff6a00d8
commit e6818f0040
5 changed files with 76 additions and 33 deletions

View File

@ -1178,14 +1178,16 @@ wrap_map_value(true) ->
{boolean, true};
wrap_map_value(false) ->
{boolean, false};
wrap_map_value(V) when is_integer(V) ->
{uint, V};
wrap_map_value(V) when is_integer(V) andalso V >= 0 ->
uint(V);
wrap_map_value(V) when is_binary(V) ->
utf8(V);
wrap_map_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).
utf8(atom_to_list(V));
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.
utf8(V) -> amqp10_client_types:utf8(V).

View File

@ -44,7 +44,7 @@
-type str() :: atom() | string() | binary().
-type internal_ann_key() :: atom().
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
-type x_ann_value() :: str() | integer() | float() | [x_ann_value()].
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
-type protocol() :: module().
-type annotations() :: #{internal_ann_key() => term(),
x_ann_key() => x_ann_value()}.

View File

@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
{long, V};
infer_type(V) when is_boolean(V) ->
{boolean, V};
infer_type({T, _} = V) when is_atom(T) ->
%% looks like a pre-tagged type
V.
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.
utf8_string_is_ascii(UTF8String) ->
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).

View File

@ -1938,7 +1938,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
Anns1 = lists:map(
%% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10]
fun({{symbol, <<"x-", _/binary>> = K}, V}) ->
{K, unwrap(V)}
{K, unwrap_simple_type(V)}
end, KVList),
maps:from_list(Anns1)
end,
@ -3624,7 +3624,14 @@ format_status(
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).
unwrap({_Tag, V}) ->
unwrap_simple_type(V = {list, _}) ->
V;
unwrap(V) ->
unwrap_simple_type(V = {map, _}) ->
V;
unwrap_simple_type(V = {array, _, _}) ->
V;
unwrap_simple_type({_SimpleType, V}) ->
V;
unwrap_simple_type(V) ->
V.

View File

@ -501,61 +501,96 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:send_msg(Sender, Msg2),
ok = amqp10_client:detach_link(Sender),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
Receiver1Name = <<"receiver 1">>,
Receiver2Name = <<"receiver 2">>,
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),
{ok, M1} = amqp10_client:get_msg(Receiver),
{ok, M1} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M1)),
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),
{ok, M2a} = amqp10_client:get_msg(Receiver),
{ok, M2a} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M2a)),
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),
{ok, M2b} = amqp10_client:get_msg(Receiver),
{ok, M2b} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(M2b)),
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),
{ok, M2c} = amqp10_client:get_msg(Receiver),
{ok, M2c} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(M2c)),
ok = amqp10_client:settle_msg(Receiver, M2c,
ok = amqp10_client:settle_msg(
Receiver1, M2c,
{modified, true, false,
#{<<"x-opt-key">> => <<"val 1">>}}),
%% Test that a history of requeue events can be tracked as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]},
<<"x-opt-my-map">> => {map, [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}}
]}}}),
{ok, M2d} = amqp10_client:get_msg(Receiver),
{ok, M2d} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2d)),
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
ok = amqp10_client:settle_msg(Receiver, M2d,
#{<<"x-opt-requeued-by">> := {array, utf8, L0},
<<"x-opt-requeue-reason">> := L1,
<<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d),
ok = amqp10_client:settle_msg(
Receiver1, M2d,
{modified, false, false,
#{<<"x-opt-key">> => <<"val 2">>,
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]},
<<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]},
<<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]},
<<"x-other">> => 99}}),
{ok, M2e} = amqp10_client:get_msg(Receiver),
{ok, M2e} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2e)),
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}],
<<"x-opt-my-map">> := [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}},
{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}
],
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),
ok = amqp10_client:detach_link(Receiver),
?assertMatch({ok, #{message_count := 1}},
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
%% Test that we can consume via AMQP 0.9.1
Ch = rabbit_ct_client_helpers:open_channel(Config),
{#'basic.get_ok'{},
#amqp_msg{payload = <<"m2">>,
props = #'P_basic'{headers = Headers}}
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
%% We expect to receive only modified AMQP 1.0 message annotations that are of simple types
%% (i.e. excluding list, map, array).
?assertEqual({value, {<<"x-other">>, long, 99}},
lists:keysearch(<<"x-other">>, 1, Headers)),
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = amqp10_client:detach_link(Receiver1),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).