Support dynamic creation of queues

## What?
Support the `dynamic` field of sources and targets.

 ## Why?
1. This allows AMQP clients to dynamically create exclusive queues, which
   can be useful for RPC workloads.
2. Support creation of JMS temporary queues over AMQP using the Qpid JMS
   client. Exclusive queues map very nicely to JMS temporary queues
   because:

> Although sessions are used to create temporary destinations, this is only
for convenience. Their scope is actually the entire connection. Their
lifetime is that of their connection and any of the connection’s sessions
are allowed to create a consumer for them.

https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#creating-temporary-destinations

 ## How?
If the terminus contains the capability `temporary-queue` as defined in
[amqp-bindmap-jms-v1.0-wd10](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=67638)
[5.2] and as sent by Qpid JMS client,
RabbitMQ will create an exclusive queue.
(This allows a future commit to take other actions if capability
`temporary-topic` will be used, such as the additional creation of bindings.)

No matter what the desired node properties are, RabbitMQ will set the
lifetime policy delete-on-close deleting the exclusive queue when the
link which caused its creation ceases to exist. This means the exclusive
queue will be deleted if either:
* the link gets detached, or
* the session ends, or
* the connection closes

Although the AMQP JMS Mapping and Qpid JMS create only a **sending** link
with `dynamic=true`, this commit also supports **receiving** links with
`dynamic=true` for non-JMS AMQP clients.

RabbitMQ is free to choose the generated queue name. As suggested by the
AMQP spec, the generated queue name will contain the container-id and link
name unless they are very long.

Co-authored-by: Arnaud Cogoluègnes <acogoluegnes@gmail.com>
This commit is contained in:
David Ansari 2025-02-04 18:45:24 +01:00
parent 06ec8f0342
commit 9062476a18
11 changed files with 1243 additions and 101 deletions

View File

