Merge branch 'main' into issue-12324

This commit is contained in:
Michael Klishin 2025-02-12 18:41:47 -05:00 committed by GitHub
commit 8d0609eca0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 1720 additions and 202 deletions

View File

@ -698,23 +698,39 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
make_source(#{role := {sender, _}}) -> make_source(#{role := {sender, _}}) ->
#'v1_0.source'{}; #'v1_0.source'{};
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> make_source(#{role := {receiver, Source, _Pid},
filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)), Durable = translate_terminus_durability(maps:get(durable, Source, none)),
Dynamic = maps:get(dynamic, Source, false),
TranslatedFilter = translate_filters(Filter), TranslatedFilter = translate_filters(Filter),
#'v1_0.source'{address = {utf8, Address}, #'v1_0.source'{address = make_address(Source),
durable = {uint, Durable}, durable = {uint, Durable},
filter = TranslatedFilter}. dynamic = Dynamic,
filter = TranslatedFilter,
capabilities = make_capabilities(Source)}.
make_target(#{role := {receiver, _Source, _Pid}}) -> make_target(#{role := {receiver, _Source, _Pid}}) ->
#'v1_0.target'{}; #'v1_0.target'{};
make_target(#{role := {sender, #{address := Address} = Target}}) -> make_target(#{role := {sender, Target}}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)), Durable = translate_terminus_durability(maps:get(durable, Target, none)),
TargetAddr = case is_binary(Address) of Dynamic = maps:get(dynamic, Target, false),
true -> {utf8, Address}; #'v1_0.target'{address = make_address(Target),
false -> Address durable = {uint, Durable},
end, dynamic = Dynamic,
#'v1_0.target'{address = TargetAddr, capabilities = make_capabilities(Target)}.
durable = {uint, Durable}}.
make_address(#{address := Addr}) ->
if is_binary(Addr) ->
{utf8, Addr};
is_atom(Addr) ->
Addr
end.
make_capabilities(#{capabilities := Caps0}) ->
Caps = [{symbol, C} || C <- Caps0],
{array, symbol, Caps};
make_capabilities(_) ->
undefined.
max_message_size(#{max_message_size := Size}) max_message_size(#{max_message_size := Size})
when is_integer(Size) andalso when is_integer(Size) andalso

View File

@ -3,6 +3,8 @@
-define(CLOSING_TIMEOUT, 30_000). -define(CLOSING_TIMEOUT, 30_000).
-define(SILENT_CLOSE_DELAY, 3_000). -define(SILENT_CLOSE_DELAY, 3_000).
-define(SHUTDOWN_SESSIONS_TIMEOUT, 10_000).
%% Allow for potentially large sets of tokens during the SASL exchange. %% Allow for potentially large sets of tokens during the SASL exchange.
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915 %% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
-define(INITIAL_MAX_FRAME_SIZE, 8192). -define(INITIAL_MAX_FRAME_SIZE, 8192).

View File

@ -220,10 +220,17 @@ terminate(_, _) ->
%%-------------------------------------------------------------------------- %%--------------------------------------------------------------------------
%% error handling / termination %% error handling / termination
close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) -> close(Error, State0 = #v1{connection = #v1_connection{timeout = Timeout}}) ->
%% Client properties will be emitted in the connection_closed event by rabbit_reader. %% Client properties will be emitted in the connection_closed event by rabbit_reader.
ClientProperties = i(client_properties, State), ClientProperties = i(client_properties, State0),
put(client_properties, ClientProperties), put(client_properties, ClientProperties),
%% "It is illegal to send any more frames (or bytes of any other kind)
%% after sending a close frame." [2.7.9]
%% Sessions might send frames via the writer proc.
%% Therefore, let's first try to orderly shutdown our sessions.
State = shutdown_sessions(State0),
Time = case Timeout > 0 andalso Time = case Timeout > 0 andalso
Timeout < ?CLOSING_TIMEOUT of Timeout < ?CLOSING_TIMEOUT of
true -> Timeout; true -> Timeout;
@ -233,6 +240,31 @@ close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing), ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing),
State#v1{connection_state = closed}. State#v1{connection_state = closed}.
shutdown_sessions(#v1{tracked_channels = Channels} = State) ->
maps:foreach(fun(_ChannelNum, Pid) ->
gen_server:cast(Pid, shutdown)
end, Channels),
TimerRef = erlang:send_after(?SHUTDOWN_SESSIONS_TIMEOUT,
self(),
shutdown_sessions_timeout),
wait_for_shutdown_sessions(TimerRef, State).
wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State)
when map_size(Channels) =:= 0 ->
ok = erlang:cancel_timer(TimerRef, [{async, false},
{info, false}]),
State;
wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) ->
receive
{{'DOWN', ChannelNum}, _MRef, process, SessionPid, _Reason} ->
State = untrack_channel(ChannelNum, SessionPid, State0),
wait_for_shutdown_sessions(TimerRef, State);
shutdown_sessions_timeout ->
?LOG_INFO("sessions running ~b ms after requested to be shut down: ~p",
[?SHUTDOWN_SESSIONS_TIMEOUT, maps:values(Channels)]),
State0
end.
handle_session_exit(ChannelNum, SessionPid, Reason, State0) -> handle_session_exit(ChannelNum, SessionPid, Reason, State0) ->
State = untrack_channel(ChannelNum, SessionPid, State0), State = untrack_channel(ChannelNum, SessionPid, State0),
S = case terminated_normally(Reason) of S = case terminated_normally(Reason) of
@ -760,6 +792,7 @@ send_to_new_session(
connection = #v1_connection{outgoing_max_frame_size = MaxFrame, connection = #v1_connection{outgoing_max_frame_size = MaxFrame,
vhost = Vhost, vhost = Vhost,
user = User, user = User,
container_id = ContainerId,
name = ConnName}, name = ConnName},
writer = WriterPid} = State) -> writer = WriterPid} = State) ->
%% Subtract fixed frame header size. %% Subtract fixed frame header size.
@ -772,6 +805,7 @@ send_to_new_session(
OutgoingMaxFrameSize, OutgoingMaxFrameSize,
User, User,
Vhost, Vhost,
ContainerId,
ConnName, ConnName,
BeginFrame], BeginFrame],
case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of

View File

