Support x-cc message annotation (#12559)
Support x-cc message annotation Support an `x-cc` message annotation in AMQP 1.0 similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. The value of the `x-cc` message annotation must by a list of strings. A message annotation is used since application properties allow only simple types.
This commit is contained in:
parent
0c905f9b17
commit
2c0cdee7d2
|
|
@ -862,6 +862,12 @@ rabbitmq_integration_suite(
|
|||
rabbitmq_integration_suite(
|
||||
name = "topic_permission_SUITE",
|
||||
size = "medium",
|
||||
additional_beam = [
|
||||
":test_amqp_utils_beam",
|
||||
],
|
||||
runtime_deps = [
|
||||
"//deps/rabbitmq_amqp_client:erlang_app",
|
||||
],
|
||||
)
|
||||
|
||||
rabbitmq_integration_suite(
|
||||
|
|
|
|||
|
|
@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
|
|||
outs = ["test/topic_permission_SUITE.beam"],
|
||||
app_name = "rabbit",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
deps = ["//deps/amqp_client:erlang_app"],
|
||||
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "transactions_SUITE_beam_files",
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@
|
|||
priority/1,
|
||||
set_ttl/2,
|
||||
x_header/2,
|
||||
x_headers/1,
|
||||
routing_headers/2,
|
||||
exchange/1,
|
||||
routing_keys/1,
|
||||
|
|
@ -88,6 +89,7 @@
|
|||
{timestamp, non_neg_integer()} |
|
||||
{list, [tagged_value()]} |
|
||||
{map, [{tagged_value(), tagged_value()}]} |
|
||||
{array, atom(), [tagged_value()]} |
|
||||
null |
|
||||
undefined.
|
||||
|
||||
|
|
@ -104,11 +106,16 @@
|
|||
{MetadataSize :: non_neg_integer(),
|
||||
PayloadSize :: non_neg_integer()}.
|
||||
|
||||
%% retrieve and x- header from the protocol data
|
||||
%% retrieve an x- header from the protocol data
|
||||
%% the return value should be tagged with an AMQP 1.0 type
|
||||
-callback x_header(binary(), proto_state()) ->
|
||||
tagged_value().
|
||||
|
||||
%% retrieve x- headers from the protocol data
|
||||
%% the return values should be tagged with an AMQP 1.0 type
|
||||
-callback x_headers(proto_state()) ->
|
||||
#{binary() => tagged_value()}.
|
||||
|
||||
%% retrieve a property field from the protocol data
|
||||
%% e.g. message_id, correlation_id
|
||||
-callback property(atom(), proto_state()) ->
|
||||
|
|
@ -148,7 +155,7 @@ init(Proto, Data, Anns) ->
|
|||
-spec init(protocol(), term(), annotations(), environment()) -> state().
|
||||
init(Proto, Data, Anns0, Env) ->
|
||||
{ProtoData, ProtoAnns} = Proto:init(Data),
|
||||
Anns1 = case map_size(Env) == 0 of
|
||||
Anns1 = case map_size(Env) =:= 0 of
|
||||
true -> Anns0;
|
||||
false -> Anns0#{env => Env}
|
||||
end,
|
||||
|
|
@ -214,6 +221,25 @@ x_header(Key, #?MODULE{protocol = Proto,
|
|||
x_header(Key, BasicMsg) ->
|
||||
mc_compat:x_header(Key, BasicMsg).
|
||||
|
||||
-spec x_headers(state()) ->
|
||||
#{binary() => tagged_value()}.
|
||||
x_headers(#?MODULE{protocol = Proto,
|
||||
annotations = Anns,
|
||||
data = Data}) ->
|
||||
%% x-headers may be have been added to the annotations map.
|
||||
New = maps:filtermap(
|
||||
fun(Key, Val) ->
|
||||
case mc_util:is_x_header(Key) of
|
||||
true ->
|
||||
{true, mc_util:infer_type(Val)};
|
||||
false ->
|
||||
false
|
||||
end
|
||||
end, Anns),
|
||||
maps:merge(Proto:x_headers(Data), New);
|
||||
x_headers(BasicMsg) ->
|
||||
mc_compat:x_headers(BasicMsg).
|
||||
|
||||
-spec routing_headers(state(), [x_headers | complex_types]) ->
|
||||
#{binary() => property_value()}.
|
||||
routing_headers(#?MODULE{protocol = Proto,
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
init/1,
|
||||
size/1,
|
||||
x_header/2,
|
||||
x_headers/1,
|
||||
property/2,
|
||||
routing_headers/2,
|
||||
convert_to/3,
|
||||
|
|
@ -125,6 +126,9 @@ size(#v1{message_annotations = MA,
|
|||
x_header(Key, Msg) ->
|
||||
message_annotation(Key, Msg, undefined).
|
||||
|
||||
x_headers(Msg) ->
|
||||
#{K => V || {{_T, K}, V} <- message_annotations(Msg)}.
|
||||
|
||||
property(_Prop, #msg_body_encoded{properties = undefined}) ->
|
||||
undefined;
|
||||
property(Prop, #msg_body_encoded{properties = Props}) ->
|
||||
|
|
@ -618,41 +622,16 @@ encode_deaths(Deaths) ->
|
|||
{map, Map}
|
||||
end, Deaths).
|
||||
|
||||
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
|
||||
essential_properties(Msg) ->
|
||||
Durable = get_property(durable, Msg),
|
||||
Priority = get_property(priority, Msg),
|
||||
Timestamp = get_property(timestamp, Msg),
|
||||
Ttl = get_property(ttl, Msg),
|
||||
Anns0 = #{?ANN_DURABLE => Durable},
|
||||
Anns = maps_put_truthy(
|
||||
?ANN_PRIORITY, Priority,
|
||||
maps_put_truthy(
|
||||
?ANN_TIMESTAMP, Timestamp,
|
||||
maps_put_truthy(
|
||||
ttl, Ttl,
|
||||
Anns0))),
|
||||
case MA of
|
||||
[] ->
|
||||
Anns;
|
||||
_ ->
|
||||
lists:foldl(
|
||||
fun ({{symbol, <<"x-routing-key">>},
|
||||
{utf8, Key}}, Acc) ->
|
||||
maps:update_with(?ANN_ROUTING_KEYS,
|
||||
fun(L) -> [Key | L] end,
|
||||
[Key],
|
||||
Acc);
|
||||
({{symbol, <<"x-cc">>},
|
||||
{list, CCs0}}, Acc) ->
|
||||
CCs = [CC || {_T, CC} <- CCs0],
|
||||
maps:update_with(?ANN_ROUTING_KEYS,
|
||||
fun(L) -> L ++ CCs end,
|
||||
CCs,
|
||||
Acc);
|
||||
({{symbol, <<"x-exchange">>},
|
||||
{utf8, Exchange}}, Acc) ->
|
||||
Acc#{?ANN_EXCHANGE => Exchange};
|
||||
(_, Acc) ->
|
||||
Acc
|
||||
end, Anns, MA)
|
||||
end.
|
||||
Anns = #{?ANN_DURABLE => Durable},
|
||||
maps_put_truthy(
|
||||
?ANN_PRIORITY, Priority,
|
||||
maps_put_truthy(
|
||||
?ANN_TIMESTAMP, Timestamp,
|
||||
maps_put_truthy(
|
||||
ttl, Ttl,
|
||||
Anns))).
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@
|
|||
init/1,
|
||||
size/1,
|
||||
x_header/2,
|
||||
x_headers/1,
|
||||
routing_headers/2,
|
||||
convert_to/3,
|
||||
convert_from/3,
|
||||
|
|
@ -273,6 +274,23 @@ x_header(Key, #content{properties = none} = Content0) ->
|
|||
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
|
||||
x_header(Key, Content).
|
||||
|
||||
x_headers(#content{properties = #'P_basic'{headers = undefined}}) ->
|
||||
#{};
|
||||
x_headers(#content{properties = #'P_basic'{headers = Headers}}) ->
|
||||
L = lists:filtermap(
|
||||
fun({Name, Type, Val}) ->
|
||||
case mc_util:is_x_header(Name) of
|
||||
true ->
|
||||
{true, {Name, from_091(Type, Val)}};
|
||||
false ->
|
||||
false
|
||||
end
|
||||
end, Headers),
|
||||
maps:from_list(L);
|
||||
x_headers(#content{properties = none} = Content0) ->
|
||||
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
|
||||
x_headers(Content).
|
||||
|
||||
property(Prop, Content) ->
|
||||
mc_util:infer_type(mc_compat:get_property(Prop, Content)).
|
||||
|
||||
|
|
@ -707,7 +725,6 @@ supported_header_value_type(table) ->
|
|||
supported_header_value_type(_) ->
|
||||
true.
|
||||
|
||||
|
||||
amqp10_map_get(_K, []) ->
|
||||
undefined;
|
||||
amqp10_map_get(K, Tuples) ->
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@
|
|||
priority/1,
|
||||
set_ttl/2,
|
||||
x_header/2,
|
||||
x_headers/1,
|
||||
routing_headers/2,
|
||||
%%%
|
||||
convert_to/2,
|
||||
|
|
@ -138,6 +139,9 @@ set_ttl(Value, #basic_message{content = Content0} = Msg) ->
|
|||
x_header(Key,#basic_message{content = Content}) ->
|
||||
mc_amqpl:x_header(Key, Content).
|
||||
|
||||
x_headers(#basic_message{content = Content}) ->
|
||||
mc_amqpl:x_headers(Content).
|
||||
|
||||
routing_headers(#basic_message{content = Content}, Opts) ->
|
||||
mc_amqpl:routing_headers(Content, Opts).
|
||||
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ utf8_string_is_ascii(UTF8String) ->
|
|||
amqp_map_get(Key, {map, List}, Default) ->
|
||||
amqp_map_get(Key, List, Default);
|
||||
amqp_map_get(Key, List, Default) when is_list(List) ->
|
||||
case lists:search(fun ({{_, K}, _}) -> K == Key end, List) of
|
||||
case lists:search(fun ({{_, K}, _}) -> K =:= Key end, List) of
|
||||
{value, {_K, V}} ->
|
||||
V;
|
||||
false ->
|
||||
|
|
|
|||
|
|
@ -154,6 +154,7 @@
|
|||
%% The routing key is either defined in the ATTACH frame and static for
|
||||
%% the life time of the link or dynamically provided in each message's
|
||||
%% "to" field (address v2) or "subject" field (address v1).
|
||||
%% (A publisher can set additional routing keys via the x-cc message annotation.)
|
||||
routing_key :: rabbit_types:routing_key() | to | subject,
|
||||
%% queue_name_bin is only set if the link target address refers to a queue.
|
||||
queue_name_bin :: undefined | rabbit_misc:resource_name(),
|
||||
|
|
@ -2369,11 +2370,11 @@ incoming_link_transfer(
|
|||
|
||||
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
|
||||
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
|
||||
{ok, X, RoutingKey, Mc1, PermCache} ->
|
||||
{ok, X, RoutingKeys, Mc1, PermCache} ->
|
||||
Mc2 = rabbit_message_interceptor:intercept(Mc1),
|
||||
check_user_id(Mc2, User),
|
||||
TopicPermCache = check_write_permitted_on_topic(
|
||||
X, User, RoutingKey, TopicPermCache0),
|
||||
TopicPermCache = check_write_permitted_on_topics(
|
||||
X, User, RoutingKeys, TopicPermCache0),
|
||||
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
|
||||
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
|
||||
Opts = #{correlation => {HandleInt, DeliveryId}},
|
||||
|
|
@ -2408,14 +2409,14 @@ incoming_link_transfer(
|
|||
"delivery_tag=~p, delivery_id=~p, reason=~p",
|
||||
[DeliveryTag, DeliveryId, Reason])
|
||||
end;
|
||||
{error, #'v1_0.error'{} = Err} ->
|
||||
{error, {anonymous_terminus, false}, #'v1_0.error'{} = Err} ->
|
||||
Disposition = case Settled of
|
||||
true -> [];
|
||||
false -> [released(DeliveryId)]
|
||||
end,
|
||||
Detach = [detach(HandleInt, Link0, Err)],
|
||||
{error, Disposition ++ Detach};
|
||||
{error, anonymous_terminus, #'v1_0.error'{} = Err} ->
|
||||
{error, {anonymous_terminus, true}, #'v1_0.error'{} = Err} ->
|
||||
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
|
||||
case Settled of
|
||||
true ->
|
||||
|
|
@ -2440,13 +2441,13 @@ incoming_link_transfer(
|
|||
end.
|
||||
|
||||
lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) ->
|
||||
lookup_routing_key(X, LinkRKey, Mc, PermCache);
|
||||
lookup_routing_key(X, LinkRKey, Mc, false, PermCache);
|
||||
lookup_target(#resource{} = XName, LinkRKey, Mc, _, _, PermCache) ->
|
||||
case rabbit_exchange:lookup(XName) of
|
||||
{ok, X} ->
|
||||
lookup_routing_key(X, LinkRKey, Mc, PermCache);
|
||||
lookup_routing_key(X, LinkRKey, Mc, false, PermCache);
|
||||
{error, not_found} ->
|
||||
{error, error_not_found(XName)}
|
||||
{error, {anonymous_terminus, false}, error_not_found(XName)}
|
||||
end;
|
||||
lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
|
||||
case mc:property(to, Mc) of
|
||||
|
|
@ -2458,25 +2459,26 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
|
|||
case rabbit_exchange:lookup(XName) of
|
||||
{ok, X} ->
|
||||
check_internal_exchange(X),
|
||||
lookup_routing_key(X, RKey, Mc, PermCache);
|
||||
lookup_routing_key(X, RKey, Mc, true, PermCache);
|
||||
{error, not_found} ->
|
||||
{error, anonymous_terminus, error_not_found(XName)}
|
||||
{error, {anonymous_terminus, true}, error_not_found(XName)}
|
||||
end;
|
||||
{error, bad_address} ->
|
||||
{error, anonymous_terminus,
|
||||
{error, {anonymous_terminus, true},
|
||||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
|
||||
description = {utf8, <<"bad 'to' address string: ", String/binary>>}}}
|
||||
end;
|
||||
undefined ->
|
||||
{error, anonymous_terminus,
|
||||
{error, {anonymous_terminus, true},
|
||||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
|
||||
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}
|
||||
end.
|
||||
|
||||
lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}},
|
||||
RKey0, Mc0, PermCache) ->
|
||||
RKey0, Mc0, AnonTerm, PermCache) ->
|
||||
Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0),
|
||||
RKey = case RKey0 of
|
||||
subject ->
|
||||
case mc:property(subject, Mc0) of
|
||||
|
|
@ -2488,9 +2490,31 @@ lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}},
|
|||
_ when is_binary(RKey0) ->
|
||||
RKey0
|
||||
end,
|
||||
Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0),
|
||||
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc1),
|
||||
{ok, X, RKey, Mc, PermCache}.
|
||||
case mc:x_header(<<"x-cc">>, Mc0) of
|
||||
undefined ->
|
||||
RKeys = [RKey],
|
||||
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
|
||||
{ok, X, RKeys, Mc, PermCache};
|
||||
{list, CCs0} = L ->
|
||||
try lists:map(fun({utf8, CC}) -> CC end, CCs0) of
|
||||
CCs ->
|
||||
RKeys = [RKey | CCs],
|
||||
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
|
||||
{ok, X, RKeys, Mc, PermCache}
|
||||
catch error:function_clause ->
|
||||
{error, {anonymous_terminus, AnonTerm}, bad_x_cc(L)}
|
||||
end;
|
||||
BadValue ->
|
||||
{error, {anonymous_terminus, AnonTerm}, bad_x_cc(BadValue)}
|
||||
end.
|
||||
|
||||
bad_x_cc(Value) ->
|
||||
Desc = unicode:characters_to_binary(
|
||||
lists:flatten(
|
||||
io_lib:format(
|
||||
"bad value for 'x-cc' message-annotation: ~tp", [Value]))),
|
||||
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
|
||||
description = {utf8, Desc}}.
|
||||
|
||||
process_routing_confirm([], _SenderSettles = true, _, U) ->
|
||||
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
|
||||
|
|
@ -3445,14 +3469,20 @@ check_resource_access(Resource, Perm, User, Cache) ->
|
|||
end
|
||||
end.
|
||||
|
||||
-spec check_write_permitted_on_topic(
|
||||
-spec check_write_permitted_on_topics(
|
||||
rabbit_types:exchange(),
|
||||
rabbit_types:user(),
|
||||
rabbit_types:routing_key(),
|
||||
[rabbit_types:routing_key(),...],
|
||||
topic_permission_cache()) ->
|
||||
topic_permission_cache().
|
||||
check_write_permitted_on_topic(Resource, User, RoutingKey, TopicPermCache) ->
|
||||
check_topic_authorisation(Resource, User, RoutingKey, write, TopicPermCache).
|
||||
check_write_permitted_on_topics(#exchange{type = topic} = Resource,
|
||||
User, RoutingKeys, TopicPermCache) ->
|
||||
lists:foldl(
|
||||
fun(RoutingKey, Cache) ->
|
||||
check_topic_authorisation(Resource, User, RoutingKey, write, Cache)
|
||||
end, TopicPermCache, RoutingKeys);
|
||||
check_write_permitted_on_topics(_, _, _, TopicPermCache) ->
|
||||
TopicPermCache.
|
||||
|
||||
-spec check_read_permitted_on_topic(
|
||||
rabbit_types:exchange(),
|
||||
|
|
|
|||
|
|
@ -1272,17 +1272,36 @@ parse_uncompressed_subbatch(
|
|||
|
||||
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) ->
|
||||
Mc0 = mc:init(mc_amqp, Entry, #{}),
|
||||
%% If exchange or routing_keys annotation isn't present the entry most likely came
|
||||
%% If exchange or routing keys annotation isn't present the entry most likely came
|
||||
%% from the rabbitmq-stream plugin so we'll choose defaults that simulate use
|
||||
%% of the direct exchange.
|
||||
Mc1 = case mc:exchange(Mc0) of
|
||||
undefined -> mc:set_annotation(?ANN_EXCHANGE, <<>>, Mc0);
|
||||
_ -> Mc0
|
||||
end,
|
||||
Mc2 = case mc:routing_keys(Mc1) of
|
||||
[] -> mc:set_annotation(?ANN_ROUTING_KEYS, [QName], Mc1);
|
||||
_ -> Mc1
|
||||
end,
|
||||
XHeaders = mc:x_headers(Mc0),
|
||||
Exchange = case XHeaders of
|
||||
#{<<"x-exchange">> := {utf8, X}} ->
|
||||
X;
|
||||
_ ->
|
||||
<<>>
|
||||
end,
|
||||
RKeys0 = case XHeaders of
|
||||
#{<<"x-cc">> := {list, CCs}} ->
|
||||
[CC || {utf8, CC} <- CCs];
|
||||
_ ->
|
||||
[]
|
||||
end,
|
||||
RKeys1 = case XHeaders of
|
||||
#{<<"x-routing-key">> := {utf8, RK}} ->
|
||||
[RK | RKeys0];
|
||||
_ ->
|
||||
RKeys0
|
||||
end,
|
||||
RKeys = case RKeys1 of
|
||||
[] ->
|
||||
[QName];
|
||||
_ ->
|
||||
RKeys1
|
||||
end,
|
||||
Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0),
|
||||
Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
|
||||
Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2),
|
||||
case rabbit_amqp_filtex:filter(Filter, Mc) of
|
||||
true ->
|
||||
|
|
|
|||
|
|
@ -304,10 +304,9 @@ target_per_message_exchange_routing_key(Config) ->
|
|||
Tag1 = Body1 = <<1>>,
|
||||
Tag2 = Body2 = <<2>>,
|
||||
|
||||
%% Although mc_amqp:essential_properties/1 parses these annotations, they should be ignored.
|
||||
%% Although mc_amqp:essential_properties/1 parses the x-exchange annotation, it should be ignored.
|
||||
Msg1 = amqp10_msg:set_message_annotations(
|
||||
#{<<"x-exchange">> => <<"ignored">>,
|
||||
<<"x-routing-key">> => <<"ignored">>},
|
||||
#{<<"x-exchange">> => <<"ignored">>},
|
||||
amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1))),
|
||||
Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)),
|
||||
ok = amqp10_client:send_msg(Sender, Msg1),
|
||||
|
|
|
|||
|
|
@ -116,7 +116,8 @@ groups() ->
|
|||
available_messages_quorum_queue,
|
||||
available_messages_stream,
|
||||
incoming_message_interceptors,
|
||||
trace,
|
||||
trace_classic_queue,
|
||||
trace_stream,
|
||||
user_id,
|
||||
message_ttl,
|
||||
plugin,
|
||||
|
|
@ -156,7 +157,12 @@ groups() ->
|
|||
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
|
||||
session_max_per_connection,
|
||||
link_max_per_session,
|
||||
reserved_annotation
|
||||
reserved_annotation,
|
||||
x_cc_annotation_exchange,
|
||||
x_cc_annotation_exchange_routing_key_empty,
|
||||
x_cc_annotation_queue,
|
||||
x_cc_annotation_null,
|
||||
bad_x_cc_annotation_exchange
|
||||
]},
|
||||
|
||||
{cluster_size_3, [shuffle],
|
||||
|
|
@ -4393,16 +4399,26 @@ incoming_message_interceptors(Config) ->
|
|||
ok = amqp10_client:close_connection(Connection),
|
||||
true = rpc(Config, persistent_term, erase, [Key]).
|
||||
|
||||
trace(Config) ->
|
||||
trace_classic_queue(Config) ->
|
||||
trace(atom_to_binary(?FUNCTION_NAME), <<"classic">>, Config).
|
||||
|
||||
trace_stream(Config) ->
|
||||
trace(atom_to_binary(?FUNCTION_NAME), <<"stream">>, Config).
|
||||
|
||||
trace(Q, QType, Config) ->
|
||||
Node = atom_to_binary(get_node_config(Config, 0, nodename)),
|
||||
TraceQ = <<"my trace queue">>,
|
||||
Q = <<"my queue">>,
|
||||
Qs = [Q, TraceQ],
|
||||
RoutingKey = <<"my routing key">>,
|
||||
Payload = <<"my payload">>,
|
||||
CorrelationId = <<"my correlation 👀"/utf8>>,
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config),
|
||||
[#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs],
|
||||
#'queue.declare_ok'{} = amqp_channel:call(
|
||||
Ch, #'queue.declare'{
|
||||
queue = Q,
|
||||
durable = true,
|
||||
arguments = [{<<"x-queue-type">>, longstr, QType}]}),
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = TraceQ}),
|
||||
#'queue.bind_ok'{} = amqp_channel:call(
|
||||
Ch, #'queue.bind'{queue = TraceQ,
|
||||
exchange = <<"amq.rabbitmq.trace">>,
|
||||
|
|
@ -4420,16 +4436,21 @@ trace(Config) ->
|
|||
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]),
|
||||
{ok, SessionReceiver} = amqp10_client:begin_session_sync(Connection),
|
||||
|
||||
{ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver,
|
||||
<<"test-receiver">>,
|
||||
rabbitmq_amqp_address:queue(Q)),
|
||||
receive {amqp10_event, {link, Receiver, attached}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
{ok, Sender} = amqp10_client:attach_sender_link(
|
||||
SessionSender,
|
||||
<<"test-sender">>,
|
||||
rabbitmq_amqp_address:exchange(<<"amq.direct">>, RoutingKey)),
|
||||
ok = wait_for_credit(Sender),
|
||||
{ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver,
|
||||
<<"test-receiver">>,
|
||||
rabbitmq_amqp_address:queue(Q)),
|
||||
Msg0 = amqp10_msg:new(<<"tag 1">>, Payload, true),
|
||||
Msg = amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0),
|
||||
Msg = amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"my CC key">>}]}},
|
||||
amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0)),
|
||||
ok = amqp10_client:send_msg(Sender, Msg),
|
||||
{ok, _} = amqp10_client:get_msg(Receiver),
|
||||
|
||||
|
|
@ -4439,7 +4460,7 @@ trace(Config) ->
|
|||
payload = Payload}} =
|
||||
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
|
||||
?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>,
|
||||
<<"routing_keys">> := [RoutingKey],
|
||||
<<"routing_keys">> := [RoutingKey, <<"my CC key">>],
|
||||
<<"connection">> := <<"127.0.0.1:", _/binary>>,
|
||||
<<"node">> := Node,
|
||||
<<"vhost">> := <<"/">>,
|
||||
|
|
@ -4454,7 +4475,7 @@ trace(Config) ->
|
|||
payload = Payload}} =
|
||||
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
|
||||
?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>,
|
||||
<<"routing_keys">> := [RoutingKey],
|
||||
<<"routing_keys">> := [RoutingKey, <<"my CC key">>],
|
||||
<<"connection">> := <<"127.0.0.1:", _/binary>>,
|
||||
<<"node">> := Node,
|
||||
<<"vhost">> := <<"/">>,
|
||||
|
|
@ -5956,6 +5977,239 @@ reserved_annotation(Config) ->
|
|||
end,
|
||||
ok = close_connection_sync(Connection).
|
||||
|
||||
%% Test that x-cc routing keys work together with target address
|
||||
%% /exchanges/:exchange/:routing-key
|
||||
x_cc_annotation_exchange(Config) ->
|
||||
QName1 = <<"queue 1">>,
|
||||
QName2 = <<"queue 2">>,
|
||||
{Connection, Session, LinkPair} = init(Config),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}),
|
||||
Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key 1">>),
|
||||
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
|
||||
ok = wait_for_credit(Sender),
|
||||
|
||||
Payload = <<"my message">>,
|
||||
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"key 2">>}]}},
|
||||
amqp10_msg:new(<<"tag">>, Payload))),
|
||||
ok = wait_for_accepted(<<"tag">>),
|
||||
ok = amqp10_client:detach_link(Sender),
|
||||
|
||||
{ok, Receiver1} = amqp10_client:attach_receiver_link(
|
||||
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled),
|
||||
{ok, Receiver2} = amqp10_client:attach_receiver_link(
|
||||
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled),
|
||||
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
|
||||
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
|
||||
?assertEqual([Payload], amqp10_msg:body(Msg1)),
|
||||
?assertEqual([Payload], amqp10_msg:body(Msg2)),
|
||||
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
|
||||
ok = end_session_sync(Session),
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
%% Test that x-cc routing keys work together with target address
|
||||
%% /exchanges/:exchange
|
||||
x_cc_annotation_exchange_routing_key_empty(Config) ->
|
||||
QName1 = <<"queue 1">>,
|
||||
QName2 = <<"queue 2">>,
|
||||
{Connection, Session, LinkPair} = init(Config),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}),
|
||||
AddressEmptyRoutingKey = rabbitmq_amqp_address:exchange(<<"amq.direct">>),
|
||||
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, AddressEmptyRoutingKey),
|
||||
ok = wait_for_credit(Sender),
|
||||
|
||||
Payload = <<"my message">>,
|
||||
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"key 1">>},
|
||||
{utf8, <<"key 2">>}]}},
|
||||
amqp10_msg:new(<<"tag">>, Payload))),
|
||||
ok = wait_for_accepted(<<"tag">>),
|
||||
ok = amqp10_client:detach_link(Sender),
|
||||
|
||||
{ok, Receiver1} = amqp10_client:attach_receiver_link(
|
||||
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled),
|
||||
{ok, Receiver2} = amqp10_client:attach_receiver_link(
|
||||
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled),
|
||||
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
|
||||
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
|
||||
?assertEqual([Payload], amqp10_msg:body(Msg1)),
|
||||
?assertEqual([Payload], amqp10_msg:body(Msg2)),
|
||||
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
|
||||
ok = end_session_sync(Session),
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
%% Test that x-cc routing keys work together with target address
|
||||
%% /queues/:queue
|
||||
x_cc_annotation_queue(Config) ->
|
||||
QName1 = <<"queue 1">>,
|
||||
QName2 = <<"queue 2">>,
|
||||
Address1 = rabbitmq_amqp_address:queue(QName1),
|
||||
Address2 = rabbitmq_amqp_address:queue(QName2),
|
||||
{Connection, Session, LinkPair} = init(Config),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
|
||||
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address1),
|
||||
ok = wait_for_credit(Sender),
|
||||
|
||||
Payload = <<"my message">>,
|
||||
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, QName2}]}},
|
||||
amqp10_msg:new(<<"tag">>, Payload))),
|
||||
ok = wait_for_accepted(<<"tag">>),
|
||||
ok = amqp10_client:detach_link(Sender),
|
||||
|
||||
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, Address1, settled),
|
||||
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, Address2, settled),
|
||||
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
|
||||
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
|
||||
?assertEqual([Payload], amqp10_msg:body(Msg1)),
|
||||
?assertEqual([Payload], amqp10_msg:body(Msg2)),
|
||||
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
|
||||
ok = end_session_sync(Session),
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
%% Test that x-cc routing keys work together with target address 'null'
|
||||
x_cc_annotation_null(Config) ->
|
||||
QName1 = <<"queue 1">>,
|
||||
QName2 = <<"queue 2">>,
|
||||
QAddress1 = rabbitmq_amqp_address:queue(QName1),
|
||||
QAddress2 = rabbitmq_amqp_address:queue(QName2),
|
||||
{Connection, Session, LinkPair} = init(Config),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key-1">>, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"🗝️-2"/utf8>>, #{}),
|
||||
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
|
||||
ok = wait_for_credit(Sender),
|
||||
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, QAddress1, settled),
|
||||
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, QAddress2, settled),
|
||||
|
||||
Msg1 = amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"key-1">>},
|
||||
{utf8, <<"key-3">>}]}},
|
||||
amqp10_msg:set_properties(
|
||||
#{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"🗝️-2"/utf8>>)},
|
||||
amqp10_msg:new(<<"t1">>, <<"m1">>))),
|
||||
ok = amqp10_client:send_msg(Sender, Msg1),
|
||||
ok = wait_for_accepted(<<"t1">>),
|
||||
{ok, R1M1} = amqp10_client:get_msg(Receiver1),
|
||||
{ok, R2M1} = amqp10_client:get_msg(Receiver2),
|
||||
?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)),
|
||||
?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)),
|
||||
|
||||
Msg2 = amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>},
|
||||
{utf8, <<"key-1">>}]}},
|
||||
amqp10_msg:set_properties(
|
||||
#{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>)},
|
||||
amqp10_msg:new(<<"t2">>, <<"m2">>))),
|
||||
ok = amqp10_client:send_msg(Sender, Msg2),
|
||||
ok = wait_for_accepted(<<"t2">>),
|
||||
{ok, R1M2} = amqp10_client:get_msg(Receiver1),
|
||||
{ok, R2M2} = amqp10_client:get_msg(Receiver2),
|
||||
?assertEqual([<<"m2">>], amqp10_msg:body(R1M2)),
|
||||
?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)),
|
||||
|
||||
Msg3 = amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, QName1}]}},
|
||||
amqp10_msg:set_properties(
|
||||
#{to => rabbitmq_amqp_address:queue(QName2)},
|
||||
amqp10_msg:new(<<"t3">>, <<"m3">>))),
|
||||
ok = amqp10_client:send_msg(Sender, Msg3),
|
||||
ok = wait_for_accepted(<<"t3">>),
|
||||
{ok, R1M3} = amqp10_client:get_msg(Receiver1),
|
||||
{ok, R2M3} = amqp10_client:get_msg(Receiver2),
|
||||
?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)),
|
||||
?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)),
|
||||
|
||||
Msg4 = amqp10_msg:set_message_annotations(
|
||||
%% We send a symbol instead of utf8..
|
||||
#{<<"x-cc">> => {list, [{symbol, QName1}]}},
|
||||
amqp10_msg:set_properties(
|
||||
#{to => rabbitmq_amqp_address:queue(QName2)},
|
||||
amqp10_msg:new(<<"t4">>, <<"m4">>))),
|
||||
ok = amqp10_client:send_msg(Sender, Msg4),
|
||||
%% "If the source of the link supports the rejected outcome, and the message has not
|
||||
%% already been settled by the sender, then the routing node MUST reject the message.
|
||||
%% In this case the error field of rejected MUST contain the error which would have been communicated
|
||||
%% in the detach which would have be sent if a link to the same address had been attempted."
|
||||
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
|
||||
receive {amqp10_disposition, {{rejected, Error}, <<"t4">>}} ->
|
||||
?assertMatch(
|
||||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
|
||||
description = {utf8, <<"bad value for 'x-cc' message-annotation:", _/binary>>}},
|
||||
Error)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
ok = amqp10_client:detach_link(Sender),
|
||||
ok = amqp10_client:detach_link(Receiver1),
|
||||
ok = amqp10_client:detach_link(Receiver2),
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
|
||||
ok = end_session_sync(Session),
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
bad_x_cc_annotation_exchange(Config) ->
|
||||
OpnConf = connection_config(Config),
|
||||
{ok, Connection} = amqp10_client:open_connection(OpnConf),
|
||||
{ok, Session} = amqp10_client:begin_session(Connection),
|
||||
|
||||
Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key-1">>),
|
||||
{ok, Sender1} = amqp10_client:attach_sender_link(Session, <<"sender 1">>, Address),
|
||||
ok = wait_for_credit(Sender1),
|
||||
ok = amqp10_client:send_msg(
|
||||
Sender1,
|
||||
amqp10_msg:set_message_annotations(
|
||||
%% We send an array instead of a list.
|
||||
#{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}]}},
|
||||
amqp10_msg:new(<<"t1">>, <<"m1">>))),
|
||||
ok = wait_for_settlement(<<"t1">>, released),
|
||||
receive {amqp10_event, {link, Sender1, {detached, Error1}}} ->
|
||||
?assertMatch(
|
||||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
|
||||
description = {utf8, <<"bad value for 'x-cc' message-annotation: "
|
||||
"{array,utf8,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}},
|
||||
Error1)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
{ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, Address),
|
||||
ok = wait_for_credit(Sender2),
|
||||
ok = amqp10_client:send_msg(
|
||||
Sender2,
|
||||
amqp10_msg:set_message_annotations(
|
||||
%% We include a non-utf8 type in the list.
|
||||
#{<<"x-cc">> => {list, [{symbol, <<"key-3">>}]}},
|
||||
amqp10_msg:new(<<"t2">>, <<"m2">>))),
|
||||
ok = wait_for_settlement(<<"t2">>, released),
|
||||
receive {amqp10_event, {link, Sender2, {detached, Error2}}} ->
|
||||
?assertEqual(
|
||||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
|
||||
description = {utf8, <<"bad value for 'x-cc' message-annotation: "
|
||||
"{list,[{symbol,<<\"key-3\">>}]}">>}},
|
||||
Error2)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
ok = end_session_sync(Session),
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
%% internal
|
||||
%%
|
||||
|
||||
|
|
|
|||
|
|
@ -177,15 +177,11 @@ end_per_group(Group, Config) ->
|
|||
init_per_testcase(T, Config)
|
||||
when T =:= dead_letter_reject_expire_expire orelse
|
||||
T =:= stream ->
|
||||
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of
|
||||
ok ->
|
||||
init_per_testcase0(T, Config);
|
||||
{skip, _} = Skip ->
|
||||
%% With feature flag message_containers_deaths_v2 disabled, test case:
|
||||
%% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159
|
||||
%% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173
|
||||
Skip
|
||||
end;
|
||||
%% With feature flag message_containers_deaths_v2 disabled, test case:
|
||||
%% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159
|
||||
%% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173
|
||||
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2),
|
||||
init_per_testcase0(T, Config);
|
||||
init_per_testcase(Testcase, Config) ->
|
||||
init_per_testcase0(Testcase, Config).
|
||||
|
||||
|
|
@ -1860,6 +1856,10 @@ stream(Config) ->
|
|||
{timestamp, T2} = rabbit_misc:table_lookup(Death2, <<"time">>),
|
||||
?assert(T1 < T2),
|
||||
|
||||
?assertEqual({array, [{longstr, <<"cc 1">>},
|
||||
{longstr, <<"cc 2">>}]},
|
||||
rabbit_misc:table_lookup(Headers, <<"CC">>)),
|
||||
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch0),
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch1).
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,9 @@ all_tests() ->
|
|||
amqp_amqpl_message_id_binary,
|
||||
amqp_amqpl_unsupported_values_not_converted,
|
||||
amqp_to_amqpl_data_body,
|
||||
amqp_amqpl_amqp_bodies
|
||||
amqp_amqpl_amqp_bodies,
|
||||
amqp_x_headers,
|
||||
amqpl_x_headers
|
||||
].
|
||||
|
||||
%%%===================================================================
|
||||
|
|
@ -195,10 +197,7 @@ amqpl_table_x_header_array_of_tbls(_Config) ->
|
|||
[{{symbol, <<"type">>}, {utf8, <<"orange">>}},
|
||||
{{symbol, <<"count">>}, {long, 45}}]}
|
||||
]},
|
||||
mc:x_header(<<"x-fruit">>, Msg)),
|
||||
|
||||
|
||||
ok.
|
||||
mc:x_header(<<"x-fruit">>, Msg)).
|
||||
|
||||
amqpl_death_v1_records(_Config) ->
|
||||
ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}).
|
||||
|
|
@ -364,8 +363,9 @@ amqpl_amqp_bin_amqpl(_Config) ->
|
|||
Msg10Pre = mc:convert(mc_amqp, Msg),
|
||||
Payload = iolist_to_binary(mc:protocol_state(Msg10Pre)),
|
||||
Msg10 = mc:init(mc_amqp, Payload, #{}),
|
||||
?assertEqual(<<"exch">>, mc:exchange(Msg10)),
|
||||
?assertEqual([<<"apple">>], mc:routing_keys(Msg10)),
|
||||
?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>},
|
||||
<<"x-routing-key">> := {utf8, <<"apple">>}},
|
||||
mc:x_headers(Msg10)),
|
||||
?assertEqual(98, mc:priority(Msg10)),
|
||||
?assertEqual(true, mc:is_persistent(Msg10)),
|
||||
?assertEqual(99000, mc:timestamp(Msg10)),
|
||||
|
|
@ -422,8 +422,6 @@ amqpl_amqp_bin_amqpl(_Config) ->
|
|||
|
||||
MsgL2 = mc:convert(mc_amqpl, Msg10),
|
||||
|
||||
?assertEqual(<<"exch">>, mc:exchange(MsgL2)),
|
||||
?assertEqual([<<"apple">>], mc:routing_keys(MsgL2)),
|
||||
?assertEqual(98, mc:priority(MsgL2)),
|
||||
?assertEqual(true, mc:is_persistent(MsgL2)),
|
||||
?assertEqual(99000, mc:timestamp(MsgL2)),
|
||||
|
|
@ -450,9 +448,17 @@ amqpl_cc_amqp_bin_amqpl(_Config) ->
|
|||
Msg10Pre = mc:convert(mc_amqp, Msg),
|
||||
Sections = iolist_to_binary(mc:protocol_state(Msg10Pre)),
|
||||
Msg10 = mc:init(mc_amqp, Sections, #{}),
|
||||
?assertEqual(RoutingKeys, mc:routing_keys(Msg10)),
|
||||
?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>},
|
||||
<<"x-routing-key">> := {utf8, <<"apple">>},
|
||||
<<"x-cc">> := {list, [{utf8, <<"q1">>},
|
||||
{utf8, <<"q2">>}]}},
|
||||
mc:x_headers(Msg10)),
|
||||
|
||||
MsgL2 = mc:convert(mc_amqpl, Msg10),
|
||||
%% Here, we simulate what rabbit_stream_queue does:
|
||||
Msg10b = mc:set_annotation(?ANN_EXCHANGE, <<"exch">>, Msg10),
|
||||
Msg10c = mc:set_annotation(?ANN_ROUTING_KEYS, [<<"apple">>, <<"q1">>, <<"q2">>], Msg10b),
|
||||
|
||||
MsgL2 = mc:convert(mc_amqpl, Msg10c),
|
||||
?assertEqual(RoutingKeys, mc:routing_keys(MsgL2)),
|
||||
?assertMatch(#content{properties = #'P_basic'{headers = Headers}},
|
||||
mc:protocol_state(MsgL2)).
|
||||
|
|
@ -751,6 +757,52 @@ amqp_amqpl_amqp_bodies(_Config) ->
|
|||
end || Body <- Bodies],
|
||||
ok.
|
||||
|
||||
amqp_x_headers(_Config) ->
|
||||
MAC = [
|
||||
{{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}},
|
||||
thead2('x-list', list, [utf8(<<"l">>)]),
|
||||
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}])
|
||||
],
|
||||
M = #'v1_0.message_annotations'{content = MAC},
|
||||
AC = [thead(long, 5)],
|
||||
A = #'v1_0.application_properties'{content = AC},
|
||||
D = #'v1_0.data'{content = <<"data">>},
|
||||
|
||||
Payload = serialize_sections([M, A, D]),
|
||||
Msg0 = mc:init(mc_amqp, Payload, annotations()),
|
||||
Msg1 = mc:set_annotation(<<"x-1">>, {byte, -2}, Msg0),
|
||||
?assertEqual(#{<<"x-1">> => {byte, -2},
|
||||
<<"x-list">> => {list,[{utf8,<<"l">>}]},
|
||||
<<"x-map">> => {map,[{{utf8,<<"k">>},{utf8,<<"v">>}}]},
|
||||
<<"x-stream-filter">> => {utf8,<<"apple">>}},
|
||||
mc:x_headers(Msg1)).
|
||||
|
||||
amqpl_x_headers(_Config) ->
|
||||
Props = #'P_basic'{headers = [{<<"a-string">>, longstr, <<"a string">>},
|
||||
{<<"x-1">>, binary, <<"v1">>},
|
||||
{<<"x-stream-filter">>, longstr, <<"apple">>}]},
|
||||
Payload = [<<"data">>],
|
||||
Content = #content{properties = Props,
|
||||
payload_fragments_rev = Payload},
|
||||
|
||||
Msg0 = mc:init(mc_amqpl, Content, annotations()),
|
||||
Msg1 = mc:set_annotation(delivery_count, 1, Msg0),
|
||||
Msg = mc:set_annotation(<<"x-delivery-count">>, 2, Msg1),
|
||||
?assertEqual(#{<<"x-1">> => {binary, <<"v1">>},
|
||||
<<"x-stream-filter">> => {utf8,<<"apple">>},
|
||||
<<"x-delivery-count">> => {long, 2}},
|
||||
mc:x_headers(Msg)),
|
||||
|
||||
XName = <<"exch">>,
|
||||
RoutingKey = <<"apple">>,
|
||||
{ok, BasicMsg0} = rabbit_basic:message_no_id(XName, RoutingKey, Content),
|
||||
BasicMsg1 = mc:set_annotation(delivery_count, 1, BasicMsg0),
|
||||
BasicMsg = mc:set_annotation(<<"x-delivery-count">>, 2, BasicMsg1),
|
||||
?assertEqual(#{<<"x-1">> => {binary, <<"v1">>},
|
||||
<<"x-stream-filter">> => {utf8,<<"apple">>},
|
||||
<<"x-delivery-count">> => {long, 2}},
|
||||
mc:x_headers(BasicMsg)).
|
||||
|
||||
%% Utility
|
||||
|
||||
amqp10_encode_bin(L) when is_list(L) ->
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
-module(topic_permission_SUITE).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("amqp10_common/include/amqp10_framing.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
-compile([export_all, nowarn_export_all]).
|
||||
|
|
@ -21,6 +22,7 @@ groups() ->
|
|||
[
|
||||
{sequential_tests, [],
|
||||
[
|
||||
amqp_x_cc_annotation,
|
||||
amqpl_cc_headers,
|
||||
amqpl_bcc_headers,
|
||||
topic_permission_database_access,
|
||||
|
|
@ -29,6 +31,7 @@ groups() ->
|
|||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
{ok, _} = application:ensure_all_started(amqp10_client),
|
||||
rabbit_ct_helpers:log_environment(),
|
||||
Config1 = rabbit_ct_helpers:set_config(
|
||||
Config,
|
||||
|
|
@ -56,6 +59,91 @@ init_per_testcase(Testcase, Config) ->
|
|||
end_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
amqp_x_cc_annotation(Config) ->
|
||||
ok = set_topic_permissions(Config, "^a", ".*"),
|
||||
|
||||
QName1 = <<"queue 1">>,
|
||||
QName2 = <<"queue 2">>,
|
||||
{Connection, Session1, LinkPair} = amqp_utils:init(Config),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.topic">>, <<"a.1">>, #{}),
|
||||
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.topic">>, <<"a.2">>, #{}),
|
||||
|
||||
{ok, Sender1} = amqp10_client:attach_sender_link(
|
||||
Session1,
|
||||
<<"sender 1">>,
|
||||
rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)),
|
||||
ok = amqp_utils:wait_for_credit(Sender1),
|
||||
{ok, Receiver1} = amqp10_client:attach_receiver_link(
|
||||
Session1, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled),
|
||||
{ok, Receiver2} = amqp10_client:attach_receiver_link(
|
||||
Session1, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled),
|
||||
%% We have permissions to send to both topics.
|
||||
%% Therefore, m1 should be sent to both queues.
|
||||
ok = amqp10_client:send_msg(Sender1, amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}},
|
||||
amqp10_msg:new(<<"t1">>, <<"m1">>, true))),
|
||||
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
|
||||
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
|
||||
?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)),
|
||||
?assertEqual([<<"m1">>], amqp10_msg:body(Msg2)),
|
||||
ok = amqp_utils:detach_link_sync(Sender1),
|
||||
ok = amqp_utils:detach_link_sync(Receiver1),
|
||||
ok = amqp_utils:detach_link_sync(Receiver2),
|
||||
|
||||
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
|
||||
{ok, Sender2} = amqp10_client:attach_sender_link(
|
||||
Session2,
|
||||
<<"sender 2">>,
|
||||
rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"x.1">>)),
|
||||
ok = amqp_utils:wait_for_credit(Sender2),
|
||||
ok = amqp10_client:send_msg(Sender2, amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}},
|
||||
amqp10_msg:new(<<"t2">>, <<"m2">>, true))),
|
||||
receive
|
||||
{amqp10_event,
|
||||
{session, Session2,
|
||||
{ended,
|
||||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
|
||||
description = {utf8, Description1}}}}} ->
|
||||
?assertEqual(
|
||||
<<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>,
|
||||
Description1)
|
||||
after 5000 -> amqp_utils:flush(missing_ended),
|
||||
ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
{ok, Session3} = amqp10_client:begin_session_sync(Connection),
|
||||
{ok, Sender3} = amqp10_client:attach_sender_link(
|
||||
Session3,
|
||||
<<"sender 3">>,
|
||||
rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)),
|
||||
ok = amqp_utils:wait_for_credit(Sender3),
|
||||
ok = amqp10_client:send_msg(Sender3, amqp10_msg:set_message_annotations(
|
||||
#{<<"x-cc">> => {list, [{utf8, <<"x.2">>}]}},
|
||||
amqp10_msg:new(<<"t3">>, <<"m3">>, true))),
|
||||
receive
|
||||
{amqp10_event,
|
||||
{session, Session3,
|
||||
{ended,
|
||||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
|
||||
description = {utf8, Description2}}}}} ->
|
||||
?assertEqual(
|
||||
<<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>,
|
||||
Description2)
|
||||
after 5000 -> amqp_utils:flush(missing_ended),
|
||||
ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
|
||||
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
|
||||
ok = amqp_utils:end_session_sync(Session1),
|
||||
ok = amqp10_client:close_connection(Connection),
|
||||
ok = clear_topic_permissions(Config).
|
||||
|
||||
amqpl_cc_headers(Config) ->
|
||||
amqpl_headers(<<"CC">>, Config).
|
||||
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
init/1,
|
||||
size/1,
|
||||
x_header/2,
|
||||
x_headers/1,
|
||||
property/2,
|
||||
routing_headers/2,
|
||||
convert_to/3,
|
||||
|
|
@ -390,6 +391,11 @@ x_header(Key, #mqtt_msg{props = #{'User-Property' := UserProp}}) ->
|
|||
x_header(_Key, #mqtt_msg{}) ->
|
||||
undefined.
|
||||
|
||||
x_headers(#mqtt_msg{props = #{'User-Property' := UserProp}}) ->
|
||||
#{Key => {utf8, Val} || {<<"x-", _/binary>> = Key, Val} <- UserProp};
|
||||
x_headers(#mqtt_msg{}) ->
|
||||
#{}.
|
||||
|
||||
property(correlation_id, #mqtt_msg{props = #{'Correlation-Data' := Corr}}) ->
|
||||
case mc_util:urn_string_to_uuid(Corr) of
|
||||
{ok, UUId} ->
|
||||
|
|
|
|||
|
|
@ -61,6 +61,10 @@ roundtrip_amqp(_Config) ->
|
|||
PayloadSize = 10,
|
||||
ExpectedSize = {MetaDataSize, PayloadSize},
|
||||
?assertEqual(ExpectedSize, mc:size(Mc0)),
|
||||
?assertEqual(#{<<"x-key-1">> => {utf8, <<"val-1">>},
|
||||
<<"x-key-2">> => {utf8, <<"val-2">>},
|
||||
<<"x-key-3">> => {utf8, <<"val-3">>}},
|
||||
mc:x_headers(Mc0)),
|
||||
|
||||
Env = #{},
|
||||
?assertEqual(Msg, mc_mqtt:convert_to(mc_mqtt, Msg, Env)),
|
||||
|
|
@ -310,6 +314,7 @@ mqtt_amqpl_alt(_Config) ->
|
|||
},
|
||||
Anns = #{?ANN_ROUTING_KEYS => [rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic)]},
|
||||
Mc = mc:init(mc_mqtt, Msg, Anns),
|
||||
?assertEqual(#{}, mc:x_headers(Mc)),
|
||||
MsgL = mc:convert(mc_amqpl, Mc),
|
||||
|
||||
#content{properties = #'P_basic'{headers = HL} = Props} =
|
||||
|
|
|
|||
|
|
@ -18,6 +18,12 @@ This feature:
|
|||
This feature allows operators to gain insights into the message sizes being published to RabbitMQ, such as average message size, number of messages per pre-defined bucket (which can both be computed accurately), and percentiles (which will be approximated).
|
||||
Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1, and MQTT 3.1).
|
||||
|
||||
## New Features
|
||||
|
||||
### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation
|
||||
[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation.
|
||||
This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.
|
||||
|
||||
## Potential incompatibilities
|
||||
|
||||
* The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB).
|
||||
|
|
|
|||
Loading…
Reference in New Issue