@ -698,23 +698,39 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
make_source(#{role := {sender, _}}) ->
#'v1_0.source'{};
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
make_source(#{role := {receiver, Source, _Pid},
filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
Dynamic = maps:get(dynamic, Source, false),
TranslatedFilter = translate_filters(Filter),
#'v1_0.source'{address = {utf8, Address},
#'v1_0.source'{address = make_address(Source),
durable = {uint, Durable},
filter = TranslatedFilter}.
dynamic = Dynamic,
filter = TranslatedFilter,
capabilities = make_capabilities(Source)}.
make_target(#{role := {receiver, _Source, _Pid}}) ->
#'v1_0.target'{};
make_target(#{role := {sender, #{address := Address} = Target}}) ->
make_target(#{role := {sender, Target}}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
TargetAddr = case is_binary(Address) of
true -> {utf8, Address};
false -> Address
end,
#'v1_0.target'{address = TargetAddr,
durable = {uint, Durable}}.
Dynamic = maps:get(dynamic, Target, false),
#'v1_0.target'{address = make_address(Target),
durable = {uint, Durable},
dynamic = Dynamic,
capabilities = make_capabilities(Target)}.
make_address(#{address := Addr}) ->
if is_binary(Addr) ->
{utf8, Addr};
is_atom(Addr) ->
Addr
end.
make_capabilities(#{capabilities := Caps0}) ->
Caps = [{symbol, C} || C <- Caps0],
{array, symbol, Caps};
make_capabilities(_) ->
undefined.
max_message_size(#{max_message_size := Size})
when is_integer(Size) andalso

View File

@ -260,8 +260,8 @@ wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0)
State = untrack_channel(ChannelNum, SessionPid, State0),
wait_for_shutdown_sessions(TimerRef, State);
shutdown_sessions_timeout ->
?LOG_INFO("sessions not shut down after ~b ms: ~p",
[?SHUTDOWN_SESSIONS_TIMEOUT, Channels]),
?LOG_INFO("sessions running ~b ms after requested to be shut down: ~p",
[?SHUTDOWN_SESSIONS_TIMEOUT, maps:values(Channels)]),
State0
end.
@ -792,6 +792,7 @@ send_to_new_session(
connection = #v1_connection{outgoing_max_frame_size = MaxFrame,
vhost = Vhost,
user = User,
container_id = ContainerId,
name = ConnName},
writer = WriterPid} = State) ->
%% Subtract fixed frame header size.
@ -804,6 +805,7 @@ send_to_new_session(
OutgoingMaxFrameSize,
User,
Vhost,
ContainerId,
ConnName,
BeginFrame],
case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of

View File

@ -85,8 +85,10 @@
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(HIBERNATE_AFTER, 6_000).
-define(CREDIT_REPLY_TIMEOUT, 30_000).
%% Capability defined in amqp-bindmap-jms-v1.0-wd10 [5.2] and sent by Qpid JMS client.
-define(CAP_TEMPORARY_QUEUE, <<"temporary-queue">>).
-export([start_link/8,
-export([start_link/9,
process_frame/2,
list_local/0,
conserve_resources/3,
@ -163,6 +165,7 @@
routing_key :: rabbit_types:routing_key() | to | subject,
%% queue_name_bin is only set if the link target address refers to a queue.
queue_name_bin :: undefined | rabbit_misc:resource_name(),
dynamic :: boolean(),
max_message_size :: pos_integer(),
delivery_count :: sequence_no(),
credit :: rabbit_queue_type:credit(),
@ -206,6 +209,7 @@
%% or a topic filter, an outgoing link will always consume from a queue.
queue_name :: rabbit_amqqueue:name(),
queue_type :: rabbit_queue_type:queue_type(),
dynamic :: boolean(),
send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(),
@ -260,6 +264,7 @@
-record(cfg, {
outgoing_max_frame_size :: unlimited | pos_integer(),
container_id :: binary(),
reader_pid :: rabbit_types:connection(),
writer_pid :: pid(),
user :: rabbit_types:user(),
@ -382,15 +387,17 @@
-type state() :: #state{}.
start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) ->
Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame},
start_link(ReaderPid, WriterPid, ChannelNum, FrameMax,
User, Vhost, ContainerId, ConnName, BeginFrame) ->
Args = {ReaderPid, WriterPid, ChannelNum, FrameMax,
User, Vhost, ContainerId, ConnName, BeginFrame},
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(?MODULE, Args, Opts).
process_frame(Pid, FrameBody) ->
gen_server:cast(Pid, {frame_body, FrameBody}).
init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, ConnName,
#'v1_0.begin'{
%% "If a session is locally initiated, the remote-channel MUST NOT be set." [2.7.2]
remote_channel = undefined,
@ -401,6 +408,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
logger:update_process_metadata(#{channel_number => ChannelNum,
amqp_container => ContainerId,
connection => ConnName,
vhost => Vhost,
user => User#user.username}),
@ -453,7 +461,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow,
outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID,
cfg = #cfg{reader_pid = ReaderPid,
cfg = #cfg{container_id = ContainerId,
reader_pid = ReaderPid,
writer_pid = WriterPid,
outgoing_max_frame_size = MaxFrameSize,
user = User,
@ -470,14 +479,17 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
terminate(_Reason, #state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
queue_states = QStates}) ->
queue_states = QStates,
cfg = Cfg}) ->
maps:foreach(
fun (_, _) ->
rabbit_global_counters:publisher_deleted(?PROTOCOL)
fun (_, Link) ->
rabbit_global_counters:publisher_deleted(?PROTOCOL),
maybe_delete_dynamic_queue(Link, Cfg)
end, IncomingLinks),
maps:foreach(
fun (_, _) ->
rabbit_global_counters:consumer_deleted(?PROTOCOL)
fun (_, Link) ->
rabbit_global_counters:consumer_deleted(?PROTOCOL),
maybe_delete_dynamic_queue(Link, Cfg)
end, OutgoingLinks),
ok = rabbit_queue_type:close(QStates).
@ -1094,39 +1106,52 @@ handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach,
end;
handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
State0 = #state{incoming_links = IncomingLinks,
State0 = #state{incoming_links = IncomingLinks0,
outgoing_links = OutgoingLinks0,
outgoing_unsettled_map = Unsettled0,
outgoing_pending = Pending0,
queue_states = QStates0,
cfg = #cfg{user = #user{username = Username}}}) ->
cfg = Cfg = #cfg{user = #user{username = Username}}}) ->
{OutgoingLinks, Unsettled, Pending, QStates} =
case maps:take(HandleInt, OutgoingLinks0) of
{#outgoing_link{queue_name = QName}, OutgoingLinks1} ->
{#outgoing_link{queue_name = QName,
dynamic = Dynamic}, OutgoingLinks1} ->
Ctag = handle_to_ctag(HandleInt),
{Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0),
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
Spec = #{consumer_tag => Ctag,
reason => remove,
user => Username},
case rabbit_queue_type:cancel(Q, Spec, QStates0) of
{ok, QStates1} ->
{OutgoingLinks1, Unsettled1, Pending1, QStates1};
{error, Reason} ->
protocol_error(
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
"Failed to remove consumer from ~s: ~tp",
[rabbit_misc:rs(amqqueue:get_name(Q)), Reason])
end;
{error, not_found} ->
{OutgoingLinks1, Unsettled1, Pending1, QStates0}
case Dynamic of
true ->
delete_dynamic_queue(QName, Cfg),
{OutgoingLinks1, Unsettled1, Pending1, QStates0};
false ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
Spec = #{consumer_tag => Ctag,
reason => remove,
user => Username},
case rabbit_queue_type:cancel(Q, Spec, QStates0) of
{ok, QStates1} ->
{OutgoingLinks1, Unsettled1, Pending1, QStates1};
{error, Reason} ->
protocol_error(
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
"Failed to remove consumer from ~s: ~tp",
[rabbit_misc:rs(amqqueue:get_name(Q)), Reason])
end;
{error, not_found} ->
{OutgoingLinks1, Unsettled1, Pending1, QStates0}
end
end;
error ->
{OutgoingLinks0, Unsettled0, Pending0, QStates0}
end,
State1 = State0#state{incoming_links = maps:remove(HandleInt, IncomingLinks),
IncomingLinks = case maps:take(HandleInt, IncomingLinks0) of
{IncomingLink, IncomingLinks1} ->
maybe_delete_dynamic_queue(IncomingLink, Cfg),
IncomingLinks1;
error ->
IncomingLinks0
end,
State1 = State0#state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
outgoing_unsettled_map = Unsettled,
outgoing_pending = Pending,
@ -1271,29 +1296,33 @@ handle_attach(#'v1_0.attach'{
reply_frames([Reply], State);
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
name = LinkName = {utf8, LinkName0},
name = LinkName = {utf8, LinkNameBin},
handle = Handle = ?UINT(HandleInt),
source = Source,
snd_settle_mode = MaybeSndSettleMode,
target = Target = #'v1_0.target'{address = TargetAddress},
target = Target0,
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
},
State0 = #state{incoming_links = IncomingLinks0,
permission_cache = PermCache0,
cfg = #cfg{max_link_credit = MaxLinkCredit,
cfg = #cfg{container_id = ContainerId,
reader_pid = ReaderPid,
max_link_credit = MaxLinkCredit,
vhost = Vhost,
user = User}}) ->
case ensure_target(Target, Vhost, User, PermCache0) of
{ok, Exchange, RoutingKey, QNameBin, PermCache} ->
case ensure_target(Target0, LinkNameBin, Vhost, User,
ContainerId, ReaderPid, PermCache0) of
{ok, Exchange, RoutingKey, QNameBin, Target, PermCache} ->
SndSettleMode = snd_settle_mode(MaybeSndSettleMode),
MaxMessageSize = persistent_term:get(max_message_size),
IncomingLink = #incoming_link{
name = LinkName0,
name = LinkNameBin,
snd_settle_mode = SndSettleMode,
target_address = address(TargetAddress),
target_address = address(Target#'v1_0.target'.address),
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
dynamic = default(Target#'v1_0.target'.dynamic, false),
max_message_size = MaxMessageSize,
delivery_count = DeliveryCountInt,
credit = MaxLinkCredit},
@ -1327,10 +1356,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
end;
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
name = LinkName = {utf8, LinkName0},
name = LinkName = {utf8, LinkNameBin},
handle = Handle = ?UINT(HandleInt),
source = Source = #'v1_0.source'{address = SourceAddress,
filter = DesiredFilter},
source = Source0 = #'v1_0.source'{filter = DesiredFilter},
snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode,
max_message_size = MaybeMaxMessageSize,
@ -1341,6 +1369,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
topic_permission_cache = TopicPermCache0,
cfg = #cfg{vhost = Vhost,
user = User = #user{username = Username},
container_id = ContainerId,
reader_pid = ReaderPid}}) ->
{SndSettled, EffectiveSndSettleMode} =
case SndSettleMode of
@ -1352,10 +1381,11 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% client only for durable messages.
{false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED}
end,
case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of
case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId,
ReaderPid, PermCache0, TopicPermCache0) of
{error, Reason} ->
protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]);
{ok, QName = #resource{name = QNameBin}, PermCache1, TopicPermCache} ->
{ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} ->
PermCache = check_resource_access(QName, read, User, PermCache1),
case rabbit_amqqueue:with(
QName,
@ -1441,12 +1471,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% Echo back that we will respect the client's requested max-message-size.
max_message_size = MaybeMaxMessageSize,
offered_capabilities = OfferedCaps},
{utf8, SourceAddress} = Source#'v1_0.source'.address,
MaxMessageSize = max_message_size(MaybeMaxMessageSize),
Link = #outgoing_link{
name = LinkName0,
source_address = address(SourceAddress),
name = LinkNameBin,
source_address = SourceAddress,
queue_name = queue_resource(Vhost, QNameBin),
queue_type = QType,
dynamic = default(Source#'v1_0.source'.dynamic, false),
send_settled = SndSettled,
max_message_size = MaxMessageSize,
credit_api_version = CreditApiVsn,
@ -2616,17 +2648,53 @@ maybe_grant_mgmt_link_credit(Credit, _, _) ->
{Credit, []}.
-spec ensure_source(#'v1_0.source'{},
binary(),
rabbit_types:vhost(),
rabbit_types:user(),
binary(),
rabbit_types:connection(),
permission_cache(),
topic_permission_cache()) ->
{ok, rabbit_amqqueue:name(), permission_cache(), topic_permission_cache()} |
{ok,
rabbit_amqqueue:name(),
#'v1_0.source'{},
permission_cache(),
topic_permission_cache()} |
{error, term()}.
ensure_source(#'v1_0.source'{dynamic = true}, _, _, _, _) ->
exit_not_implemented("Dynamic sources not supported");
ensure_source(#'v1_0.source'{address = Address,
durable = Durable},
Vhost, User, PermCache, TopicPermCache) ->
ensure_source(#'v1_0.source'{
address = undefined,
dynamic = true,
%% We will reply with the actual node properties.
dynamic_node_properties = _IgnoreDesiredProperties,
capabilities = {array, symbol, Caps}
} = Source0,
LinkName, Vhost, User, ContainerId,
ConnPid, PermCache0, TopicPermCache) ->
case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
true ->
{QNameBin, Address, Props, PermCache} =
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
Source = Source0#'v1_0.source'{
address = {utf8, Address},
%% While Khepri stores queue records durably, the terminus
%% - i.e. the existence of this receiver - is not stored durably.
durable = ?V_1_0_TERMINUS_DURABILITY_NONE,
expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
timeout = {uint, 0},
dynamic_node_properties = Props,
distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE,
capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
},
QName = queue_resource(Vhost, QNameBin),
{ok, QName, Source, PermCache, TopicPermCache};
false ->
exit_not_implemented("Dynamic source not supported: ~p", [Source0])
end;
ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _) ->
exit_not_implemented("Dynamic source not supported: ~p", [Source]);
ensure_source(Source = #'v1_0.source'{address = Address,
durable = Durable},
_LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache, TopicPermCache) ->
case Address of
{utf8, <<"/queues/", QNameBinQuoted/binary>>} ->
%% The only possible v2 source address format is:
@ -2635,15 +2703,20 @@ ensure_source(#'v1_0.source'{address = Address,
QNameBin ->
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache}
{ok, QName, Source, PermCache, TopicPermCache}
catch error:_ ->
{error, {bad_address, Address}}
end;
{utf8, SourceAddr} ->
case address_v1_permitted() of
true ->
ensure_source_v1(SourceAddr, Vhost, User, Durable,
PermCache, TopicPermCache);
case ensure_source_v1(SourceAddr, Vhost, User, Durable,
PermCache, TopicPermCache) of
{ok, QName, PermCache1, TopicPermCache1} ->
{ok, QName, Source, PermCache1, TopicPermCache1};
Err ->
Err
end;
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end;
@ -2689,42 +2762,71 @@ ensure_source_v1(Address,
Err
end.
address(undefined) ->
null;
address({utf8, String}) ->
String.
-spec ensure_target(#'v1_0.target'{},
binary(),
rabbit_types:vhost(),
rabbit_types:user(),
binary(),
rabbit_types:connection(),
permission_cache()) ->
{ok,
rabbit_types:exchange() | rabbit_exchange:name() | to,
rabbit_types:routing_key() | to | subject,
rabbit_misc:resource_name() | undefined,
#'v1_0.target'{},
permission_cache()} |
{error, term()}.
ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
exit_not_implemented("Dynamic targets not supported");
ensure_target(#'v1_0.target'{address = Address,
durable = Durable},
Vhost, User, PermCache) ->
ensure_target(#'v1_0.target'{
address = undefined,
dynamic = true,
%% We will reply with the actual node properties.
dynamic_node_properties = _IgnoreDesiredProperties,
capabilities = {array, symbol, Caps}
} = Target0,
LinkName, Vhost, User, ContainerId, ConnPid, PermCache0) ->
case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
true ->
{QNameBin, Address, Props, PermCache1} =
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
{ok, Exchange, PermCache} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache1),
Target = #'v1_0.target'{
address = {utf8, Address},
%% While Khepri stores queue records durably,
%% the terminus - i.e. the existence of this producer - is not stored durably.
durable = ?V_1_0_TERMINUS_DURABILITY_NONE,
expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
timeout = {uint, 0},
dynamic = true,
dynamic_node_properties = Props,
capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
},
{ok, Exchange, QNameBin, QNameBin, Target, PermCache};
false ->
exit_not_implemented("Dynamic target not supported: ~p", [Target0])
end;
ensure_target(Target = #'v1_0.target'{dynamic = true}, _, _, _, _, _, _) ->
exit_not_implemented("Dynamic target not supported: ~p", [Target]);
ensure_target(Target = #'v1_0.target'{address = Address,
durable = Durable},
_LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache0) ->
case target_address_version(Address) of
2 ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache};
{ok, to, RKey, QNameBin, Target, PermCache0};
{ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache0),
{ok, Exchange, RKey, QNameBin, Target, PermCache};
{error, _} = Err ->
Err
end;
1 ->
case address_v1_permitted() of
true ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of
case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
{ok, XNameBin, RKey, QNameBin, PermCache1} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1);
{ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache1),
{ok, Exchange, RKey, QNameBin, Target, PermCache};
{error, _} = Err ->
Err
end;
@ -2733,7 +2835,7 @@ ensure_target(#'v1_0.target'{address = Address,
end
end.
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
check_exchange(XNameBin, User, Vhost, PermCache0) ->
XName = exchange_resource(Vhost, XNameBin),
PermCache = check_resource_access(XName, write, User, PermCache0),
case rabbit_exchange:lookup(XName) of
@ -2747,7 +2849,7 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
<<"amq.", _/binary>> -> X;
_ -> XName
end,
{ok, Exchange, RKey, QNameBin, PermCache};
{ok, Exchange, PermCache};
{error, not_found} ->
exit_not_found(XName)
end.
@ -3035,7 +3137,10 @@ credit_reply_timeout(QType, QName) ->
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).
default(undefined, Default) -> Default;
default(Thing, _Default) -> Thing.
default(Thing, _Default) -> Thing.
address(undefined) -> null;
address({utf8, String}) -> String.
snd_settle_mode({ubyte, Val}) ->
case Val of
@ -3249,20 +3354,20 @@ ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durabil
ok = exit_if_absent(exchange, Vhost, XNameList),
case Type of
target -> {undefined, PermCache};
source -> declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache)
source -> declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache)
end;
ensure_terminus(target, {topic, _bindingkey}, _, _, _, PermCache) ->
%% exchange amq.topic exists
{undefined, PermCache};
ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability, PermCache) ->
%% exchange amq.topic exists
declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache);
declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache);
ensure_terminus(target, {queue, undefined}, _, _, _, PermCache) ->
%% Target "/queue" means publish to default exchange with message subject as routing key.
%% Default exchange exists.
{undefined, PermCache};
ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability, PermCache) ->
declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache);
declare_queue_v1(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache);
ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) ->
%% Target "/amq/queue/" is handled specially due to AMQP legacy:
%% "Queue names starting with "amq." are reserved for pre-declared and
@ -3287,22 +3392,39 @@ exit_if_absent(ResourceName = #resource{kind = Kind}) ->
false -> exit_not_found(ResourceName)
end.
generate_queue_name() ->
generate_queue_name_v1() ->
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen").
%% "The generated name of the address SHOULD include the link name and the
%% container-id of the remote container to allow for ease of identification." [3.5.4]
%% Let's include container-id and link name if they are not very long
%% because the generated address might be sent in every message.
generate_queue_name_dynamic(ContainerId, LinkName)
when byte_size(ContainerId) + byte_size(LinkName) < 150 ->
Prefix = <<"amq.dyn-", ContainerId/binary, "-", LinkName/binary>>,
rabbit_guid:binary(rabbit_guid:gen_secure(), Prefix);
generate_queue_name_dynamic(_, _) ->
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.dyn.gen").
declare_queue_v1(QNameBin, Vhost, User, TerminusDurability, PermCache0) ->
Durable = queue_is_durable(TerminusDurability),
{ok, PermCache} = declare_queue(QNameBin, Vhost, User, Durable, none, PermCache0),
{QNameBin, PermCache}.
declare_queue(QNameBin,
Vhost,
User = #user{username = Username},
TerminusDurability,
Durable,
QOwner,
PermCache0) ->
QName = queue_resource(Vhost, QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0),
rabbit_core_metrics:queue_declared(QName),
Q0 = amqqueue:new(QName,
_Pid = none,
queue_is_durable(TerminusDurability),
Durable,
_AutoDelete = false,
_QOwner = none,
QOwner,
_QArgs = [],
Vhost,
#{user => Username},
@ -3322,7 +3444,40 @@ declare_queue(QNameBin,
"Failed to declare ~s: ~p",
[rabbit_misc:rs(QName), Other])
end,
{QNameBin, PermCache}.
{ok, PermCache}.
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) ->
QNameBin = generate_queue_name_dynamic(ContainerId, LinkName),
{ok, PermCache} = declare_queue(QNameBin, Vhost, User, true, ConnPid, PermCache0),
QNameBinQuoted = uri_string:quote(QNameBin),
Address = <<"/queues/", QNameBinQuoted/binary>>,
Props = {map, [{{symbol, <<"lifetime-policy">>},
{described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}},
{{symbol, <<"supported-dist-modes">>},
{array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]},
{QNameBin, Address, Props, PermCache}.
maybe_delete_dynamic_queue(#incoming_link{dynamic = true,
queue_name_bin = QNameBin},
Cfg = #cfg{vhost = Vhost}) ->
QName = queue_resource(Vhost, QNameBin),
delete_dynamic_queue(QName, Cfg);
maybe_delete_dynamic_queue(#outgoing_link{dynamic = true,
queue_name = QName},
Cfg) ->
delete_dynamic_queue(QName, Cfg);
maybe_delete_dynamic_queue(_, _) ->
ok.
delete_dynamic_queue(QName, #cfg{user = #user{username = Username}}) ->
%% No real need to check for 'configure' access again since this queue is owned by
%% this connection and the user had 'configure' access when the queue got declared.
_ = rabbit_amqqueue:with(
QName,
fun(Q) ->
rabbit_queue_type:delete(Q, false, false, Username)
end),
ok.
outcomes(#'v1_0.source'{outcomes = undefined}) ->
{array, symbol, ?OUTCOMES};

View File

@ -55,9 +55,12 @@ groups() ->
[
%% authz
attach_source_queue,
attach_source_queue_dynamic,
attach_target_exchange,
attach_target_topic_exchange,
attach_target_queue,
attach_target_queue_dynamic_exchange_write,
attach_target_queue_dynamic_queue_configure,
target_per_message_exchange,
target_per_message_internal_exchange,
target_per_message_topic,
@ -437,6 +440,39 @@ attach_source_queue(Config) ->
end,
ok = close_connection_sync(Conn).
attach_source_queue_dynamic(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% missing configure permission to queue
ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
Source = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>],
durable => none},
AttachArgs = #{name => <<"my link">>,
role => {receiver, Source, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
receive {amqp10_event,
{session, Session,
{ended, Error}}} ->
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, Description}} = Error,
?assertEqual(
match,
re:run(Description,
<<"^configure access to queue 'amq\.dyn-.*' in vhost "
"'test vhost' refused for user 'test user'$">>,
[{capture, none}]))
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
attach_target_exchange(Config) ->
XName = <<"amq.fanout">>,
Address1 = rabbitmq_amqp_address:exchange(XName),
@ -485,6 +521,61 @@ attach_target_queue(Config) ->
end,
ok = amqp10_client:close_connection(Conn).
attach_target_queue_dynamic_exchange_write(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% missing write permission to default exchange
ok = set_permissions(Config, <<".*">>, <<>>, <<".*">>),
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
AttachArgs = #{name => <<"my link">>,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
ExpectedErr = error_unauthorized(
<<"write access to exchange 'amq.default' ",
"in vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
attach_target_queue_dynamic_queue_configure(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% missing configure permission to queue
ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
AttachArgs = #{name => <<"my link">>,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
receive {amqp10_event,
{session, Session,
{ended, Error}}} ->
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, Description}} = Error,
?assertEqual(
match,
re:run(Description,
<<"^configure access to queue 'amq\.dyn-.*' in vhost "
"'test vhost' refused for user 'test user'$">>,
[{capture, none}]))
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
target_per_message_exchange(Config) ->
TargetAddress = null,
To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),

View File

@ -130,6 +130,10 @@ groups() ->
handshake_timeout,
credential_expires,
attach_to_exclusive_queue,
dynamic_target_short_link_name,
dynamic_target_long_link_name,
dynamic_source_rpc,
dynamic_terminus_delete,
modified_classic_queue,
modified_quorum_queue,
modified_dead_letter_headers_exchange,
@ -4762,6 +4766,230 @@ attach_to_exclusive_queue(Config) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
dynamic_target_short_link_name(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"my-container">>,
notify_with_performative => true},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% "The address of the target MUST NOT be set" [3.5.4]
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
ShortLinkName = <<"my/sender">>,
AttachArgs = #{name => ShortLinkName,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, Sender} = amqp10_client:attach_link(Session, AttachArgs),
Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} ->
#'v1_0.attach'{
target = #'v1_0.target'{
address = {utf8, Address},
dynamic = true}} = Attach,
Address
after 30000 -> ct:fail({missing_event, ?LINE})
end,
%% The client doesn't really care what the address looks like.
%% However let's do whitebox testing here and check the address format.
%% We expect the address to contain both container ID and link name since they are short.
?assertMatch(<<"/queues/amq.dyn-my-container-my%2Fsender-", _GUID/binary>>, Addr),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = wait_for_accepted(<<"t1">>),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)),
ok = amqp10_client:accept_msg(Receiver, Msg),
%% The exclusive queue should be deleted when we close our connection.
?assertMatch([_ExclusiveQueue], rpc(Config, rabbit_amqqueue, list, [])),
ok = close_connection_sync(Connection),
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
ok.
dynamic_target_long_link_name(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"my-container">>,
notify_with_performative => true},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% "The address of the target MUST NOT be set" [3.5.4]
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
LongLinkName = binary:copy(<<"z">>, 200),
AttachArgs = #{name => LongLinkName,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, Sender} = amqp10_client:attach_link(Session, AttachArgs),
Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} ->
#'v1_0.attach'{
target = #'v1_0.target'{
address = {utf8, Address},
dynamic = true}} = Attach,
Address
after 30000 -> ct:fail({missing_event, ?LINE})
end,
%% The client doesn't really care what the address looks like.
%% However let's do whitebox testing here and check the address format.
%% We expect the address to not contain the long link name.
?assertMatch(<<"/queues/amq.dyn.gen-", _GUID/binary>>, Addr),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = wait_for_accepted(<<"t1">>),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)),
ok = amqp10_client:accept_msg(Receiver, Msg),
flush(accepted),
%% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be
%% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10]
ok = amqp10_client:detach_link(Sender),
receive {amqp10_event, {link, Receiver, {detached, Detach}}} ->
?assertMatch(
#'v1_0.detach'{error = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}},
Detach)
after 5000 -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
%% Test the following RPC workflow:
%% RPC client -> queue -> RPC server
%% RPC server -> dynamic queue -> RPC client
dynamic_source_rpc(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"rpc-client">>,
notify_with_performative => true},
{ok, ConnectionClient} = amqp10_client:open_connection(OpnConf),
{ok, SessionClient} = amqp10_client:begin_session_sync(ConnectionClient),
%% "The address of the source MUST NOT be set" [3.5.3]
Source = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>],
durable => none},
AttachArgs = #{name => <<"rpc-client-receiver🥕"/utf8>>,
role => {receiver, Source, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
{ok, ReceiverClient} = amqp10_client:attach_link(SessionClient, AttachArgs),
RespAddr = receive {amqp10_event, {link, ReceiverClient, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, Address},
dynamic = true}} = Attach,
Address
after 30000 -> ct:fail({missing_event, ?LINE})
end,
%% The client doesn't really care what the address looks like.
%% However let's do whitebox testing here and check the address format.
%% We expect the address to contain both container ID and link name since they are short.
?assertMatch(<<"/queues/amq.dyn-rpc-client-rpc-client-receiver", _CarrotAndGUID/binary>>,
RespAddr),
%% Let's use a separate connection for the RPC server.
{_, SessionServer, LinkPair} = RpcServer = init(Config),
ReqQName = atom_to_binary(?FUNCTION_NAME),
ReqAddr = rabbitmq_amqp_address:queue(ReqQName),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, ReqQName, #{}),
{ok, ReceiverServer} = amqp10_client:attach_receiver_link(SessionServer, <<"rpc-server-receiver">>, ReqAddr, unsettled),
{ok, SenderServer} = amqp10_client:attach_sender_link(SessionServer, <<"rpc-server-sender">>, null),
ok = wait_for_credit(SenderServer),
{ok, SenderClient} = amqp10_client:attach_sender_link(SessionClient, <<"rpc-client-sender">>, ReqAddr),
wait_for_credit(SenderClient),
flush(attached),
ok = amqp10_client:send_msg(
SenderClient,
amqp10_msg:set_properties(
#{reply_to => RespAddr},
amqp10_msg:new(<<"t1">>, <<"hello">>))),
ok = wait_for_accepted(<<"t1">>),
{ok, ReqMsg} = amqp10_client:get_msg(ReceiverServer),
ReqBody = amqp10_msg:body_bin(ReqMsg),
RespBody = string:uppercase(ReqBody),
#{reply_to := ReplyTo} = amqp10_msg:properties(ReqMsg),
ok = amqp10_client:send_msg(
SenderServer,
amqp10_msg:set_properties(
#{to => ReplyTo},
amqp10_msg:new(<<"t2">>, RespBody))),
ok = wait_for_accepted(<<"t2">>),
ok = amqp10_client:accept_msg(ReceiverServer, ReqMsg),
{ok, RespMsg} = amqp10_client:get_msg(ReceiverClient),
?assertEqual(<<"HELLO">>, amqp10_msg:body_bin(RespMsg)),
ok = amqp10_client:accept_msg(ReceiverClient, RespMsg),
ok = detach_link_sync(ReceiverServer),
ok = detach_link_sync(SenderClient),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, ReqQName),
ok = detach_link_sync(SenderServer),
ok = close(RpcServer),
ok = close_connection_sync(ConnectionClient).
dynamic_terminus_delete(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session1} = amqp10_client:begin_session_sync(Connection),
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
Terminus = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>],
durable => none},
RcvAttachArgs = #{role => {receiver, Terminus, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
SndAttachArgs = #{role => {sender, Terminus},
snd_settle_mode => mixed,
rcv_settle_mode => first},
RcvAttachArgs1 = RcvAttachArgs#{name => <<"receiver 1">>},
RcvAttachArgs2 = RcvAttachArgs#{name => <<"receiver 2">>},
RcvAttachArgs3 = RcvAttachArgs#{name => <<"receiver 3">>},
SndAttachArgs1 = SndAttachArgs#{name => <<"sender 1">>},
SndAttachArgs2 = SndAttachArgs#{name => <<"sender 2">>},
SndAttachArgs3 = SndAttachArgs#{name => <<"sender 3">>},
{ok, _R1} = amqp10_client:attach_link(Session1, RcvAttachArgs1),
{ok, _R2} = amqp10_client:attach_link(Session2, RcvAttachArgs2),
{ok, R3} = amqp10_client:attach_link(Session2, RcvAttachArgs3),
{ok, _S1} = amqp10_client:attach_link(Session1, SndAttachArgs1),
{ok, _S2} = amqp10_client:attach_link(Session2, SndAttachArgs2),
{ok, S3} = amqp10_client:attach_link(Session2, SndAttachArgs3),
[receive {amqp10_event, {link, _LinkRef, attached}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end
|| _ <- lists:seq(1, 6)],
%% We should now have 6 exclusive queues.
?assertEqual(6, rpc(Config, rabbit_amqqueue, count, [])),
%% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be
%% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10]
ok = detach_link_sync(R3),
ok = detach_link_sync(S3),
?assertEqual(4, rpc(Config, rabbit_amqqueue, count, [])),
%% When a session is ended, the sessions's links cease to exist.
ok = end_session_sync(Session2),
eventually(?_assertEqual(2, rpc(Config, rabbit_amqqueue, count, []))),
%% When a connection is closed, the connection's links cease to exist.
ok = close_connection_sync(Connection),
eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, []))),
ok.
priority_classic_queue(Config) ->
QArgs = #{<<"x-queue-type">> => {utf8, <<"classic">>},
<<"x-max-priority">> => {ulong, 10}},