@ -85,8 +85,10 @@
-define(MAX_PERMISSION_CACHE_SIZE, 12). -define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(HIBERNATE_AFTER, 6_000). -define(HIBERNATE_AFTER, 6_000).
-define(CREDIT_REPLY_TIMEOUT, 30_000). -define(CREDIT_REPLY_TIMEOUT, 30_000).
%% Capability defined in amqp-bindmap-jms-v1.0-wd10 [5.2] and sent by Qpid JMS client.
-define(CAP_TEMPORARY_QUEUE, <<"temporary-queue">>).
-export([start_link/8, -export([start_link/9,
process_frame/2, process_frame/2,
list_local/0, list_local/0,
conserve_resources/3, conserve_resources/3,
@ -163,6 +165,7 @@
routing_key :: rabbit_types:routing_key() | to | subject, routing_key :: rabbit_types:routing_key() | to | subject,
%% queue_name_bin is only set if the link target address refers to a queue. %% queue_name_bin is only set if the link target address refers to a queue.
queue_name_bin :: undefined | rabbit_misc:resource_name(), queue_name_bin :: undefined | rabbit_misc:resource_name(),
dynamic :: boolean(),
max_message_size :: pos_integer(), max_message_size :: pos_integer(),
delivery_count :: sequence_no(), delivery_count :: sequence_no(),
credit :: rabbit_queue_type:credit(), credit :: rabbit_queue_type:credit(),
@ -206,6 +209,7 @@
%% or a topic filter, an outgoing link will always consume from a queue. %% or a topic filter, an outgoing link will always consume from a queue.
queue_name :: rabbit_amqqueue:name(), queue_name :: rabbit_amqqueue:name(),
queue_type :: rabbit_queue_type:queue_type(), queue_type :: rabbit_queue_type:queue_type(),
dynamic :: boolean(),
send_settled :: boolean(), send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(), max_message_size :: unlimited | pos_integer(),
@ -260,6 +264,7 @@
-record(cfg, { -record(cfg, {
outgoing_max_frame_size :: unlimited | pos_integer(), outgoing_max_frame_size :: unlimited | pos_integer(),
container_id :: binary(),
reader_pid :: rabbit_types:connection(), reader_pid :: rabbit_types:connection(),
writer_pid :: pid(), writer_pid :: pid(),
user :: rabbit_types:user(), user :: rabbit_types:user(),
@ -382,15 +387,17 @@
-type state() :: #state{}. -type state() :: #state{}.
start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) -> start_link(ReaderPid, WriterPid, ChannelNum, FrameMax,
Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame}, User, Vhost, ContainerId, ConnName, BeginFrame) ->
Args = {ReaderPid, WriterPid, ChannelNum, FrameMax,
User, Vhost, ContainerId, ConnName, BeginFrame},
Opts = [{hibernate_after, ?HIBERNATE_AFTER}], Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(?MODULE, Args, Opts). gen_server:start_link(?MODULE, Args, Opts).
process_frame(Pid, FrameBody) -> process_frame(Pid, FrameBody) ->
gen_server:cast(Pid, {frame_body, FrameBody}). gen_server:cast(Pid, {frame_body, FrameBody}).
init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, ConnName,
#'v1_0.begin'{ #'v1_0.begin'{
%% "If a session is locally initiated, the remote-channel MUST NOT be set." [2.7.2] %% "If a session is locally initiated, the remote-channel MUST NOT be set." [2.7.2]
remote_channel = undefined, remote_channel = undefined,
@ -401,6 +408,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
process_flag(trap_exit, true), process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(), rabbit_process_flag:adjust_for_message_handling_proc(),
logger:update_process_metadata(#{channel_number => ChannelNum, logger:update_process_metadata(#{channel_number => ChannelNum,
amqp_container => ContainerId,
connection => ConnName, connection => ConnName,
vhost => Vhost, vhost => Vhost,
user => User#user.username}), user => User#user.username}),
@ -453,7 +461,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
remote_incoming_window = RemoteIncomingWindow, remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow, remote_outgoing_window = RemoteOutgoingWindow,
outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID, outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID,
cfg = #cfg{reader_pid = ReaderPid, cfg = #cfg{container_id = ContainerId,
reader_pid = ReaderPid,
writer_pid = WriterPid, writer_pid = WriterPid,
outgoing_max_frame_size = MaxFrameSize, outgoing_max_frame_size = MaxFrameSize,
user = User, user = User,
@ -470,14 +479,17 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
terminate(_Reason, #state{incoming_links = IncomingLinks, terminate(_Reason, #state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks, outgoing_links = OutgoingLinks,
queue_states = QStates}) -> queue_states = QStates,
cfg = Cfg}) ->
maps:foreach( maps:foreach(
fun (_, _) -> fun (_, Link) ->
rabbit_global_counters:publisher_deleted(?PROTOCOL) rabbit_global_counters:publisher_deleted(?PROTOCOL),
maybe_delete_dynamic_queue(Link, Cfg)
end, IncomingLinks), end, IncomingLinks),
maps:foreach( maps:foreach(
fun (_, _) -> fun (_, Link) ->
rabbit_global_counters:consumer_deleted(?PROTOCOL) rabbit_global_counters:consumer_deleted(?PROTOCOL),
maybe_delete_dynamic_queue(Link, Cfg)
end, OutgoingLinks), end, OutgoingLinks),
ok = rabbit_queue_type:close(QStates). ok = rabbit_queue_type:close(QStates).
@ -602,7 +614,9 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
noreply(State) noreply(State)
catch exit:#'v1_0.error'{} = Error -> catch exit:#'v1_0.error'{} = Error ->
log_error_and_close_session(Error, State1) log_error_and_close_session(Error, State1)
end. end;
handle_cast(shutdown, State) ->
{stop, normal, State}.
log_error_and_close_session( log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid, Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
@ -1092,39 +1106,52 @@ handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach,
end; end;
handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
State0 = #state{incoming_links = IncomingLinks, State0 = #state{incoming_links = IncomingLinks0,
outgoing_links = OutgoingLinks0, outgoing_links = OutgoingLinks0,
outgoing_unsettled_map = Unsettled0, outgoing_unsettled_map = Unsettled0,
outgoing_pending = Pending0, outgoing_pending = Pending0,
queue_states = QStates0, queue_states = QStates0,
cfg = #cfg{user = #user{username = Username}}}) -> cfg = Cfg = #cfg{user = #user{username = Username}}}) ->
{OutgoingLinks, Unsettled, Pending, QStates} = {OutgoingLinks, Unsettled, Pending, QStates} =
case maps:take(HandleInt, OutgoingLinks0) of case maps:take(HandleInt, OutgoingLinks0) of
{#outgoing_link{queue_name = QName}, OutgoingLinks1} -> {#outgoing_link{queue_name = QName,
dynamic = Dynamic}, OutgoingLinks1} ->
Ctag = handle_to_ctag(HandleInt), Ctag = handle_to_ctag(HandleInt),
{Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0), {Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0),
case rabbit_amqqueue:lookup(QName) of case Dynamic of
{ok, Q} -> true ->
Spec = #{consumer_tag => Ctag, delete_dynamic_queue(QName, Cfg),
reason => remove, {OutgoingLinks1, Unsettled1, Pending1, QStates0};
user => Username}, false ->
case rabbit_queue_type:cancel(Q, Spec, QStates0) of case rabbit_amqqueue:lookup(QName) of
{ok, QStates1} -> {ok, Q} ->
{OutgoingLinks1, Unsettled1, Pending1, QStates1}; Spec = #{consumer_tag => Ctag,
{error, Reason} -> reason => remove,
protocol_error( user => Username},
?V_1_0_AMQP_ERROR_INTERNAL_ERROR, case rabbit_queue_type:cancel(Q, Spec, QStates0) of
"Failed to remove consumer from ~s: ~tp", {ok, QStates1} ->
[rabbit_misc:rs(amqqueue:get_name(Q)), Reason]) {OutgoingLinks1, Unsettled1, Pending1, QStates1};
end; {error, Reason} ->
{error, not_found} -> protocol_error(
{OutgoingLinks1, Unsettled1, Pending1, QStates0} ?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
"Failed to remove consumer from ~s: ~tp",
[rabbit_misc:rs(amqqueue:get_name(Q)), Reason])
end;
{error, not_found} ->
{OutgoingLinks1, Unsettled1, Pending1, QStates0}
end
end; end;
error -> error ->
{OutgoingLinks0, Unsettled0, Pending0, QStates0} {OutgoingLinks0, Unsettled0, Pending0, QStates0}
end, end,
IncomingLinks = case maps:take(HandleInt, IncomingLinks0) of
State1 = State0#state{incoming_links = maps:remove(HandleInt, IncomingLinks), {IncomingLink, IncomingLinks1} ->
maybe_delete_dynamic_queue(IncomingLink, Cfg),
IncomingLinks1;
error ->
IncomingLinks0
end,
State1 = State0#state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks, outgoing_links = OutgoingLinks,
outgoing_unsettled_map = Unsettled, outgoing_unsettled_map = Unsettled,
outgoing_pending = Pending, outgoing_pending = Pending,
@ -1269,29 +1296,33 @@ handle_attach(#'v1_0.attach'{
reply_frames([Reply], State); reply_frames([Reply], State);
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
name = LinkName = {utf8, LinkName0}, name = LinkName = {utf8, LinkNameBin},
handle = Handle = ?UINT(HandleInt), handle = Handle = ?UINT(HandleInt),
source = Source, source = Source,
snd_settle_mode = MaybeSndSettleMode, snd_settle_mode = MaybeSndSettleMode,
target = Target = #'v1_0.target'{address = TargetAddress}, target = Target0,
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
}, },
State0 = #state{incoming_links = IncomingLinks0, State0 = #state{incoming_links = IncomingLinks0,
permission_cache = PermCache0, permission_cache = PermCache0,
cfg = #cfg{max_link_credit = MaxLinkCredit, cfg = #cfg{container_id = ContainerId,
reader_pid = ReaderPid,
max_link_credit = MaxLinkCredit,
vhost = Vhost, vhost = Vhost,
user = User}}) -> user = User}}) ->
case ensure_target(Target, Vhost, User, PermCache0) of case ensure_target(Target0, LinkNameBin, Vhost, User,
{ok, Exchange, RoutingKey, QNameBin, PermCache} -> ContainerId, ReaderPid, PermCache0) of
{ok, Exchange, RoutingKey, QNameBin, Target, PermCache} ->
SndSettleMode = snd_settle_mode(MaybeSndSettleMode), SndSettleMode = snd_settle_mode(MaybeSndSettleMode),
MaxMessageSize = persistent_term:get(max_message_size), MaxMessageSize = persistent_term:get(max_message_size),
IncomingLink = #incoming_link{ IncomingLink = #incoming_link{
name = LinkName0, name = LinkNameBin,
snd_settle_mode = SndSettleMode, snd_settle_mode = SndSettleMode,
target_address = address(TargetAddress), target_address = address(Target#'v1_0.target'.address),
exchange = Exchange, exchange = Exchange,
routing_key = RoutingKey, routing_key = RoutingKey,
queue_name_bin = QNameBin, queue_name_bin = QNameBin,
dynamic = default(Target#'v1_0.target'.dynamic, false),
max_message_size = MaxMessageSize, max_message_size = MaxMessageSize,
delivery_count = DeliveryCountInt, delivery_count = DeliveryCountInt,
credit = MaxLinkCredit}, credit = MaxLinkCredit},
@ -1325,10 +1356,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
end; end;
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
name = LinkName = {utf8, LinkName0}, name = LinkName = {utf8, LinkNameBin},
handle = Handle = ?UINT(HandleInt), handle = Handle = ?UINT(HandleInt),
source = Source = #'v1_0.source'{address = SourceAddress, source = Source0 = #'v1_0.source'{filter = DesiredFilter},
filter = DesiredFilter},
snd_settle_mode = SndSettleMode, snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode, rcv_settle_mode = RcvSettleMode,
max_message_size = MaybeMaxMessageSize, max_message_size = MaybeMaxMessageSize,
@ -1339,6 +1369,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
topic_permission_cache = TopicPermCache0, topic_permission_cache = TopicPermCache0,
cfg = #cfg{vhost = Vhost, cfg = #cfg{vhost = Vhost,
user = User = #user{username = Username}, user = User = #user{username = Username},
container_id = ContainerId,
reader_pid = ReaderPid}}) -> reader_pid = ReaderPid}}) ->
{SndSettled, EffectiveSndSettleMode} = {SndSettled, EffectiveSndSettleMode} =
case SndSettleMode of case SndSettleMode of
@ -1350,10 +1381,11 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% client only for durable messages. %% client only for durable messages.
{false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED}
end, end,
case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId,
ReaderPid, PermCache0, TopicPermCache0) of
{error, Reason} -> {error, Reason} ->
protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]); protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]);
{ok, QName = #resource{name = QNameBin}, PermCache1, TopicPermCache} -> {ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} ->
PermCache = check_resource_access(QName, read, User, PermCache1), PermCache = check_resource_access(QName, read, User, PermCache1),
case rabbit_amqqueue:with( case rabbit_amqqueue:with(
QName, QName,
@ -1439,12 +1471,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% Echo back that we will respect the client's requested max-message-size. %% Echo back that we will respect the client's requested max-message-size.
max_message_size = MaybeMaxMessageSize, max_message_size = MaybeMaxMessageSize,
offered_capabilities = OfferedCaps}, offered_capabilities = OfferedCaps},
{utf8, SourceAddress} = Source#'v1_0.source'.address,
MaxMessageSize = max_message_size(MaybeMaxMessageSize), MaxMessageSize = max_message_size(MaybeMaxMessageSize),
Link = #outgoing_link{ Link = #outgoing_link{
name = LinkName0, name = LinkNameBin,
source_address = address(SourceAddress), source_address = SourceAddress,
queue_name = queue_resource(Vhost, QNameBin), queue_name = queue_resource(Vhost, QNameBin),
queue_type = QType, queue_type = QType,
dynamic = default(Source#'v1_0.source'.dynamic, false),
send_settled = SndSettled, send_settled = SndSettled,
max_message_size = MaxMessageSize, max_message_size = MaxMessageSize,
credit_api_version = CreditApiVsn, credit_api_version = CreditApiVsn,
@ -2614,17 +2648,53 @@ maybe_grant_mgmt_link_credit(Credit, _, _) ->
{Credit, []}. {Credit, []}.
-spec ensure_source(#'v1_0.source'{}, -spec ensure_source(#'v1_0.source'{},
binary(),
rabbit_types:vhost(), rabbit_types:vhost(),
rabbit_types:user(), rabbit_types:user(),
binary(),
rabbit_types:connection(),
permission_cache(), permission_cache(),
topic_permission_cache()) -> topic_permission_cache()) ->
{ok, rabbit_amqqueue:name(), permission_cache(), topic_permission_cache()} | {ok,
rabbit_amqqueue:name(),
#'v1_0.source'{},
permission_cache(),
topic_permission_cache()} |
{error, term()}. {error, term()}.
ensure_source(#'v1_0.source'{dynamic = true}, _, _, _, _) -> ensure_source(#'v1_0.source'{
exit_not_implemented("Dynamic sources not supported"); address = undefined,
ensure_source(#'v1_0.source'{address = Address, dynamic = true,
durable = Durable}, %% We will reply with the actual node properties.
Vhost, User, PermCache, TopicPermCache) -> dynamic_node_properties = _IgnoreDesiredProperties,
capabilities = {array, symbol, Caps}
} = Source0,
LinkName, Vhost, User, ContainerId,
ConnPid, PermCache0, TopicPermCache) ->
case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
true ->
{QNameBin, Address, Props, PermCache} =
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
Source = Source0#'v1_0.source'{
address = {utf8, Address},
%% While Khepri stores queue records durably, the terminus
%% - i.e. the existence of this receiver - is not stored durably.
durable = ?V_1_0_TERMINUS_DURABILITY_NONE,
expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
timeout = {uint, 0},
dynamic_node_properties = Props,
distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE,
capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
},
QName = queue_resource(Vhost, QNameBin),
{ok, QName, Source, PermCache, TopicPermCache};
false ->
exit_not_implemented("Dynamic source not supported: ~p", [Source0])
end;
ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _) ->
exit_not_implemented("Dynamic source not supported: ~p", [Source]);
ensure_source(Source = #'v1_0.source'{address = Address,
durable = Durable},
_LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache, TopicPermCache) ->
case Address of case Address of
{utf8, <<"/queues/", QNameBinQuoted/binary>>} -> {utf8, <<"/queues/", QNameBinQuoted/binary>>} ->
%% The only possible v2 source address format is: %% The only possible v2 source address format is:
@ -2633,15 +2703,20 @@ ensure_source(#'v1_0.source'{address = Address,
QNameBin -> QNameBin ->
QName = queue_resource(Vhost, QNameBin), QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName), ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache} {ok, QName, Source, PermCache, TopicPermCache}
catch error:_ -> catch error:_ ->
{error, {bad_address, Address}} {error, {bad_address, Address}}
end; end;
{utf8, SourceAddr} -> {utf8, SourceAddr} ->
case address_v1_permitted() of case address_v1_permitted() of
true -> true ->
ensure_source_v1(SourceAddr, Vhost, User, Durable, case ensure_source_v1(SourceAddr, Vhost, User, Durable,
PermCache, TopicPermCache); PermCache, TopicPermCache) of
{ok, QName, PermCache1, TopicPermCache1} ->
{ok, QName, Source, PermCache1, TopicPermCache1};
Err ->
Err
end;
false -> false ->
{error, {amqp_address_v1_not_permitted, Address}} {error, {amqp_address_v1_not_permitted, Address}}
end; end;
@ -2687,42 +2762,71 @@ ensure_source_v1(Address,
Err Err
end. end.
address(undefined) ->
null;
address({utf8, String}) ->
String.
-spec ensure_target(#'v1_0.target'{}, -spec ensure_target(#'v1_0.target'{},
binary(),
rabbit_types:vhost(), rabbit_types:vhost(),
rabbit_types:user(), rabbit_types:user(),
binary(),
rabbit_types:connection(),
permission_cache()) -> permission_cache()) ->
{ok, {ok,
rabbit_types:exchange() | rabbit_exchange:name() | to, rabbit_types:exchange() | rabbit_exchange:name() | to,
rabbit_types:routing_key() | to | subject, rabbit_types:routing_key() | to | subject,
rabbit_misc:resource_name() | undefined, rabbit_misc:resource_name() | undefined,
#'v1_0.target'{},
permission_cache()} | permission_cache()} |
{error, term()}. {error, term()}.
ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) -> ensure_target(#'v1_0.target'{
exit_not_implemented("Dynamic targets not supported"); address = undefined,
ensure_target(#'v1_0.target'{address = Address, dynamic = true,
durable = Durable}, %% We will reply with the actual node properties.
Vhost, User, PermCache) -> dynamic_node_properties = _IgnoreDesiredProperties,
capabilities = {array, symbol, Caps}
} = Target0,
LinkName, Vhost, User, ContainerId, ConnPid, PermCache0) ->
case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
true ->
{QNameBin, Address, Props, PermCache1} =
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
{ok, Exchange, PermCache} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache1),
Target = #'v1_0.target'{
address = {utf8, Address},
%% While Khepri stores queue records durably,
%% the terminus - i.e. the existence of this producer - is not stored durably.
durable = ?V_1_0_TERMINUS_DURABILITY_NONE,
expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
timeout = {uint, 0},
dynamic = true,
dynamic_node_properties = Props,
capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
},
{ok, Exchange, QNameBin, QNameBin, Target, PermCache};
false ->
exit_not_implemented("Dynamic target not supported: ~p", [Target0])
end;
ensure_target(Target = #'v1_0.target'{dynamic = true}, _, _, _, _, _, _) ->
exit_not_implemented("Dynamic target not supported: ~p", [Target]);
ensure_target(Target = #'v1_0.target'{address = Address,
durable = Durable},
_LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache0) ->
case target_address_version(Address) of case target_address_version(Address) of
2 -> 2 ->
case ensure_target_v2(Address, Vhost) of case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} -> {ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache}; {ok, to, RKey, QNameBin, Target, PermCache0};
{ok, XNameBin, RKey, QNameBin} -> {ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache0),
{ok, Exchange, RKey, QNameBin, Target, PermCache};
{error, _} = Err -> {error, _} = Err ->
Err Err
end; end;
1 -> 1 ->
case address_v1_permitted() of case address_v1_permitted() of
true -> true ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
{ok, XNameBin, RKey, QNameBin, PermCache1} -> {ok, XNameBin, RKey, QNameBin, PermCache1} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1); {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache1),
{ok, Exchange, RKey, QNameBin, Target, PermCache};
{error, _} = Err -> {error, _} = Err ->
Err Err
end; end;
@ -2731,7 +2835,7 @@ ensure_target(#'v1_0.target'{address = Address,
end end
end. end.
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> check_exchange(XNameBin, User, Vhost, PermCache0) ->
XName = exchange_resource(Vhost, XNameBin), XName = exchange_resource(Vhost, XNameBin),
PermCache = check_resource_access(XName, write, User, PermCache0), PermCache = check_resource_access(XName, write, User, PermCache0),
case rabbit_exchange:lookup(XName) of case rabbit_exchange:lookup(XName) of
@ -2745,7 +2849,7 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
<<"amq.", _/binary>> -> X; <<"amq.", _/binary>> -> X;
_ -> XName _ -> XName
end, end,
{ok, Exchange, RKey, QNameBin, PermCache}; {ok, Exchange, PermCache};
{error, not_found} -> {error, not_found} ->
exit_not_found(XName) exit_not_found(XName)
end. end.
@ -3033,7 +3137,10 @@ credit_reply_timeout(QType, QName) ->
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args). protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).
default(undefined, Default) -> Default; default(undefined, Default) -> Default;
default(Thing, _Default) -> Thing. default(Thing, _Default) -> Thing.
address(undefined) -> null;
address({utf8, String}) -> String.
snd_settle_mode({ubyte, Val}) -> snd_settle_mode({ubyte, Val}) ->
case Val of case Val of
@ -3247,20 +3354,20 @@ ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durabil
ok = exit_if_absent(exchange, Vhost, XNameList), ok = exit_if_absent(exchange, Vhost, XNameList),
case Type of case Type of
target -> {undefined, PermCache}; target -> {undefined, PermCache};
source -> declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache) source -> declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache)
end; end;
ensure_terminus(target, {topic, _bindingkey}, _, _, _, PermCache) -> ensure_terminus(target, {topic, _bindingkey}, _, _, _, PermCache) ->
%% exchange amq.topic exists %% exchange amq.topic exists
{undefined, PermCache}; {undefined, PermCache};
ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability, PermCache) -> ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability, PermCache) ->
%% exchange amq.topic exists %% exchange amq.topic exists
declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache); declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache);
ensure_terminus(target, {queue, undefined}, _, _, _, PermCache) -> ensure_terminus(target, {queue, undefined}, _, _, _, PermCache) ->
%% Target "/queue" means publish to default exchange with message subject as routing key. %% Target "/queue" means publish to default exchange with message subject as routing key.
%% Default exchange exists. %% Default exchange exists.
{undefined, PermCache}; {undefined, PermCache};
ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability, PermCache) -> ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability, PermCache) ->
declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache); declare_queue_v1(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache);
ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) -> ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) ->
%% Target "/amq/queue/" is handled specially due to AMQP legacy: %% Target "/amq/queue/" is handled specially due to AMQP legacy:
%% "Queue names starting with "amq." are reserved for pre-declared and %% "Queue names starting with "amq." are reserved for pre-declared and
@ -3285,22 +3392,39 @@ exit_if_absent(ResourceName = #resource{kind = Kind}) ->
false -> exit_not_found(ResourceName) false -> exit_not_found(ResourceName)
end. end.
generate_queue_name() -> generate_queue_name_v1() ->
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"). rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen").
%% "The generated name of the address SHOULD include the link name and the
%% container-id of the remote container to allow for ease of identification." [3.5.4]
%% Let's include container-id and link name if they are not very long
%% because the generated address might be sent in every message.
generate_queue_name_dynamic(ContainerId, LinkName)
when byte_size(ContainerId) + byte_size(LinkName) < 150 ->
Prefix = <<"amq.dyn-", ContainerId/binary, "-", LinkName/binary>>,
rabbit_guid:binary(rabbit_guid:gen_secure(), Prefix);
generate_queue_name_dynamic(_, _) ->
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.dyn.gen").
declare_queue_v1(QNameBin, Vhost, User, TerminusDurability, PermCache0) ->
Durable = queue_is_durable(TerminusDurability),
{ok, PermCache} = declare_queue(QNameBin, Vhost, User, Durable, none, PermCache0),
{QNameBin, PermCache}.
declare_queue(QNameBin, declare_queue(QNameBin,
Vhost, Vhost,
User = #user{username = Username}, User = #user{username = Username},
TerminusDurability, Durable,
QOwner,
PermCache0) -> PermCache0) ->
QName = queue_resource(Vhost, QNameBin), QName = queue_resource(Vhost, QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0), PermCache = check_resource_access(QName, configure, User, PermCache0),
rabbit_core_metrics:queue_declared(QName), rabbit_core_metrics:queue_declared(QName),
Q0 = amqqueue:new(QName, Q0 = amqqueue:new(QName,
_Pid = none, _Pid = none,
queue_is_durable(TerminusDurability), Durable,
_AutoDelete = false, _AutoDelete = false,
_QOwner = none, QOwner,
_QArgs = [], _QArgs = [],
Vhost, Vhost,
#{user => Username}, #{user => Username},
@ -3320,7 +3444,40 @@ declare_queue(QNameBin,
"Failed to declare ~s: ~p", "Failed to declare ~s: ~p",
[rabbit_misc:rs(QName), Other]) [rabbit_misc:rs(QName), Other])
end, end,
{QNameBin, PermCache}. {ok, PermCache}.
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) ->
QNameBin = generate_queue_name_dynamic(ContainerId, LinkName),
{ok, PermCache} = declare_queue(QNameBin, Vhost, User, true, ConnPid, PermCache0),
QNameBinQuoted = uri_string:quote(QNameBin),
Address = <<"/queues/", QNameBinQuoted/binary>>,
Props = {map, [{{symbol, <<"lifetime-policy">>},
{described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}},
{{symbol, <<"supported-dist-modes">>},
{array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]},
{QNameBin, Address, Props, PermCache}.
maybe_delete_dynamic_queue(#incoming_link{dynamic = true,
queue_name_bin = QNameBin},
Cfg = #cfg{vhost = Vhost}) ->
QName = queue_resource(Vhost, QNameBin),
delete_dynamic_queue(QName, Cfg);
maybe_delete_dynamic_queue(#outgoing_link{dynamic = true,
queue_name = QName},
Cfg) ->
delete_dynamic_queue(QName, Cfg);
maybe_delete_dynamic_queue(_, _) ->
ok.
delete_dynamic_queue(QName, #cfg{user = #user{username = Username}}) ->
%% No real need to check for 'configure' access again since this queue is owned by
%% this connection and the user had 'configure' access when the queue got declared.
_ = rabbit_amqqueue:with(
QName,
fun(Q) ->
rabbit_queue_type:delete(Q, false, false, Username)
end),
ok.
outcomes(#'v1_0.source'{outcomes = undefined}) -> outcomes(#'v1_0.source'{outcomes = undefined}) ->
{array, symbol, ?OUTCOMES}; {array, symbol, ?OUTCOMES};

View File

@ -202,7 +202,7 @@ conserve_resources(Pid, Source, {_, Conserve, _}) ->
server_properties(Protocol) -> server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, description), {ok, Product} = application:get_key(rabbit, description),
{ok, Version} = application:get_key(rabbit, vsn), Version = rabbit_misc:version(),
%% Get any configuration-specified server properties %% Get any configuration-specified server properties
{ok, RawConfigServerProps} = application:get_env(rabbit, {ok, RawConfigServerProps} = application:get_env(rabbit,

View File

@ -55,9 +55,12 @@ groups() ->
[ [
%% authz %% authz
attach_source_queue, attach_source_queue,
attach_source_queue_dynamic,
attach_target_exchange, attach_target_exchange,
attach_target_topic_exchange, attach_target_topic_exchange,
attach_target_queue, attach_target_queue,
attach_target_queue_dynamic_exchange_write,
attach_target_queue_dynamic_queue_configure,
target_per_message_exchange, target_per_message_exchange,
target_per_message_internal_exchange, target_per_message_internal_exchange,
target_per_message_topic, target_per_message_topic,
@ -437,6 +440,39 @@ attach_source_queue(Config) ->
end, end,
ok = close_connection_sync(Conn). ok = close_connection_sync(Conn).
attach_source_queue_dynamic(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% missing configure permission to queue
ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
Source = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>],
durable => none},
AttachArgs = #{name => <<"my link">>,
role => {receiver, Source, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
receive {amqp10_event,
{session, Session,
{ended, Error}}} ->
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, Description}} = Error,
?assertEqual(
match,
re:run(Description,
<<"^configure access to queue 'amq\.dyn-.*' in vhost "
"'test vhost' refused for user 'test user'$">>,
[{capture, none}]))
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
attach_target_exchange(Config) -> attach_target_exchange(Config) ->
XName = <<"amq.fanout">>, XName = <<"amq.fanout">>,
Address1 = rabbitmq_amqp_address:exchange(XName), Address1 = rabbitmq_amqp_address:exchange(XName),
@ -485,6 +521,61 @@ attach_target_queue(Config) ->
end, end,
ok = amqp10_client:close_connection(Conn). ok = amqp10_client:close_connection(Conn).
attach_target_queue_dynamic_exchange_write(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% missing write permission to default exchange
ok = set_permissions(Config, <<".*">>, <<>>, <<".*">>),
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
AttachArgs = #{name => <<"my link">>,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
ExpectedErr = error_unauthorized(
<<"write access to exchange 'amq.default' ",
"in vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
attach_target_queue_dynamic_queue_configure(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% missing configure permission to queue
ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
AttachArgs = #{name => <<"my link">>,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
receive {amqp10_event,
{session, Session,
{ended, Error}}} ->
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, Description}} = Error,
?assertEqual(
match,
re:run(Description,
<<"^configure access to queue 'amq\.dyn-.*' in vhost "
"'test vhost' refused for user 'test user'$">>,
[{capture, none}]))
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
target_per_message_exchange(Config) -> target_per_message_exchange(Config) ->
TargetAddress = null, TargetAddress = null,
To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),

View File

@ -130,6 +130,10 @@ groups() ->
handshake_timeout, handshake_timeout,
credential_expires, credential_expires,
attach_to_exclusive_queue, attach_to_exclusive_queue,
dynamic_target_short_link_name,
dynamic_target_long_link_name,
dynamic_source_rpc,
dynamic_terminus_delete,
modified_classic_queue, modified_classic_queue,
modified_quorum_queue, modified_quorum_queue,
modified_dead_letter_headers_exchange, modified_dead_letter_headers_exchange,
@ -4762,6 +4766,230 @@ attach_to_exclusive_queue(Config) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_channel(Ch).
dynamic_target_short_link_name(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"my-container">>,
notify_with_performative => true},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% "The address of the target MUST NOT be set" [3.5.4]
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
ShortLinkName = <<"my/sender">>,
AttachArgs = #{name => ShortLinkName,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, Sender} = amqp10_client:attach_link(Session, AttachArgs),
Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} ->
#'v1_0.attach'{
target = #'v1_0.target'{
address = {utf8, Address},
dynamic = true}} = Attach,
Address
after 30000 -> ct:fail({missing_event, ?LINE})
end,
%% The client doesn't really care what the address looks like.
%% However let's do whitebox testing here and check the address format.
%% We expect the address to contain both container ID and link name since they are short.
?assertMatch(<<"/queues/amq.dyn-my-container-my%2Fsender-", _GUID/binary>>, Addr),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = wait_for_accepted(<<"t1">>),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)),
ok = amqp10_client:accept_msg(Receiver, Msg),
%% The exclusive queue should be deleted when we close our connection.
?assertMatch([_ExclusiveQueue], rpc(Config, rabbit_amqqueue, list, [])),
ok = close_connection_sync(Connection),
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
ok.
dynamic_target_long_link_name(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"my-container">>,
notify_with_performative => true},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% "The address of the target MUST NOT be set" [3.5.4]
Target = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>]},
LongLinkName = binary:copy(<<"z">>, 200),
AttachArgs = #{name => LongLinkName,
role => {sender, Target},
snd_settle_mode => mixed,
rcv_settle_mode => first},
{ok, Sender} = amqp10_client:attach_link(Session, AttachArgs),
Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} ->
#'v1_0.attach'{
target = #'v1_0.target'{
address = {utf8, Address},
dynamic = true}} = Attach,
Address
after 30000 -> ct:fail({missing_event, ?LINE})
end,
%% The client doesn't really care what the address looks like.
%% However let's do whitebox testing here and check the address format.
%% We expect the address to not contain the long link name.
?assertMatch(<<"/queues/amq.dyn.gen-", _GUID/binary>>, Addr),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = wait_for_accepted(<<"t1">>),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)),
ok = amqp10_client:accept_msg(Receiver, Msg),
flush(accepted),
%% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be
%% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10]
ok = amqp10_client:detach_link(Sender),
receive {amqp10_event, {link, Receiver, {detached, Detach}}} ->
?assertMatch(
#'v1_0.detach'{error = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}},
Detach)
after 5000 -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
%% Test the following RPC workflow:
%% RPC client -> queue -> RPC server
%% RPC server -> dynamic queue -> RPC client
dynamic_source_rpc(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"rpc-client">>,
notify_with_performative => true},
{ok, ConnectionClient} = amqp10_client:open_connection(OpnConf),
{ok, SessionClient} = amqp10_client:begin_session_sync(ConnectionClient),
%% "The address of the source MUST NOT be set" [3.5.3]
Source = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>],
durable => none},
AttachArgs = #{name => <<"rpc-client-receiver🥕"/utf8>>,
role => {receiver, Source, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
{ok, ReceiverClient} = amqp10_client:attach_link(SessionClient, AttachArgs),
RespAddr = receive {amqp10_event, {link, ReceiverClient, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, Address},
dynamic = true}} = Attach,
Address
after 30000 -> ct:fail({missing_event, ?LINE})
end,
%% The client doesn't really care what the address looks like.
%% However let's do whitebox testing here and check the address format.
%% We expect the address to contain both container ID and link name since they are short.
?assertMatch(<<"/queues/amq.dyn-rpc-client-rpc-client-receiver", _CarrotAndGUID/binary>>,
RespAddr),
%% Let's use a separate connection for the RPC server.
{_, SessionServer, LinkPair} = RpcServer = init(Config),
ReqQName = atom_to_binary(?FUNCTION_NAME),
ReqAddr = rabbitmq_amqp_address:queue(ReqQName),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, ReqQName, #{}),
{ok, ReceiverServer} = amqp10_client:attach_receiver_link(SessionServer, <<"rpc-server-receiver">>, ReqAddr, unsettled),
{ok, SenderServer} = amqp10_client:attach_sender_link(SessionServer, <<"rpc-server-sender">>, null),
ok = wait_for_credit(SenderServer),
{ok, SenderClient} = amqp10_client:attach_sender_link(SessionClient, <<"rpc-client-sender">>, ReqAddr),
wait_for_credit(SenderClient),
flush(attached),
ok = amqp10_client:send_msg(
SenderClient,
amqp10_msg:set_properties(
#{reply_to => RespAddr},
amqp10_msg:new(<<"t1">>, <<"hello">>))),
ok = wait_for_accepted(<<"t1">>),
{ok, ReqMsg} = amqp10_client:get_msg(ReceiverServer),
ReqBody = amqp10_msg:body_bin(ReqMsg),
RespBody = string:uppercase(ReqBody),
#{reply_to := ReplyTo} = amqp10_msg:properties(ReqMsg),
ok = amqp10_client:send_msg(
SenderServer,
amqp10_msg:set_properties(
#{to => ReplyTo},
amqp10_msg:new(<<"t2">>, RespBody))),
ok = wait_for_accepted(<<"t2">>),
ok = amqp10_client:accept_msg(ReceiverServer, ReqMsg),
{ok, RespMsg} = amqp10_client:get_msg(ReceiverClient),
?assertEqual(<<"HELLO">>, amqp10_msg:body_bin(RespMsg)),
ok = amqp10_client:accept_msg(ReceiverClient, RespMsg),
ok = detach_link_sync(ReceiverServer),
ok = detach_link_sync(SenderClient),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, ReqQName),
ok = detach_link_sync(SenderServer),
ok = close(RpcServer),
ok = close_connection_sync(ConnectionClient).
dynamic_terminus_delete(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session1} = amqp10_client:begin_session_sync(Connection),
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
Terminus = #{address => undefined,
dynamic => true,
capabilities => [<<"temporary-queue">>],
durable => none},
RcvAttachArgs = #{role => {receiver, Terminus, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
SndAttachArgs = #{role => {sender, Terminus},
snd_settle_mode => mixed,
rcv_settle_mode => first},
RcvAttachArgs1 = RcvAttachArgs#{name => <<"receiver 1">>},
RcvAttachArgs2 = RcvAttachArgs#{name => <<"receiver 2">>},
RcvAttachArgs3 = RcvAttachArgs#{name => <<"receiver 3">>},
SndAttachArgs1 = SndAttachArgs#{name => <<"sender 1">>},
SndAttachArgs2 = SndAttachArgs#{name => <<"sender 2">>},
SndAttachArgs3 = SndAttachArgs#{name => <<"sender 3">>},
{ok, _R1} = amqp10_client:attach_link(Session1, RcvAttachArgs1),
{ok, _R2} = amqp10_client:attach_link(Session2, RcvAttachArgs2),
{ok, R3} = amqp10_client:attach_link(Session2, RcvAttachArgs3),
{ok, _S1} = amqp10_client:attach_link(Session1, SndAttachArgs1),
{ok, _S2} = amqp10_client:attach_link(Session2, SndAttachArgs2),
{ok, S3} = amqp10_client:attach_link(Session2, SndAttachArgs3),
[receive {amqp10_event, {link, _LinkRef, attached}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end
|| _ <- lists:seq(1, 6)],
%% We should now have 6 exclusive queues.
?assertEqual(6, rpc(Config, rabbit_amqqueue, count, [])),
%% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be
%% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10]
ok = detach_link_sync(R3),
ok = detach_link_sync(S3),
?assertEqual(4, rpc(Config, rabbit_amqqueue, count, [])),
%% When a session is ended, the sessions's links cease to exist.
ok = end_session_sync(Session2),
eventually(?_assertEqual(2, rpc(Config, rabbit_amqqueue, count, []))),
%% When a connection is closed, the connection's links cease to exist.
ok = close_connection_sync(Connection),
eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, []))),
ok.
priority_classic_queue(Config) -> priority_classic_queue(Config) ->
QArgs = #{<<"x-queue-type">> => {utf8, <<"classic">>}, QArgs = #{<<"x-queue-type">> => {utf8, <<"classic">>},
<<"x-max-priority">> => {ulong, 10}}, <<"x-max-priority">> => {ulong, 10}},

