Make AMQP address v2 format user friendly
This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11604 This commit changes the AMQP address format v2 from ``` /e/:exchange/:routing-key /e/:exchange /q/:queue ``` to ``` /exchanges/:exchange/:routing-key /exchanges/:exchange /queues/:queue ``` Advantages: 1. more user friendly 2. matches nicely with the plural forms of HTTP API v1 and HTTP API v2 This plural form is still non-overlapping with AMQP address format v1. Although it might feel unusual at first to send a message to `/queues/q1`, if you think about `queues` just being a namespace or entity type, this address format makes sense.
This commit is contained in:
parent
a91c6ad014
commit
19523876cd
|
|
@ -2427,9 +2427,9 @@ ensure_source(#'v1_0.source'{address = Address,
|
|||
durable = Durable},
|
||||
Vhost, User, PermCache, TopicPermCache) ->
|
||||
case Address of
|
||||
{utf8, <<"/q/", QNameBinQuoted/binary>>} ->
|
||||
{utf8, <<"/queues/", QNameBinQuoted/binary>>} ->
|
||||
%% The only possible v2 source address format is:
|
||||
%% /q/:queue
|
||||
%% /queues/:queue
|
||||
try rabbit_uri:urldecode(QNameBinQuoted) of
|
||||
QNameBin ->
|
||||
QName = queue_resource(Vhost, QNameBin),
|
||||
|
|
@ -2549,9 +2549,9 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
|
|||
address_v1_permitted() ->
|
||||
rabbit_deprecated_features:is_permitted(amqp_address_v1).
|
||||
|
||||
target_address_version({utf8, <<"/e/", _/binary>>}) ->
|
||||
target_address_version({utf8, <<"/exchanges/", _/binary>>}) ->
|
||||
2;
|
||||
target_address_version({utf8, <<"/q/", _/binary>>}) ->
|
||||
target_address_version({utf8, <<"/queues/", _/binary>>}) ->
|
||||
2;
|
||||
target_address_version(undefined) ->
|
||||
%% anonymous terminus
|
||||
|
|
@ -2561,9 +2561,9 @@ target_address_version(_Address) ->
|
|||
1.
|
||||
|
||||
%% The possible v2 target address formats are:
|
||||
%% /e/:exchange/:routing-key
|
||||
%% /e/:exchange
|
||||
%% /q/:queue
|
||||
%% /exchanges/:exchange/:routing-key
|
||||
%% /exchanges/:exchange
|
||||
%% /queues/:queue
|
||||
%% <null>
|
||||
ensure_target_v2({utf8, String}, Vhost) ->
|
||||
case parse_target_v2_string(String) of
|
||||
|
|
@ -2586,7 +2586,7 @@ parse_target_v2_string(String) ->
|
|||
{error, bad_address}
|
||||
end.
|
||||
|
||||
parse_target_v2_string0(<<"/e/", Rest/binary>>) ->
|
||||
parse_target_v2_string0(<<"/exchanges/", Rest/binary>>) ->
|
||||
Key = cp_slash,
|
||||
Pattern = try persistent_term:get(Key)
|
||||
catch error:badarg ->
|
||||
|
|
@ -2609,10 +2609,10 @@ parse_target_v2_string0(<<"/e/", Rest/binary>>) ->
|
|||
_ ->
|
||||
{error, bad_address}
|
||||
end;
|
||||
parse_target_v2_string0(<<"/q/">>) ->
|
||||
parse_target_v2_string0(<<"/queues/">>) ->
|
||||
%% empty queue name is invalid
|
||||
{error, bad_address};
|
||||
parse_target_v2_string0(<<"/q/", QNameBinQuoted/binary>>) ->
|
||||
parse_target_v2_string0(<<"/queues/", QNameBinQuoted/binary>>) ->
|
||||
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
|
||||
{ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin};
|
||||
parse_target_v2_string0(_) ->
|
||||
|
|
|
|||
|
|
@ -97,14 +97,14 @@ end_per_testcase(Testcase, Config) ->
|
|||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
%% Test v2 target address
|
||||
%% /e/:exchange/:routing-key
|
||||
%% /exchanges/:exchange/:routing-key
|
||||
target_exchange_routing_key(Config) ->
|
||||
XName = <<"👉"/utf8>>,
|
||||
RKey = <<"🗝️"/utf8>>,
|
||||
target_exchange_routing_key0(XName, RKey, Config).
|
||||
|
||||
%% Test v2 target address
|
||||
%% /e/:exchange/:routing-key
|
||||
%% /exchanges/:exchange/:routing-key
|
||||
%% where both :exchange and :routing-key contains a "/" character.
|
||||
target_exchange_routing_key_with_slash(Config) ->
|
||||
XName = <<"my/exchange">>,
|
||||
|
|
@ -141,7 +141,7 @@ target_exchange_routing_key0(XName, RKey, Config) ->
|
|||
ok = cleanup(Init).
|
||||
|
||||
%% Test v2 target address
|
||||
%% /e/:exchange/
|
||||
%% /exchanges/:exchange/
|
||||
%% Routing key is empty.
|
||||
target_exchange_routing_key_empty(Config) ->
|
||||
XName = <<"amq.fanout">>,
|
||||
|
|
@ -167,7 +167,7 @@ target_exchange_routing_key_empty(Config) ->
|
|||
ok = cleanup(Init).
|
||||
|
||||
%% Test v2 target address
|
||||
%% /e/:exchange
|
||||
%% /exchanges/:exchange
|
||||
%% Routing key is empty.
|
||||
target_exchange(Config) ->
|
||||
XName = <<"amq.fanout">>,
|
||||
|
|
@ -193,7 +193,7 @@ target_exchange(Config) ->
|
|||
ok = cleanup(Init).
|
||||
|
||||
%% Test v2 target address
|
||||
%% /e/:exchange
|
||||
%% /exchanges/:exchange
|
||||
%% where the target exchange does not exist.
|
||||
target_exchange_absent(Config) ->
|
||||
XName = <<"🎈"/utf8>>,
|
||||
|
|
@ -220,13 +220,13 @@ target_exchange_absent(Config) ->
|
|||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
%% Test v2 target and source address
|
||||
%% /q/:queue
|
||||
%% /queues/:queue
|
||||
queue(Config) ->
|
||||
QName = <<"🎈"/utf8>>,
|
||||
queue0(QName, Config).
|
||||
|
||||
%% Test v2 target and source address
|
||||
%% /q/:queue
|
||||
%% /queues/:queue
|
||||
%% where :queue contains a "/" character.
|
||||
queue_with_slash(Config) ->
|
||||
QName = <<"my/queue">>,
|
||||
|
|
@ -252,7 +252,7 @@ queue0(QName, Config) ->
|
|||
ok = cleanup(Init).
|
||||
|
||||
%% Test v2 target address
|
||||
%% /q/:queue
|
||||
%% /queues/:queue
|
||||
%% where the target queue does not exist.
|
||||
target_queue_absent(Config) ->
|
||||
QName = <<"🎈"/utf8>>,
|
||||
|
|
@ -279,7 +279,7 @@ target_queue_absent(Config) ->
|
|||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
%% Test v2 target address 'null' and 'to'
|
||||
%% /e/:exchange/:routing-key
|
||||
%% /exchanges/:exchange/:routing-key
|
||||
%% with varying routing keys.
|
||||
target_per_message_exchange_routing_key(Config) ->
|
||||
QName = atom_to_binary(?FUNCTION_NAME),
|
||||
|
|
@ -315,7 +315,7 @@ target_per_message_exchange_routing_key(Config) ->
|
|||
ok = cleanup(Init).
|
||||
|
||||
%% Test v2 target address 'null' and 'to'
|
||||
%% /e/:exchange
|
||||
%% /exchanges/:exchange
|
||||
%% with varying exchanges.
|
||||
target_per_message_exchange(Config) ->
|
||||
XFanout = <<"amq.fanout">>,
|
||||
|
|
@ -349,7 +349,7 @@ target_per_message_exchange(Config) ->
|
|||
ok = cleanup(Init).
|
||||
|
||||
%% Test v2 target address 'null' and 'to'
|
||||
%% /q/:queue
|
||||
%% /queues/:queue
|
||||
target_per_message_queue(Config) ->
|
||||
Q1 = <<"q1">>,
|
||||
Q2 = <<"q2">>,
|
||||
|
|
@ -418,17 +418,17 @@ bad_v2_addresses() ->
|
|||
<<0>>,
|
||||
<<"/">>,
|
||||
<<"//">>,
|
||||
<<"/q">>,
|
||||
<<"/q/">>,
|
||||
<<"/queues">>,
|
||||
<<"/queues/">>,
|
||||
<<"/queue/">>,
|
||||
<<"/e">>,
|
||||
<<"/exchanges">>,
|
||||
%% default exchange in v2 target address is disallowed
|
||||
<<"/e/">>,
|
||||
<<"/e//">>,
|
||||
<<"/e//mykey">>,
|
||||
<<"/e/amq.default">>,
|
||||
<<"/e/amq.default/">>,
|
||||
<<"/e/amq.default/mykey">>,
|
||||
<<"/exchanges/">>,
|
||||
<<"/exchanges//">>,
|
||||
<<"/exchanges//mykey">>,
|
||||
<<"/exchanges/amq.default">>,
|
||||
<<"/exchanges/amq.default/">>,
|
||||
<<"/exchanges/amq.default/mykey">>,
|
||||
<<"/ex/✋"/utf8>>,
|
||||
<<"/exchange">>,
|
||||
<<"/exchange/">>,
|
||||
|
|
@ -438,13 +438,13 @@ bad_v2_addresses() ->
|
|||
<<"/exchange/amq.default/key/">>,
|
||||
<<"/exchange/amq.default/key/mykey">>,
|
||||
%% The following addresses should be percent encoded, but aren't.
|
||||
<<"/q/missing%encoding">>,
|
||||
<<"/q/missing/encoding">>,
|
||||
<<"/q/✋"/utf8>>,
|
||||
<<"/e/missing%encoding">>,
|
||||
<<"/e/missing/encoding/routingkey">>,
|
||||
<<"/e/exchange/missing%encoding">>,
|
||||
<<"/e/✋"/utf8>>
|
||||
<<"/queues/missing%encoding">>,
|
||||
<<"/queues/missing/encoding">>,
|
||||
<<"/queues/✋"/utf8>>,
|
||||
<<"/exchanges/missing%encoding">>,
|
||||
<<"/exchanges/missing/encoding/routingkey">>,
|
||||
<<"/exchanges/exchange/missing%encoding">>,
|
||||
<<"/exchanges/✋"/utf8>>
|
||||
].
|
||||
|
||||
%% Test v2 target address 'null' with an invalid 'to' addresses.
|
||||
|
|
@ -535,7 +535,7 @@ target_bad_address0(TargetAddress, Config) ->
|
|||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
%% Test v2 source address
|
||||
%% /q/:queue
|
||||
%% /queues/:queue
|
||||
%% where the source queue does not exist.
|
||||
source_queue_absent(Config) ->
|
||||
QName = <<"🎈"/utf8>>,
|
||||
|
|
|
|||
|
|
@ -14,17 +14,17 @@
|
|||
unicode:unicode_binary().
|
||||
exchange(ExchangeName) ->
|
||||
ExchangeNameQuoted = uri_string:quote(ExchangeName),
|
||||
<<"/e/", ExchangeNameQuoted/binary>>.
|
||||
<<"/exchanges/", ExchangeNameQuoted/binary>>.
|
||||
|
||||
-spec exchange(unicode:unicode_binary(), unicode:unicode_binary()) ->
|
||||
unicode:unicode_binary().
|
||||
exchange(ExchangeName, RoutingKey) ->
|
||||
ExchangeNameQuoted = uri_string:quote(ExchangeName),
|
||||
RoutingKeyQuoted = uri_string:quote(RoutingKey),
|
||||
<<"/e/", ExchangeNameQuoted/binary, "/", RoutingKeyQuoted/binary>>.
|
||||
<<"/exchanges/", ExchangeNameQuoted/binary, "/", RoutingKeyQuoted/binary>>.
|
||||
|
||||
-spec queue(unicode:unicode_binary()) ->
|
||||
unicode:unicode_binary().
|
||||
queue(QueueName) ->
|
||||
QueueNameQuoted = uri_string:quote(QueueName),
|
||||
<<"/q/", QueueNameQuoted/binary>>.
|
||||
<<"/queues/", QueueNameQuoted/binary>>.
|
||||
|
|
|
|||
|
|
@ -87,7 +87,10 @@ convert_from(mc_amqp, Sections, Env) ->
|
|||
#'v1_0.properties'{reply_to = {utf8, Address}} ->
|
||||
MqttX = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE),
|
||||
case Address of
|
||||
<<"/e/", MqttX:(byte_size(MqttX))/binary, "/", RoutingKeyQuoted/binary>> ->
|
||||
<<"/exchanges/",
|
||||
MqttX:(byte_size(MqttX))/binary,
|
||||
"/",
|
||||
RoutingKeyQuoted/binary>> ->
|
||||
try rabbit_uri:urldecode(RoutingKeyQuoted) of
|
||||
RoutingKey ->
|
||||
MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey),
|
||||
|
|
@ -263,7 +266,7 @@ convert_to(mc_amqp, #mqtt_msg{qos = Qos,
|
|||
%% We assume here that Exchange doesn't contain characters
|
||||
%% that need to be quoted. This is a reasonable assumption
|
||||
%% given that amq.topic is the default MQTT topic exchange.
|
||||
Address = <<"/e/", Exchange/binary, "/", TopicQuoted/binary>>,
|
||||
Address = <<"/exchanges/", Exchange/binary, "/", TopicQuoted/binary>>,
|
||||
{utf8, Address};
|
||||
_ ->
|
||||
undefined
|
||||
|
|
|
|||
|
|
@ -237,12 +237,12 @@ amqp_to_mqtt_reply_to(_Config) ->
|
|||
Key = mqtt_x,
|
||||
Env = #{Key => <<"mqtt-topic-exchange">>},
|
||||
|
||||
AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/e/mqtt-topic-exchange/my.routing.key">>}},
|
||||
AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchanges/mqtt-topic-exchange/my.routing.key">>}},
|
||||
#mqtt_msg{props = Props1} = amqp_to_mqtt([AmqpProps1, Val], Env),
|
||||
?assertEqual({ok, <<"my/routing/key">>},
|
||||
maps:find('Response-Topic', Props1)),
|
||||
|
||||
AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/e/NON-mqtt-topic-exchange/my.routing.key">>}},
|
||||
AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/exchanges/NON-mqtt-topic-exchange/my.routing.key">>}},
|
||||
#mqtt_msg{props = Props2} = amqp_to_mqtt([AmqpProps2, Val]),
|
||||
?assertEqual(error,
|
||||
maps:find('Response-Topic', Props2)),
|
||||
|
|
@ -251,14 +251,14 @@ amqp_to_mqtt_reply_to(_Config) ->
|
|||
%% The AMQP client must percent encode the AMQP reply_to address URI. We expect the
|
||||
%% AMQP -> MQTT conversion to percent decode because an MQTT response topic is not percent encoded.
|
||||
RoutingKeyQuoted = uri_string:quote(RoutingKey),
|
||||
AmqpProps3 = #'v1_0.properties'{reply_to = {utf8, <<"/e/mqtt-topic-exchange/", RoutingKeyQuoted/binary>>}},
|
||||
AmqpProps3 = #'v1_0.properties'{reply_to = {utf8, <<"/exchanges/mqtt-topic-exchange/", RoutingKeyQuoted/binary>>}},
|
||||
#mqtt_msg{props = Props3} = amqp_to_mqtt([AmqpProps3, Val], Env),
|
||||
?assertEqual({ok, <<"my/sp%$@cial/routing/key">>},
|
||||
maps:find('Response-Topic', Props3)),
|
||||
|
||||
%% If the AMQP client did not percent encode the AMQP reply_to address URI as required,
|
||||
%% then the reply_to should be ignored by the conversion.
|
||||
AmqpProps4 = #'v1_0.properties'{reply_to = {utf8, <<"/e/mqtt-topic-exchange/", RoutingKey/binary>>}},
|
||||
AmqpProps4 = #'v1_0.properties'{reply_to = {utf8, <<"/exchanges/mqtt-topic-exchange/", RoutingKey/binary>>}},
|
||||
#mqtt_msg{props = Props4} = amqp_to_mqtt([AmqpProps4, Val], Env),
|
||||
?assertEqual(error,
|
||||
maps:find('Response-Topic', Props4)).
|
||||
|
|
|
|||
Loading…
Reference in New Issue