View File

@ -14,6 +14,10 @@
-compile(nowarn_export_all).
-compile(export_all).
-import(rabbit_ct_broker_helpers,
[rpc/4]).
-import(rabbit_ct_helpers,
[eventually/3]).
-import(amqp_utils,
[init/1,
close/1,
@ -30,8 +34,15 @@ all() ->
groups() ->
[{cluster_size_1, [shuffle],
[
%% CT test case per Java class
jms_connection,
jms_temporary_queue,
%% CT test case per test in Java class JmsTest
message_types_jms_to_jms,
message_types_jms_to_amqp
message_types_jms_to_amqp,
temporary_queue_rpc,
temporary_queue_delete
]
}].
@ -54,7 +65,9 @@ end_per_suite(Config) ->
init_per_group(cluster_size_1, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, Suffix}),
Config1 = rabbit_ct_helpers:set_config(
Config,
{rmq_nodename_suffix, Suffix}),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
{rabbit,
@ -82,6 +95,9 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
%% Assert that every testcase cleaned up.
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, [])), 1000, 5),
eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, [])), 1000, 5),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
build_maven_test_project(Config) ->
@ -98,11 +114,17 @@ build_maven_test_project(Config) ->
%% Testcases.
%% -------------------------------------------------------------------
jms_connection(Config) ->
ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsConnectionTest">>]}], Config).
jms_temporary_queue(Config) ->
ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsTemporaryQueueTest">>]}], Config).
%% Send different message types from JMS client to JMS client.
message_types_jms_to_jms(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME),
ok = declare_queue(QName, <<"quorum">>, Config),
ok = run(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = delete_queue(QName, Config).
%% Send different message types from JMS client to Erlang AMQP 1.0 client.
@ -112,7 +134,7 @@ message_types_jms_to_amqp(Config) ->
Address = rabbitmq_amqp_address:queue(QName),
%% The JMS client sends messaegs.
ok = run(TestName, [{"-Dqueue=~ts", [Address]}], Config),
ok = run_jms_test(TestName, [{"-Dqueue=~ts", [Address]}], Config),
%% The Erlang AMQP 1.0 client receives messages.
OpnConf = connection_config(Config),
@ -120,6 +142,7 @@ message_types_jms_to_amqp(Config) ->
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled),
{ok, Msg1} = amqp10_client:get_msg(Receiver),
?assertEqual(
#'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}},
amqp10_msg:body(Msg1)),
@ -149,16 +172,31 @@ message_types_jms_to_amqp(Config) ->
ok = close_connection_sync(Connection),
ok = delete_queue(QName, Config).
temporary_queue_rpc(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME),
ok = declare_queue(QName, <<"classic">>, Config),
ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = delete_queue(QName, Config).
temporary_queue_delete(Config) ->
TestName = atom_to_binary(?FUNCTION_NAME),
ok = run_jms_test(TestName, [], Config).
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
run_jms_test(TestName, JavaProps, Config) ->
run(TestName, [{"-Dtest=JmsTest#~ts", [TestName]} | JavaProps], Config).
run(TestName, JavaProps, Config) ->
TestProjectDir = ?config(data_dir, Config),
Cmd = [filename:join([TestProjectDir, "mvnw"]),
"test",
{"-Dtest=JmsTest#~ts", [TestName]},
{"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]}
{"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]},
{"-Dnodename=~ts", [rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)]},
{"-Drabbitmqctl.bin=~ts", [rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd)]}
] ++ JavaProps,
case rabbit_ct_helpers:exec(Cmd, [{cd, TestProjectDir}]) of
{ok, _Stdout_} ->