View File

@ -14,6 +14,10 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(rabbit_ct_broker_helpers,
[rpc/4]).
-import(rabbit_ct_helpers,
[eventually/3]).
-import(amqp_utils, -import(amqp_utils,
[init/1, [init/1,
close/1, close/1,
@ -30,8 +34,15 @@ all() ->
groups() -> groups() ->
[{cluster_size_1, [shuffle], [{cluster_size_1, [shuffle],
[ [
%% CT test case per Java class
jms_connection,
jms_temporary_queue,
%% CT test case per test in Java class JmsTest
message_types_jms_to_jms, message_types_jms_to_jms,
message_types_jms_to_amqp message_types_jms_to_amqp,
temporary_queue_rpc,
temporary_queue_delete
] ]
}]. }].
@ -54,7 +65,9 @@ end_per_suite(Config) ->
init_per_group(cluster_size_1, Config) -> init_per_group(cluster_size_1, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, Suffix}), Config1 = rabbit_ct_helpers:set_config(
Config,
{rmq_nodename_suffix, Suffix}),
Config2 = rabbit_ct_helpers:merge_app_env( Config2 = rabbit_ct_helpers:merge_app_env(
Config1, Config1,
{rabbit, {rabbit,
@ -82,6 +95,9 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase). rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) ->
%% Assert that every testcase cleaned up.
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, [])), 1000, 5),
eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, [])), 1000, 5),
rabbit_ct_helpers:testcase_finished(Config, Testcase). rabbit_ct_helpers:testcase_finished(Config, Testcase).
build_maven_test_project(Config) -> build_maven_test_project(Config) ->
@ -98,67 +114,49 @@ build_maven_test_project(Config) ->
%% Testcases. %% Testcases.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
jms_connection(Config) ->
ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsConnectionTest">>]}], Config).
jms_temporary_queue(Config) ->
ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsTemporaryQueueTest">>]}], Config).
%% Send different message types from JMS client to JMS client. %% Send different message types from JMS client to JMS client.
message_types_jms_to_jms(Config) -> message_types_jms_to_jms(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME), TestName = QName = atom_to_binary(?FUNCTION_NAME),
ok = declare_queue(QName, <<"quorum">>, Config), ok = declare_queue(QName, <<"quorum">>, Config),
ok = run(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = delete_queue(QName, Config). ok = delete_queue(QName, Config).
%% Send different message types from JMS client to Erlang AMQP 1.0 client. %% Send different message types from JMS client to Erlang AMQP 1.0 client.
message_types_jms_to_amqp(Config) -> message_types_jms_to_amqp(Config) ->
TestName = atom_to_binary(?FUNCTION_NAME),
ok = run_jms_test(TestName, [], Config).
temporary_queue_rpc(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME), TestName = QName = atom_to_binary(?FUNCTION_NAME),
ok = declare_queue(QName, <<"quorum">>, Config), ok = declare_queue(QName, <<"classic">>, Config),
Address = rabbitmq_amqp_address:queue(QName), ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
%% The JMS client sends messaegs.
ok = run(TestName, [{"-Dqueue=~ts", [Address]}], Config),
%% The Erlang AMQP 1.0 client receives messages.
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled),
{ok, Msg1} = amqp10_client:get_msg(Receiver),
?assertEqual(
#'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}},
amqp10_msg:body(Msg1)),
{ok, Msg2} = amqp10_client:get_msg(Receiver),
?assertEqual(
#'v1_0.amqp_value'{
content = {map, [
{{utf8, <<"key1">>}, {utf8, <<"value">>}},
{{utf8, <<"key2">>}, true},
{{utf8, <<"key3">>}, {double, -1.1}},
{{utf8, <<"key4">>}, {long, -1}}
]}},
amqp10_msg:body(Msg2)),
{ok, Msg3} = amqp10_client:get_msg(Receiver),
?assertEqual(
[
#'v1_0.amqp_sequence'{
content = [{utf8, <<"value">>},
true,
{double, -1.1},
{long, -1}]}
],
amqp10_msg:body(Msg3)),
ok = detach_link_sync(Receiver),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
ok = delete_queue(QName, Config). ok = delete_queue(QName, Config).
temporary_queue_delete(Config) ->
TestName = atom_to_binary(?FUNCTION_NAME),
ok = run_jms_test(TestName, [], Config).
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% Helpers %% Helpers
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
run_jms_test(TestName, JavaProps, Config) ->
run(TestName, [{"-Dtest=JmsTest#~ts", [TestName]} | JavaProps], Config).
run(TestName, JavaProps, Config) -> run(TestName, JavaProps, Config) ->
TestProjectDir = ?config(data_dir, Config), TestProjectDir = ?config(data_dir, Config),
Cmd = [filename:join([TestProjectDir, "mvnw"]), Cmd = [filename:join([TestProjectDir, "mvnw"]),
"test", "test",
{"-Dtest=JmsTest#~ts", [TestName]}, {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]},
{"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]} {"-Dnodename=~ts", [rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)]},
{"-Drabbitmqctl.bin=~ts", [rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd)]}
] ++ JavaProps, ] ++ JavaProps,
case rabbit_ct_helpers:exec(Cmd, [{cd, TestProjectDir}]) of case rabbit_ct_helpers:exec(Cmd, [{cd, TestProjectDir}]) of
{ok, _Stdout_} -> {ok, _Stdout_} ->