View File

@ -0,0 +1,163 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.UnknownHostException;
final class Cli {
private Cli() {}
static void startBroker() {
rabbitmqctl("start_app");
}
static void stopBroker() {
rabbitmqctl("stop_app");
}
private static ProcessState rabbitmqctl(String command) {
return rabbitmqctl(command, nodename());
}
private static ProcessState rabbitmqctl(String command, String nodename) {
return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command);
}
private static String rabbitmqctlCommand() {
return System.getProperty("rabbitmqctl.bin");
}
public static String nodename() {
return System.getProperty("nodename", "rabbit@" + hostname());
}
public static String hostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
try {
return executeCommand("hostname").output();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
private static ProcessState executeCommand(String command) {
return executeCommand(command, false);
}
private static ProcessState executeCommand(String command, boolean ignoreError) {
Process pr = executeCommandProcess(command);
InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());
int ev = waitForExitValue(pr, inputState, errorState);
inputState.pump();
errorState.pump();
if (ev != 0 && !ignoreError) {
throw new RuntimeException(
"unexpected command exit value: "
+ ev
+ "\ncommand: "
+ command
+ "\n"
+ "\nstdout:\n"
+ inputState.buffer.toString()
+ "\nstderr:\n"
+ errorState.buffer.toString()
+ "\n");
}
return new ProcessState(inputState);
}
private static int waitForExitValue(
Process pr, InputStreamPumpState inputState, InputStreamPumpState errorState) {
while (true) {
try {
inputState.pump();
errorState.pump();
pr.waitFor();
break;
} catch (InterruptedException ignored) {
}
}
return pr.exitValue();
}
private static Process executeCommandProcess(String command) {
String[] finalCommand;
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
finalCommand = new String[4];
finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
finalCommand[1] = "/y";
finalCommand[2] = "/c";
finalCommand[3] = command;
} else {
finalCommand = new String[3];
finalCommand[0] = "/bin/sh";
finalCommand[1] = "-c";
finalCommand[2] = command;
}
try {
return Runtime.getRuntime().exec(finalCommand);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static class ProcessState {
private final InputStreamPumpState inputState;
ProcessState(InputStreamPumpState inputState) {
this.inputState = inputState;
}
String output() {
return inputState.buffer.toString();
}
}
private static class InputStreamPumpState {
private final BufferedReader reader;
private final StringBuilder buffer;
private InputStreamPumpState(InputStream in) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.buffer = new StringBuilder();
}
void pump() {
String line;
while (true) {
try {
if ((line = reader.readLine()) == null) break;
} catch (IOException e) {
throw new RuntimeException(e);
}
buffer.append(line).append("\n");
}
}
}
}

View File

@ -0,0 +1,199 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import static com.rabbitmq.amqp.tests.jms.Cli.startBroker;
import static com.rabbitmq.amqp.tests.jms.Cli.stopBroker;
import static com.rabbitmq.amqp.tests.jms.TestUtils.*;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import jakarta.jms.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
/**
* Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests.
*/
public class JmsConnectionTest {
@Test
@Timeout(30)
public void testCreateConnection() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri());
try (Connection connection = factory.createConnection()) {
assertNotNull(connection);
}
}
@Test
@Timeout(30)
public void testCreateConnectionAndStart() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri());
try (Connection connection = factory.createConnection()) {
assertNotNull(connection);
connection.start();
}
}
@Test
@Timeout(30)
// Currently not supported by RabbitMQ.
@Disabled
public void testCreateWithDuplicateClientIdFails() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri());
JmsConnection connection1 = (JmsConnection) factory.createConnection();
connection1.setClientID("Test");
assertNotNull(connection1);
connection1.start();
JmsConnection connection2 = (JmsConnection) factory.createConnection();
try {
connection2.setClientID("Test");
fail("should have thrown a JMSException");
} catch (InvalidClientIDException ex) {
// OK
} catch (Exception unexpected) {
fail("Wrong exception type thrown: " + unexpected);
}
connection1.close();
connection2.close();
}
@Test
public void testSetClientIdAfterStartedFails() {
assertThrows(
JMSException.class,
() -> {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri());
try (Connection connection = factory.createConnection()) {
connection.setClientID("Test");
connection.start();
connection.setClientID("NewTest");
}
});
}
@Test
@Timeout(30)
public void testCreateConnectionAsSystemAdmin() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri());
factory.setUsername(adminUsername());
factory.setPassword(adminPassword());
try (Connection connection = factory.createConnection()) {
assertNotNull(connection);
connection.start();
}
}
@Test
@Timeout(30)
public void testCreateConnectionCallSystemAdmin() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri());
try (Connection connection = factory.createConnection(adminUsername(), adminPassword())) {
assertNotNull(connection);
connection.start();
}
}
@Test
@Timeout(30)
public void testCreateConnectionAsUnknwonUser() {
assertThrows(
JMSSecurityException.class,
() -> {
JmsConnectionFactory factory = new JmsConnectionFactory(TestUtils.brokerUri());
factory.setUsername("unknown");
factory.setPassword("unknown");
try (Connection connection = factory.createConnection()) {
assertNotNull(connection);
connection.start();
}
});
}
@Test
@Timeout(30)
public void testCreateConnectionCallUnknwonUser() {
assertThrows(
JMSSecurityException.class,
() -> {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri());
try (Connection connection = factory.createConnection("unknown", "unknown")) {
assertNotNull(connection);
connection.start();
}
});
}
@Test
@Timeout(30)
public void testBrokerStopWontHangConnectionClose(TestInfo info) throws Exception {
Connection connection = new JmsConnectionFactory(brokerUri()).createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// TODO use a "regular" queue
TemporaryQueue queue = session.createTemporaryQueue();
// String destinationName = name(info);
// Queue queue = session.createQueue("/queues/" + destinationName);
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = session.createTextMessage("Sample text");
producer.send(m);
try {
stopBroker();
try {
connection.close();
} catch (Exception ex) {
fail("Should not have thrown an exception.");
}
} finally {
startBroker();
}
}
@Test
@Timeout(60)
public void testConnectionExceptionBrokerStop() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
try (Connection connection = new JmsConnectionFactory(brokerUri()).createConnection()) {
connection.setExceptionListener(exception -> latch.countDown());
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
try {
stopBroker();
assertTrue(latch.await(10, TimeUnit.SECONDS));
} finally {
startBroker();
}
}
}
}