View File

@ -9,7 +9,9 @@
<url>https://www.rabbitmq.com</url> <url>https://www.rabbitmq.com</url>
<properties> <properties>
<junit.jupiter.version>5.10.2</junit.jupiter.version> <junit.jupiter.version>5.10.2</junit.jupiter.version>
<assertj.version>3.27.3</assertj.version>
<qpid-jms-client.version>2.6.1</qpid-jms-client.version> <qpid-jms-client.version>2.6.1</qpid-jms-client.version>
<amqp-client.version>[0.5.0-SNAPSHOT,)</amqp-client.version>
<logback.version>1.2.13</logback.version> <logback.version>1.2.13</logback.version>
<spotless.version>2.43.0</spotless.version> <spotless.version>2.43.0</spotless.version>
<google-java-format.version>1.25.2</google-java-format.version> <google-java-format.version>1.25.2</google-java-format.version>
@ -30,13 +32,24 @@
<version>${qpid-jms-client.version}</version> <version>${qpid-jms-client.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
<version>${logback.version}</version> <version>${logback.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.rabbitmq.client</groupId>
<artifactId>amqp-client</artifactId>
<version>${amqp-client.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -81,4 +94,16 @@
</plugins> </plugins>
</build> </build>
<repositories>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots><enabled>true</enabled></snapshots>
<releases><enabled>false</enabled></releases>
</repository>
</repositories>
</project> </project>

View File

@ -0,0 +1,163 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.UnknownHostException;
final class Cli {
private Cli() {}
static void startBroker() {
rabbitmqctl("start_app");
}
static void stopBroker() {
rabbitmqctl("stop_app");
}
private static ProcessState rabbitmqctl(String command) {
return rabbitmqctl(command, nodename());
}
private static ProcessState rabbitmqctl(String command, String nodename) {
return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command);
}
private static String rabbitmqctlCommand() {
return System.getProperty("rabbitmqctl.bin");
}
public static String nodename() {
return System.getProperty("nodename", "rabbit@" + hostname());
}
public static String hostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
try {
return executeCommand("hostname").output();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
private static ProcessState executeCommand(String command) {
return executeCommand(command, false);
}
private static ProcessState executeCommand(String command, boolean ignoreError) {
Process pr = executeCommandProcess(command);
InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());
int ev = waitForExitValue(pr, inputState, errorState);
inputState.pump();
errorState.pump();
if (ev != 0 && !ignoreError) {
throw new RuntimeException(
"unexpected command exit value: "
+ ev
+ "\ncommand: "
+ command
+ "\n"
+ "\nstdout:\n"
+ inputState.buffer.toString()
+ "\nstderr:\n"
+ errorState.buffer.toString()
+ "\n");
}
return new ProcessState(inputState);
}
private static int waitForExitValue(
Process pr, InputStreamPumpState inputState, InputStreamPumpState errorState) {
while (true) {
try {
inputState.pump();
errorState.pump();
pr.waitFor();
break;
} catch (InterruptedException ignored) {
}
}
return pr.exitValue();
}
private static Process executeCommandProcess(String command) {
String[] finalCommand;
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
finalCommand = new String[4];
finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
finalCommand[1] = "/y";
finalCommand[2] = "/c";
finalCommand[3] = command;
} else {
finalCommand = new String[3];
finalCommand[0] = "/bin/sh";
finalCommand[1] = "-c";
finalCommand[2] = command;
}
try {
return Runtime.getRuntime().exec(finalCommand);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static class ProcessState {
private final InputStreamPumpState inputState;
ProcessState(InputStreamPumpState inputState) {
this.inputState = inputState;
}
String output() {
return inputState.buffer.toString();
}
}
private static class InputStreamPumpState {
private final BufferedReader reader;
private final StringBuilder buffer;
private InputStreamPumpState(InputStream in) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.buffer = new StringBuilder();
}
void pump() {
String line;
while (true) {
try {
if ((line = reader.readLine()) == null) break;
} catch (IOException e) {
throw new RuntimeException(e);
}
buffer.append(line).append("\n");
}
}
}
}

View File

@ -0,0 +1,196 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import static com.rabbitmq.amqp.tests.jms.Cli.startBroker;
import static com.rabbitmq.amqp.tests.jms.Cli.stopBroker;
import static com.rabbitmq.amqp.tests.jms.TestUtils.*;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import jakarta.jms.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
* Based on
* https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests.
*/
@JmsTestInfrastructure
public class JmsConnectionTest {
String destination;
@Test
@Timeout(30)
public void testCreateConnection() throws Exception {
try (Connection connection = connection()) {
assertNotNull(connection);
}
}
@Test
@Timeout(30)
public void testCreateConnectionAndStart() throws Exception {
try (Connection connection = connection()) {
assertNotNull(connection);
connection.start();
}
}
@Test
@Timeout(30)
// Currently not supported by RabbitMQ.
@Disabled
public void testCreateWithDuplicateClientIdFails() throws Exception {
JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory();
JmsConnection connection1 = (JmsConnection) factory.createConnection();
connection1.setClientID("Test");
assertNotNull(connection1);
connection1.start();
JmsConnection connection2 = (JmsConnection) factory.createConnection();
try {
connection2.setClientID("Test");
fail("should have thrown a JMSException");
} catch (InvalidClientIDException ex) {
// OK
} catch (Exception unexpected) {
fail("Wrong exception type thrown: " + unexpected);
}
connection1.close();
connection2.close();
}
@Test
public void testSetClientIdAfterStartedFails() {
assertThrows(
JMSException.class,
() -> {
try (Connection connection = connection()) {
connection.setClientID("Test");
connection.start();
connection.setClientID("NewTest");
}
});
}
@Test
@Timeout(30)
public void testCreateConnectionAsSystemAdmin() throws Exception {
JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory();
factory.setUsername(adminUsername());
factory.setPassword(adminPassword());
try (Connection connection = factory.createConnection()) {
assertNotNull(connection);
connection.start();
}
}
@Test
@Timeout(30)
public void testCreateConnectionCallSystemAdmin() throws Exception {
try (Connection connection =
connectionFactory().createConnection(adminUsername(), adminPassword())) {
assertNotNull(connection);
connection.start();
}
}
@Test
@Timeout(30)
public void testCreateConnectionAsUnknwonUser() {
assertThrows(
JMSSecurityException.class,
() -> {
JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory();
factory.setUsername("unknown");
factory.setPassword("unknown");
try (Connection connection = factory.createConnection()) {
assertNotNull(connection);
connection.start();
}
});
}
@Test
@Timeout(30)
public void testCreateConnectionCallUnknwonUser() {
assertThrows(
JMSSecurityException.class,
() -> {
try (Connection connection = connectionFactory().createConnection("unknown", "unknown")) {
assertNotNull(connection);
connection.start();
}
});
}
@Test
@Timeout(30)
public void testBrokerStopWontHangConnectionClose() throws Exception {
Connection connection = connection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queue(destination);
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = session.createTextMessage("Sample text");
producer.send(m);
try {
stopBroker();
try {
connection.close();
} catch (Exception ex) {
fail("Should not have thrown an exception.");
}
} finally {
startBroker();
}
}
@Test
@Timeout(60)
public void testConnectionExceptionBrokerStop() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
try (Connection connection = connection()) {
connection.setExceptionListener(exception -> latch.countDown());
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
try {
stopBroker();
assertTrue(latch.await(10, TimeUnit.SECONDS));
} finally {
startBroker();
}
}
}
}

View File

@ -0,0 +1,140 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri;
import static com.rabbitmq.amqp.tests.jms.TestUtils.connection;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.fail;
import jakarta.jms.*;
import jakarta.jms.IllegalStateException;
import java.util.UUID;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
* Based on
* https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests.
*/
public class JmsTemporaryQueueTest {
Connection connection;
@BeforeEach
void init() throws JMSException {
connection = connection();
}
@AfterEach
void tearDown() throws JMSException {
connection.close();
}
@Test
@Timeout(60)
public void testCreatePublishConsumeTemporaryQueue() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
TemporaryQueue queue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
String body = UUID.randomUUID().toString();
producer.send(session.createTextMessage(body));
assertEquals(body, consumer.receive(60_000).getBody(String.class));
}
@Test
@Timeout(60)
public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session.createTemporaryQueue();
session.createConsumer(tempQueue);
Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection();
try {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
session2.createConsumer(tempQueue);
fail("should not be able to consumer from temporary queue from another connection");
} catch (InvalidDestinationException ide) {
// expected
}
} finally {
connection2.close();
}
}
@Test
@Timeout(60)
public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session.createTemporaryQueue();
Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection();
try {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Message msg = session2.createMessage();
MessageProducer producer = session2.createProducer(tempQueue);
// Close the original connection
connection.close();
try {
producer.send(msg);
fail("should not be able to send to temporary queue from closed connection");
} catch (jakarta.jms.IllegalStateException ide) {
// expected
}
} finally {
connection2.close();
}
}
@Test
@Timeout(60)
public void testCantDeleteTemporaryQueueWithConsumers() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(tempQueue);
try {
tempQueue.delete();
fail("should not be able to delete temporary queue with active consumers");
} catch (IllegalStateException ide) {
// expected
}
consumer.close();
// Now it should be allowed
tempQueue.delete();
}
}

View File

@ -1,13 +1,22 @@
package com.rabbitmq.amqp.tests.jms; package com.rabbitmq.amqp.tests.jms;
import static org.junit.jupiter.api.Assertions.assertEquals; import static com.rabbitmq.amqp.tests.jms.TestUtils.protonClient;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static com.rabbitmq.amqp.tests.jms.TestUtils.protonConnection;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import jakarta.jms.*; import jakarta.jms.*;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import javax.naming.Context; import javax.naming.Context;
import com.rabbitmq.qpid.protonj2.client.Client;
import com.rabbitmq.qpid.protonj2.client.Delivery;
import com.rabbitmq.qpid.protonj2.client.Receiver;
import jakarta.jms.Queue;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@JmsTestInfrastructure
public class JmsTest { public class JmsTest {
private javax.naming.Context getContext() throws Exception{ private javax.naming.Context getContext() throws Exception{
@ -95,20 +104,20 @@ public class JmsTest {
} }
} }
String destination;
@Test @Test
public void message_types_jms_to_amqp() throws Exception { public void message_types_jms_to_amqp() throws Exception {
Context context = getContext(); Context context = getContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
Queue queue = TestUtils.queue(destination);
String msg1 = "msg1🥕";
try (Connection connection = factory.createConnection()) { try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(); Session session = connection.createSession();
Destination queue = (Destination) context.lookup("myQueue");
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
// TextMessage // TextMessage
String msg1 = "msg1🥕";
TextMessage textMessage = session.createTextMessage(msg1); TextMessage textMessage = session.createTextMessage(msg1);
producer.send(textMessage); producer.send(textMessage);
@ -128,5 +137,77 @@ public class JmsTest {
streamMessage.writeLong(-1L); streamMessage.writeLong(-1L);
producer.send(streamMessage); producer.send(streamMessage);
} }
try (Client client = protonClient();
com.rabbitmq.qpid.protonj2.client.Connection amqpConnection = protonConnection(client)) {
Receiver receiver = amqpConnection.openReceiver(queue.getQueueName());
Delivery delivery = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(delivery);
assertEquals(msg1, delivery.message().body());
delivery = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(delivery);
com.rabbitmq.qpid.protonj2.client.Message<Map<String, Object>> mapMessage = delivery.message();
assertThat(mapMessage.body()).containsEntry("key1", "value")
.containsEntry("key2", true)
.containsEntry("key3", -1.1)
.containsEntry("key4", -1L);
delivery = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(delivery);
com.rabbitmq.qpid.protonj2.client.Message<List<Object>> listMessage = delivery.message();
assertThat(listMessage.body()).containsExactly("value", true, -1.1, -1L);
}
}
// Test that Request/reply pattern using a TemporaryQueue works.
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee
@Test
public void temporary_queue_rpc() throws Exception {
Context context = getContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
try (JMSContext clientContext = factory.createContext()) {
Destination responseQueue = clientContext.createTemporaryQueue();
JMSConsumer clientConsumer = clientContext.createConsumer(responseQueue);
Destination requestQueue = (Destination) context.lookup("myQueue");
TextMessage clientRequestMessage = clientContext.createTextMessage("hello");
clientContext.createProducer().
setJMSReplyTo(responseQueue).
send(requestQueue, clientRequestMessage);
// Let's open a new connection to simulate the RPC server.
try (JMSContext serverContext = factory.createContext()) {
JMSConsumer serverConsumer = serverContext.createConsumer(requestQueue);
TextMessage serverRequestMessage = (TextMessage) serverConsumer.receive(5000);
TextMessage serverResponseMessage = serverContext.createTextMessage(
serverRequestMessage.getText().toUpperCase());
serverContext.createProducer().
send(serverRequestMessage.getJMSReplyTo(), serverResponseMessage);
}
TextMessage clientResponseMessage = (TextMessage) clientConsumer.receive(5000);
assertEquals("HELLO", clientResponseMessage.getText());
}
}
// Test that a temporary queue can be deleted.
@Test
public void temporary_queue_delete() throws Exception {
Context context = getContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
try (JMSContext clientContext = factory.createContext()) {
TemporaryQueue queue = clientContext.createTemporaryQueue();
queue.delete();
try {
clientContext.createProducer().send(queue, "hello");
fail("should not be able to create producer for deleted temporary queue");
} catch (IllegalStateRuntimeException expectedException) {
assertEquals("Temporary destination has been deleted", expectedException.getMessage());
}
}
} }
} }