View File

@ -0,0 +1,135 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.fail;
import jakarta.jms.*;
import jakarta.jms.IllegalStateException;
import java.util.UUID;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
* Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests.
*/
public class JmsTemporaryQueueTest {
Connection connection;
@AfterEach
void tearDown() throws JMSException {
connection.close();
}
@Test
@Timeout(60)
public void testCreatePublishConsumeTemporaryQueue() throws Exception {
connection = new JmsConnectionFactory(brokerUri()).createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
TemporaryQueue queue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
String body = UUID.randomUUID().toString();
producer.send(session.createTextMessage(body));
assertEquals(body, consumer.receive(60_000).getBody(String.class));
}
@Test
@Timeout(60)
public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws Exception {
connection = new JmsConnectionFactory(brokerUri()).createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session.createTemporaryQueue();
session.createConsumer(tempQueue);
Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection();
try {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
session2.createConsumer(tempQueue);
fail("should not be able to consumer from temporary queue from another connection");
} catch (InvalidDestinationException ide) {
// expected
}
} finally {
connection2.close();
}
}
@Test
@Timeout(60)
public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception {
connection = new JmsConnectionFactory(brokerUri()).createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session.createTemporaryQueue();
Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection();
try {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Message msg = session2.createMessage();
MessageProducer producer = session2.createProducer(tempQueue);
// Close the original connection
connection.close();
try {
producer.send(msg);
fail("should not be able to send to temporary queue from closed connection");
} catch (jakarta.jms.IllegalStateException ide) {
// expected
}
} finally {
connection2.close();
}
}
@Test
@Timeout(60)
public void testCantDeleteTemporaryQueueWithConsumers() throws Exception {
connection = new JmsConnectionFactory(brokerUri()).createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(tempQueue);
try {
tempQueue.delete();
fail("should not be able to delete temporary queue with active consumers");
} catch (IllegalStateException ide) {
// expected
}
consumer.close();
// Now it should be allowed
tempQueue.delete();
}
}