View File

@ -0,0 +1,26 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import java.lang.annotation.*;
import org.junit.jupiter.api.extension.ExtendWith;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ExtendWith(JmsTestInfrastructureExtension.class)
public @interface JmsTestInfrastructure {}

View File

@ -0,0 +1,83 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
import java.lang.reflect.Field;
import org.junit.jupiter.api.extension.*;
final class JmsTestInfrastructureExtension
implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
private static final ExtensionContext.Namespace NAMESPACE =
ExtensionContext.Namespace.create(JmsTestInfrastructureExtension.class);
private static ExtensionContext.Store store(ExtensionContext extensionContext) {
return extensionContext.getRoot().getStore(NAMESPACE);
}
private static Field field(Class<?> cls, String name) {
Field field = null;
while (field == null && cls != null) {
try {
field = cls.getDeclaredField(name);
} catch (NoSuchFieldException e) {
cls = cls.getSuperclass();
}
}
return field;
}
@Override
public void beforeAll(ExtensionContext context) {
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
Field field = field(context.getTestInstance().get().getClass(), "destination");
if (field != null) {
field.setAccessible(true);
String destination = TestUtils.name(context);
field.set(context.getTestInstance().get(), destination);
try (Environment environment = new AmqpEnvironmentBuilder().build();
Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) {
connection.management().queue(destination).declare();
}
}
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
Field field = field(context.getTestInstance().get().getClass(), "destination");
if (field != null) {
field.setAccessible(true);
String destination = (String) field.get(context.getTestInstance().get());
try (Environment environment = new AmqpEnvironmentBuilder().build();
Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) {
connection.management().queueDelete(destination);
}
}
}
@Override
public void afterAll(ExtensionContext context) {
}
}

View File

@ -0,0 +1,119 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms;
import static java.lang.String.format;
import com.rabbitmq.qpid.protonj2.client.Client;
import com.rabbitmq.qpid.protonj2.client.ConnectionOptions;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Queue;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtensionContext;
final class TestUtils {
private static final String DEFAULT_BROKER_URI = "amqp://localhost:5672";
private TestUtils() {}
static String brokerUri() {
String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672");
return uri == null || uri.isEmpty() ? DEFAULT_BROKER_URI : uri;
}
static String brokerHost() {
try {
URI uri = new URI(brokerUri());
return uri.getHost();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
static int brokerPort() {
try {
URI uri = new URI(brokerUri());
return uri.getPort();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
static String adminUsername() {
return "guest";
}
static String adminPassword() {
return "guest";
}
static ConnectionFactory connectionFactory() {
return new JmsConnectionFactory(brokerUri());
}
static Connection connection() throws JMSException {
return connectionFactory().createConnection();
}
static Queue queue(String name) {
// no path encoding, use names with e.g. ASCII characters only
return new JmsQueue("/queues/" + name);
}
static Client protonClient() {
return Client.create();
}
static com.rabbitmq.qpid.protonj2.client.Connection protonConnection(Client client) {
ConnectionOptions connectionOptions = new ConnectionOptions().virtualHost("vhost:/");
connectionOptions.saslOptions().addAllowedMechanism("ANONYMOUS");
try {
return client.connect(brokerHost(), brokerPort(), connectionOptions);
} catch (ClientException e) {
throw new RuntimeException(e);
}
}
static String name(TestInfo info) {
return name(info.getTestClass().get(), info.getTestMethod().get());
}
static String name(ExtensionContext context) {
return name(context.getTestInstance().get().getClass(), context.getTestMethod().get());
}
private static String name(Class<?> testClass, Method testMethod) {
return name(testClass, testMethod.getName());
}
private static String name(Class<?> testClass, String testMethod) {
String uuid = UUID.randomUUID().toString();
return format(
"%s_%s%s", testClass.getSimpleName(), testMethod, uuid.substring(uuid.length() / 2));
}
}

View File

@ -1565,13 +1565,13 @@ format(Config) ->
case length(Nodes) of case length(Nodes) of
3 -> 3 ->
[_, Server2, Server3] = Nodes, [_, Server2, Server3] = Nodes,
ok = rabbit_control_helper:command(stop_app, Server2),
ok = rabbit_control_helper:command(stop_app, Server3), ok = rabbit_control_helper:command(stop_app, Server3),
ok = rabbit_control_helper:command(stop_app, Server2),
Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue, Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue,
?FUNCTION_NAME, [QRecord, #{}]), ?FUNCTION_NAME, [QRecord, #{}]),
ok = rabbit_control_helper:command(start_app, Server2),
ok = rabbit_control_helper:command(start_app, Server3), ok = rabbit_control_helper:command(start_app, Server3),
ok = rabbit_control_helper:command(start_app, Server2),
?assertEqual(stream, proplists:get_value(type, Fmt2)), ?assertEqual(stream, proplists:get_value(type, Fmt2)),
?assertEqual(minority, proplists:get_value(state, Fmt2)), ?assertEqual(minority, proplists:get_value(state, Fmt2)),
?assertEqual(Server, proplists:get_value(leader, Fmt2)), ?assertEqual(Server, proplists:get_value(leader, Fmt2)),

View File

@ -19,6 +19,8 @@ endef
DEPS = rabbit_common rabbit DEPS = rabbit_common rabbit
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers
PLT_APPS += rabbitmqctl
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk

View File

@ -0,0 +1,83 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ClearAuthBackendCacheCommand').
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
-export([
usage/0,
usage_additional/0,
usage_doc_guides/0,
flags/0,
validate/2,
merge_defaults/2,
banner/2,
run/2,
switches/0,
aliases/0,
output/2,
scopes/0,
formatter/0,
help_section/0,
description/0
]).
%%----------------------------------------------------------------------------
%% Callbacks
%%----------------------------------------------------------------------------
scopes() ->
[vmware, ctl].
switches() ->
[].
usage() ->
<<"clear_auth_backend_cache">>.
usage_additional() ->
[].
usage_doc_guides() ->
[].
help_section() ->
{plugin, rabbitmq_auth_backend_cache}.
description() ->
<<"Clears rabbitmq_auth_backend_cache plugin's cache on the target node">>.
flags() ->
[].
validate(_, _) ->
ok.
formatter() ->
'Elixir.RabbitMQ.CLI.Formatters.Table'.
merge_defaults(A, O) ->
{A, O}.
banner(_, _) ->
<<"Will clear rabbitmq_auth_backend_cache plugin's cache on the target node...">>.
run(_Args, #{node := Node}) ->
case rabbit_misc:rpc_call(Node, rabbit_auth_backend_cache, clear_cache_cluster_wide, []) of
{badrpc, _} = Error ->
Error;
Deleted ->
Deleted
end.
aliases() ->
[].
output(Value, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Value).

View File

@ -96,6 +96,7 @@ var ALL_COLUMNS =
['mode', 'Mode', true], ['mode', 'Mode', true],
['state', 'State', true]], ['state', 'State', true]],
'Details': [['msgs-unconfirmed', 'Unconfirmed', true], 'Details': [['msgs-unconfirmed', 'Unconfirmed', true],
['consumer-count', 'Consumer count', false],
['prefetch', 'Prefetch', true], ['prefetch', 'Prefetch', true],
['msgs-unacked', 'Unacked', true]], ['msgs-unacked', 'Unacked', true]],
'Transactions': [['msgs-uncommitted', 'Msgs uncommitted', false], 'Transactions': [['msgs-uncommitted', 'Msgs uncommitted', false],

View File

@ -35,6 +35,9 @@
<% if (show_column('channels', 'msgs-unconfirmed')) { %> <% if (show_column('channels', 'msgs-unconfirmed')) { %>
<th><%= fmt_sort('Unconfirmed', 'messages_unconfirmed') %></th> <th><%= fmt_sort('Unconfirmed', 'messages_unconfirmed') %></th>
<% } %> <% } %>
<% if (show_column('channels', 'consumer-count')) { %>
<th><%= fmt_sort('Consumer count', 'consumer_count') %></th>
<% } %>
<% if (show_column('channels', 'prefetch')) { %> <% if (show_column('channels', 'prefetch')) { %>
<th>Prefetch <span class="help" id="channel-prefetch"></span></th> <th>Prefetch <span class="help" id="channel-prefetch"></span></th>
<% } %> <% } %>
@ -85,6 +88,9 @@
<% if (show_column('channels', 'msgs-unconfirmed')) { %> <% if (show_column('channels', 'msgs-unconfirmed')) { %>
<th>Unconfirmed</th> <th>Unconfirmed</th>
<% } %> <% } %>
<% if (show_column('channels', 'consumer-count')) { %>
<th>Consumer count</th>
<% } %>
<% if (show_column('channels', 'prefetch')) { %> <% if (show_column('channels', 'prefetch')) { %>
<th>Prefetch <span class="help" id="channel-prefetch"></span></th> <th>Prefetch <span class="help" id="channel-prefetch"></span></th>
<% } %> <% } %>
@ -152,6 +158,9 @@
<% if (show_column('channels', 'msgs-unconfirmed')) { %> <% if (show_column('channels', 'msgs-unconfirmed')) { %>
<td class="c"><%= channel.messages_unconfirmed %></td> <td class="c"><%= channel.messages_unconfirmed %></td>
<% } %> <% } %>
<% if (show_column('channels', 'consumer-count')) { %>
<td class="c"><%= channel.consumer_count %></td>
<% } %>
<% if (show_column('channels', 'prefetch')) { %> <% if (show_column('channels', 'prefetch')) { %>
<td class="c"> <td class="c">
<% if (channel.prefetch_count != 0) { %> <% if (channel.prefetch_count != 0) { %>

View File

@ -70,9 +70,9 @@ sub_groups() ->
]}, ]},
{ssl_user_with_invalid_client_id_in_cert_san_dns, [], {ssl_user_with_invalid_client_id_in_cert_san_dns, [],
[invalid_client_id_from_cert_san_dns [invalid_client_id_from_cert_san_dns
]}, ]},
{ssl_user_with_client_id_in_cert_san_dns, [], {ssl_user_with_client_id_in_cert_san_dns, [],
[client_id_from_cert_san_dns [client_id_from_cert_san_dns
]}, ]},
{ssl_user_with_client_id_in_cert_san_dns_1, [], {ssl_user_with_client_id_in_cert_san_dns_1, [],
[client_id_from_cert_san_dns_1 [client_id_from_cert_san_dns_1
@ -209,8 +209,8 @@ mqtt_config(no_ssl_user) ->
mqtt_config(client_id_propagation) -> mqtt_config(client_id_propagation) ->
{rabbitmq_mqtt, [{ssl_cert_login, true}, {rabbitmq_mqtt, [{ssl_cert_login, true},
{allow_anonymous, true}]}; {allow_anonymous, true}]};
mqtt_config(T) when T == ssl_user_with_invalid_client_id_in_cert_san_dns; mqtt_config(T) when T == ssl_user_with_client_id_in_cert_san_dns;
ssl_user_with_client_id_in_cert_san_dns -> T == ssl_user_with_invalid_client_id_in_cert_san_dns ->
{rabbitmq_mqtt, [{ssl_cert_login, true}, {rabbitmq_mqtt, [{ssl_cert_login, true},
{allow_anonymous, false}, {allow_anonymous, false},
{ssl_cert_client_id_from, subject_alternative_name}, {ssl_cert_client_id_from, subject_alternative_name},
@ -591,8 +591,8 @@ client_id_from_cert_dn(Config) ->
invalid_client_id_from_cert_san_dns(Config) -> invalid_client_id_from_cert_san_dns(Config) ->
MqttClientId = <<"other_client_id">>, MqttClientId = <<"other_client_id">>,
{ok, C} = connect_ssl(MqttClientId, Config), {ok, C} = connect_ssl(MqttClientId, Config),
{error, {client_identifier_not_valid, _}} = emqtt:connect(C), unlink(C),
unlink(C). {error, {client_identifier_not_valid, _}} = emqtt:connect(C).
ssl_user_vhost_parameter_mapping_success(Config) -> ssl_user_vhost_parameter_mapping_success(Config) ->
expect_successful_connection(fun connect_ssl/1, Config). expect_successful_connection(fun connect_ssl/1, Config).

View File

@ -1665,7 +1665,8 @@ will_delay_node_restart(Config) ->
{ok, _, [0]} = emqtt:subscribe(Sub0a, Topic), {ok, _, [0]} = emqtt:subscribe(Sub0a, Topic),
Sub1 = connect(<<"sub1">>, Config, 1, []), Sub1 = connect(<<"sub1">>, Config, 1, []),
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic), {ok, _, [0]} = emqtt:subscribe(Sub1, Topic),
WillDelaySecs = 10, %% In mixed version mode with Khepri, draining the node can take 30 seconds.
WillDelaySecs = 40,
C0a = connect(<<"will">>, Config, 0, C0a = connect(<<"will">>, Config, 0,
[{properties, #{'Session-Expiry-Interval' => 900}}, [{properties, #{'Session-Expiry-Interval' => 900}},
{will_props, #{'Will-Delay-Interval' => WillDelaySecs}}, {will_props, #{'Will-Delay-Interval' => WillDelaySecs}},

View File

@ -304,22 +304,25 @@ collect_mf('detailed', Callback) ->
collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_RAW), Callback), collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_RAW), Callback),
collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback), collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback),
%% identity is here to enable filtering on a cluster name (as already happens in existing dashboards) %% identity is here to enable filtering on a cluster name (as already happens in existing dashboards)
emit_identity_info(Callback), emit_identity_info(<<"detailed">>, Callback),
ok; ok;
collect_mf('per-object', Callback) -> collect_mf('per-object', Callback) ->
collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
totals(Callback), totals(Callback),
emit_identity_info(Callback), emit_identity_info(<<"per-object">>, Callback),
ok; ok;
collect_mf('memory-breakdown', Callback) -> collect_mf('memory-breakdown', Callback) ->
collect(false, ?METRIC_NAME_PREFIX, false, ?METRICS_MEMORY_BREAKDOWN, Callback), collect(false, ?METRIC_NAME_PREFIX, false, ?METRICS_MEMORY_BREAKDOWN, Callback),
emit_identity_info(Callback), emit_identity_info(<<"memory-breakdown">>, Callback),
ok; ok;
collect_mf(_Registry, Callback) -> collect_mf(_Registry, Callback) ->
PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false), PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false),
collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
totals(Callback), totals(Callback),
emit_identity_info(Callback), case PerObjectMetrics of
true -> emit_identity_info(<<"per-object">>, Callback);
false -> emit_identity_info(<<"aggregated">>, Callback)
end,
ok. ok.
collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) -> collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) ->
@ -336,9 +339,9 @@ totals(Callback) ->
end || {Table, Name, Type, Help} <- ?TOTALS], end || {Table, Name, Type, Help} <- ?TOTALS],
ok. ok.
emit_identity_info(Callback) -> emit_identity_info(Endpoint, Callback) ->
add_metric_family(build_info(), Callback), add_metric_family(build_info(), Callback),
add_metric_family(identity_info(), Callback), add_metric_family(identity_info(Endpoint), Callback),
ok. ok.
%% Aggregated `auth``_attempt_detailed_metrics` and %% Aggregated `auth``_attempt_detailed_metrics` and
@ -387,7 +390,7 @@ build_info() ->
}] }]
}. }.
identity_info() -> identity_info(Endpoint) ->
{ {
identity_info, identity_info,
untyped, untyped,
@ -396,7 +399,8 @@ identity_info() ->
[ [
{rabbitmq_node, node()}, {rabbitmq_node, node()},
{rabbitmq_cluster, rabbit_nodes:cluster_name()}, {rabbitmq_cluster, rabbit_nodes:cluster_name()},
{rabbitmq_cluster_permanent_id, rabbit_nodes:persistent_cluster_id()} {rabbitmq_cluster_permanent_id, rabbit_nodes:persistent_cluster_id()},
{rabbitmq_endpoint, Endpoint}
], ],
1 1
}] }]