View File

@ -1,7 +1,6 @@
package com.rabbitmq.amqp.tests.jms;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.*;
import jakarta.jms.*;
import java.util.*;
@ -104,8 +103,6 @@ public class JmsTest {
Session session = connection.createSession();
Destination queue = (Destination) context.lookup("myQueue");
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
// TextMessage
String msg1 = "msg1🥕";
@ -128,5 +125,57 @@ public class JmsTest {
streamMessage.writeLong(-1L);
producer.send(streamMessage);
}
}
// Test that Request/reply pattern using a TemporaryQueue works.
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee
@Test
public void temporary_queue_rpc() throws Exception {
Context context = getContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
try (JMSContext clientContext = factory.createContext()) {
Destination responseQueue = clientContext.createTemporaryQueue();
JMSConsumer clientConsumer = clientContext.createConsumer(responseQueue);
Destination requestQueue = (Destination) context.lookup("myQueue");
TextMessage clientRequestMessage = clientContext.createTextMessage("hello");
clientContext.createProducer().
setJMSReplyTo(responseQueue).
send(requestQueue, clientRequestMessage);
// Let's open a new connection to simulate the RPC server.
try (JMSContext serverContext = factory.createContext()) {
JMSConsumer serverConsumer = serverContext.createConsumer(requestQueue);
TextMessage serverRequestMessage = (TextMessage) serverConsumer.receive(5000);
TextMessage serverResponseMessage = serverContext.createTextMessage(
serverRequestMessage.getText().toUpperCase());
serverContext.createProducer().
send(serverRequestMessage.getJMSReplyTo(), serverResponseMessage);
}
TextMessage clientResponseMessage = (TextMessage) clientConsumer.receive(5000);
assertEquals("HELLO", clientResponseMessage.getText());
}
}
// Test that a temporary queue can be deleted.
@Test
public void temporary_queue_delete() throws Exception {
Context context = getContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
try (JMSContext clientContext = factory.createContext()) {
TemporaryQueue queue = clientContext.createTemporaryQueue();
queue.delete();
try {
clientContext.createProducer().send(queue, "hello");
fail("should not be able to create producer for deleted temporary queue");
} catch (IllegalStateRuntimeException expectedException) {
assertEquals("Temporary destination has been deleted", expectedException.getMessage());
}
}
}
}

View File

@ -0,0 +1,66 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import static java.lang.String.format;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.UUID;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
final class TestUtils {
private static final String DEFAULT_BROKER_URI = "amqp://localhost:5672";
private TestUtils() { }
static String brokerUri() {
String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672");
return uri == null || uri.isEmpty() ? DEFAULT_BROKER_URI : uri;
}
static String adminUsername() {
return "guest";
}
static String adminPassword() {
return "guest";
}
static String name(TestInfo info) {
return name(info.getTestClass().get(), info.getTestMethod().get());
}
private static String name(Class<?> testClass, Method testMethod) {
return name(testClass, testMethod.getName());
}
private static String name(Class<?> testClass, String testMethod) {
String uuid = UUID.randomUUID().toString();
return format(
"%s_%s%s", testClass.getSimpleName(), testMethod, uuid.substring(uuid.length() / 2));
}
}