View File

@ -84,10 +84,6 @@ run([Name], #{node := Node, vhost := VHost}) ->
{badrpc, _} = Error -> {badrpc, _} = Error ->
Error; Error;
{error, not_found} -> {error, not_found} ->
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
"on the target node '~ts' and/or virtual host '~ts'. "
"It may be failing to connect and report its state, will delete its runtime parameter...",
[Node, VHost]),
try_force_removing(HostingNode, VHost, Name, ActingUser), try_force_removing(HostingNode, VHost, Name, ActingUser),
{error, rabbit_data_coercion:to_binary(ErrMsg)}; {error, rabbit_data_coercion:to_binary(ErrMsg)};
ok -> ok ->
@ -117,4 +113,4 @@ try_clearing_runtime_parameter(Node, VHost, ShovelName, ActingUser) ->
_ = rabbit_misc:rpc_call(Node, rabbit_runtime_parameters, clear, [VHost, <<"shovel">>, ShovelName, ActingUser]). _ = rabbit_misc:rpc_call(Node, rabbit_runtime_parameters, clear, [VHost, <<"shovel">>, ShovelName, ActingUser]).
try_stopping_child_process(Node, VHost, ShovelName) -> try_stopping_child_process(Node, VHost, ShovelName) ->
_ = rabbit_misc:rpc_call(Node, rabbit_shovel_dyn_worker_sup_sup, stop_and_delete_child, [{VHost, ShovelName}]). _ = rabbit_misc:rpc_call(Node, rabbit_shovel_dyn_worker_sup_sup, stop_child, [{VHost, ShovelName}]).

View File

@ -245,6 +245,31 @@ This section is incomplete and will be expanded as 4.0 approaches its release ca
### Core Server ### Core Server
#### Bug Fixes
* A whole category of issues with binding inconsistency are addressed with the stabilization
of [Khepri](https://github.com/rabbitmq/khepri), a new [metadata store](https://www.rabbitmq.com/docs/metadata-store) that uses a tree of nested objects instead of multiple tables.
With Mnesia, the original metadata store, bindings are stored in two tables, one for durable
bindings (between durable exchanges and durable queues or streams) and another for semi-durable
and transient ones (where either the queue is transient or both the queue and the exchange are).
When a node was stopped or failed, all non-replicated transient queues on that node were deleted
by the remaining cluster peers. Due to high lock contention around these tables with Mnesia, this
could take a while. In the case where the restarted (or failed) node came online before all bindings
were removed, and/or clients could begin to create new bindings concurrently, the bindings table
rows could end up being inconsistent, resulting in obscure "binding not found" errors.
Khepri avoids this problem entirely by only supporting durable entities and using a very different
[tree-based data model](https://github.com/rabbitmq/rabbitmq-server/pull/11225) that makes bindings removal much more efficient and lock contention-free.
Mnesia users can work around this problem by using [quorum queues](https://www.rabbitmq.com/docs/quorum-queues) or durable classic queues
and durable exchanges. Their durable bindings will not be removed when a node stops.
Queues that are transient in nature can be declared as durable classic ones with a [TTL](https://www.rabbitmq.com/docs/ttl) of a few hours.
GitHub issues (discussions): [#11952](https://github.com/rabbitmq/rabbitmq-server/discussions/11952), [#13030](https://github.com/rabbitmq/rabbitmq-server/discussions/13030), [#12927](https://github.com/rabbitmq/rabbitmq-server/discussions/12927), [#12783](https://github.com/rabbitmq/rabbitmq-server/discussions/12783)
#### Enhancements #### Enhancements
* Efficient sub-linear quorum queue recovery on node startup using checkpoints. * Efficient sub-linear quorum queue recovery on node startup using checkpoints.

View File

@ -124,13 +124,6 @@ This section can be incomplete and will be expanded as 4.1 approaches its releas
GitHub issue: [#12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) GitHub issue: [#12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599)
* Nodes will now fall back to system CA certificate list (if available) when no CA certificate
is explicitly configured.
Contributed by @LoisSotoLopez.
GitHub issue: [#10519](https://github.com/rabbitmq/rabbitmq-server/issues/10519), [#12564](https://github.com/rabbitmq/rabbitmq-server/pull/12564)
* AMQP 1.0 filters now have capped complexity: filtering on more than 16 properties * AMQP 1.0 filters now have capped complexity: filtering on more than 16 properties
won't be possible. This is a protection mechanism recommended in the AMQP 1.0 spec. won't be possible. This is a protection mechanism recommended in the AMQP 1.0 spec.
@ -145,6 +138,19 @@ This section can be incomplete and will be expanded as 4.1 approaches its releas
GitHub issue: [#12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) GitHub issue: [#12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559)
* Support field `dynamic` of AMQP 1.0 [source](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-source) and [target](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-target).
This allows AMQP clients to dynamically create [exclusive queues](https://www.rabbitmq.com/docs/queues#exclusive-queues), which can be useful for RPC workloads.
GitHub issue: [#13231](https://github.com/rabbitmq/rabbitmq-server/pull/13231)
* Nodes will now fall back to system CA certificate list (if available) when no CA certificate
is explicitly configured.
Contributed by @LoisSotoLopez.
GitHub issue: [#10519](https://github.com/rabbitmq/rabbitmq-server/issues/10519), [#12564](https://github.com/rabbitmq/rabbitmq-server/pull/12564)
* Peer discovery resilience improvements. * Peer discovery resilience improvements.
GitHub issues: [#12801](https://github.com/rabbitmq/rabbitmq-server/pull/12801), [#12809](https://github.com/rabbitmq/rabbitmq-server/pull/12809) GitHub issues: [#12801](https://github.com/rabbitmq/rabbitmq-server/pull/12801), [#12809](https://github.com/rabbitmq/rabbitmq-server/pull/12809)

View File

@ -12,7 +12,7 @@
"author": "", "author": "",
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"chromedriver": "^130.0.4", "chromedriver": "^132.0",
"ejs": "^3.1.8", "ejs": "^3.1.8",
"express": "^4.18.2", "express": "^4.18.2",
"geckodriver": "^3.0.2", "geckodriver": "^3.0.2",

View File

@ -45,6 +45,7 @@ module.exports = class BasePage {
return this.selectOption(SELECT_REFRESH, option) return this.selectOption(SELECT_REFRESH, option)
} }
async waitForOverviewTab() { async waitForOverviewTab() {
await this.driver.sleep(250)
return this.waitForDisplayed(OVERVIEW_TAB) return this.waitForDisplayed(OVERVIEW_TAB)
} }
@ -56,6 +57,7 @@ module.exports = class BasePage {
return this.click(CONNECTIONS_TAB) return this.click(CONNECTIONS_TAB)
} }
async waitForConnectionsTab() { async waitForConnectionsTab() {
await this.driver.sleep(250)
return this.waitForDisplayed(CONNECTIONS_TAB) return this.waitForDisplayed(CONNECTIONS_TAB)
} }
@ -63,6 +65,7 @@ module.exports = class BasePage {
return this.click(ADMIN_TAB) return this.click(ADMIN_TAB)
} }
async waitForAdminTab() { async waitForAdminTab() {
await this.driver.sleep(250)
return this.waitForDisplayed(ADMIN_TAB) return this.waitForDisplayed(ADMIN_TAB)
} }
@ -70,6 +73,7 @@ module.exports = class BasePage {
return this.click(CHANNELS_TAB) return this.click(CHANNELS_TAB)
} }
async waitForChannelsTab() { async waitForChannelsTab() {
await this.driver.sleep(250)
return this.waitForDisplayed(CHANNELS_TAB) return this.waitForDisplayed(CHANNELS_TAB)
} }
@ -77,6 +81,7 @@ module.exports = class BasePage {
return this.click(EXCHANGES_TAB) return this.click(EXCHANGES_TAB)
} }
async waitForExchangesTab() { async waitForExchangesTab() {
await this.driver.sleep(250)
return this.waitForDisplayed(EXCHANGES_TAB) return this.waitForDisplayed(EXCHANGES_TAB)
} }
@ -180,42 +185,69 @@ module.exports = class BasePage {
} }
async waitForLocated (locator) { async waitForLocated (locator) {
try { let attempts = 3
return this.driver.wait(until.elementLocated(locator), this.timeout, let retry = false
'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] seconds locating ' + locator, let rethrowError = null
this.polling) do {
}catch(error) { try {
if (!error.name.includes("NoSuchSessionError")) { return this.driver.wait(until.elementLocated(locator), this.timeout,
console.error("Failed waitForLocated " + locator + " due to " + error) 'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] seconds locating ' + locator,
} this.polling)
throw error }catch(error) {
} if (error.name.includes("StaleElementReferenceError")) {
retry = true
}else if (!error.name.includes("NoSuchSessionError")) {
console.error("Failed waitForLocated " + locator + " due to " + error)
retry = false
}
rethrowError = error
}
} while (retry && --attempts > 0)
throw rethrowError
} }
async waitForVisible (element) { async waitForVisible (element) {
try { let attempts = 3
return this.driver.wait(until.elementIsVisible(element), this.timeout, let retry = false
'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] awaiting till visible ' + element, let rethrowError = null
this.polling) do {
}catch(error) { try {
if (!error.name.includes("NoSuchSessionError")) { return this.driver.wait(until.elementIsVisible(element), this.timeout,
console.error("Failed to find visible element " + element + " due to " + error) 'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] awaiting till visible ' + element,
this.polling)
}catch(error) {
if (error.name.includes("StaleElementReferenceError")) {
retry = true
}else if (!error.name.includes("NoSuchSessionError")) {
console.error("Failed to find visible element " + element + " due to " + error)
retry = false
}
rethrowError = error
} }
throw error } while (retry && --attempts > 0)
} throw rethrowError
} }
async waitForDisplayed (locator) { async waitForDisplayed (locator) {
if (this.interactionDelay && this.interactionDelay > 0) await this.driver.sleep(this.interactionDelay) let attempts = 3
try { let retry = false
return this.waitForVisible(await this.waitForLocated(locator)) let rethrowError = null
}catch(error) { do {
if (!error.name.includes("NoSuchSessionError")) { if (this.interactionDelay && this.interactionDelay > 0) await this.driver.sleep(this.interactionDelay)
console.error("Failed to waitForDisplayed " + locator + " due to " + error) try {
} return this.waitForVisible(await this.waitForLocated(locator))
throw error }catch(error) {
} if (error.name.includes("StaleElementReferenceError")) {
retry = true
}else if (!error.name.includes("NoSuchSessionError")) {
retry = false
console.error("Failed to waitForDisplayed " + locator + " due to " + error)
}
rethrowError = error
}
} while (retry && --attempts > 0 )
throw rethrowError
} }
async getText (locator) { async getText (locator) {