Merge pull request #12670 from rabbitmq/amqp-connection-sessions

Show session and link details for AMQP 1.0 connection
This commit is contained in:
Michael Klishin 2024-11-07 14:39:47 -05:00 committed by GitHub
commit bcfca2be98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 556 additions and 40 deletions

View File

@ -967,7 +967,8 @@ silent_close_delay() ->
-spec info(rabbit_types:connection(), rabbit_types:info_keys()) ->
rabbit_types:infos().
info(Pid, InfoItems) ->
case InfoItems -- ?INFO_ITEMS of
KnownItems = [session_pids | ?INFO_ITEMS],
case InfoItems -- KnownItems of
[] ->
case gen_server:call(Pid, {info, InfoItems}, infinity) of
{ok, InfoList} ->
@ -1065,6 +1066,8 @@ i(client_properties, #v1{connection = #v1_connection{properties = Props}}) ->
end;
i(channels, #v1{tracked_channels = Channels}) ->
maps:size(Channels);
i(session_pids, #v1{tracked_channels = Map}) ->
maps:values(Map);
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
Max;
i(reductions = Item, _State) ->

View File

@ -92,7 +92,8 @@
conserve_resources/3,
check_resource_access/4,
check_read_permitted_on_topic/4,
reset_authz/2
reset_authz/2,
info/1
]).
-export([init/1,
@ -148,7 +149,9 @@
}).
-record(incoming_link, {
name :: binary(),
snd_settle_mode :: snd_settle_mode(),
target_address :: null | binary(),
%% The exchange is either defined in the ATTACH frame and static for
%% the life time of the link or dynamically provided in each message's
%% "to" field (address v2).
@ -197,6 +200,8 @@
}).
-record(outgoing_link, {
name :: binary(),
source_address :: binary(),
%% Although the source address of a link might be an exchange name and binding key
%% or a topic filter, an outgoing link will always consume from a queue.
queue_name :: rabbit_amqqueue:name(),
@ -490,6 +495,8 @@ conserve_resources(Pid, Source, {_, Conserve, _}) ->
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).
handle_call(infos, _From, State) ->
reply(infos(State), State);
handle_call(Msg, _From, State) ->
Reply = {error, {not_understood, Msg}},
reply(Reply, State).
@ -1262,11 +1269,11 @@ handle_attach(#'v1_0.attach'{
reply_frames([Reply], State);
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
name = LinkName,
name = LinkName = {utf8, LinkName0},
handle = Handle = ?UINT(HandleInt),
source = Source,
snd_settle_mode = MaybeSndSettleMode,
target = Target,
target = Target = #'v1_0.target'{address = TargetAddress},
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
},
State0 = #state{incoming_links = IncomingLinks0,
@ -1279,7 +1286,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
SndSettleMode = snd_settle_mode(MaybeSndSettleMode),
MaxMessageSize = persistent_term:get(max_message_size),
IncomingLink = #incoming_link{
name = LinkName0,
snd_settle_mode = SndSettleMode,
target_address = address(TargetAddress),
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
@ -1316,9 +1325,10 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
end;
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
name = LinkName,
name = LinkName = {utf8, LinkName0},
handle = Handle = ?UINT(HandleInt),
source = Source = #'v1_0.source'{filter = DesiredFilter},
source = Source = #'v1_0.source'{address = SourceAddress,
filter = DesiredFilter},
snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode,
max_message_size = MaybeMaxMessageSize,
@ -1431,6 +1441,8 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
offered_capabilities = OfferedCaps},
MaxMessageSize = max_message_size(MaybeMaxMessageSize),
Link = #outgoing_link{
name = LinkName0,
source_address = address(SourceAddress),
queue_name = queue_resource(Vhost, QNameBin),
queue_type = QType,
send_settled = SndSettled,
@ -2672,6 +2684,11 @@ ensure_source_v1(Address,
Err
end.
address(undefined) ->
null;
address({utf8, String}) ->
String.
-spec ensure_target(#'v1_0.target'{},
rabbit_types:vhost(),
rabbit_types:user(),
@ -3702,6 +3719,118 @@ format_status(
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).
-spec info(pid()) ->
{ok, rabbit_types:infos()} | {error, term()}.
info(Pid) ->
try gen_server:call(Pid, infos) of
Infos ->
{ok, Infos}
catch _:Reason ->
{error, Reason}
end.
infos(#state{cfg = #cfg{channel_num = ChannelNum,
max_handle = MaxHandle},
next_incoming_id = NextIncomingId,
incoming_window = IncomingWindow,
next_outgoing_id = NextOutgoingId,
remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow,
outgoing_unsettled_map = OutgoingUnsettledMap,
incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
incoming_management_links = IncomingManagementLinks,
outgoing_management_links = OutgoingManagementLinks
}) ->
[
{channel_number, ChannelNum},
{handle_max, MaxHandle},
{next_incoming_id, NextIncomingId},
{incoming_window, IncomingWindow},
{next_outgoing_id, NextOutgoingId},
{remote_incoming_window, RemoteIncomingWindow},
{remote_outgoing_window, RemoteOutgoingWindow},
{outgoing_unsettled_deliveries, maps:size(OutgoingUnsettledMap)},
{incoming_links,
info_incoming_management_links(IncomingManagementLinks) ++
info_incoming_links(IncomingLinks)},
{outgoing_links,
info_outgoing_management_links(OutgoingManagementLinks) ++
info_outgoing_links(OutgoingLinks)}
].
info_incoming_management_links(Links) ->
[info_incoming_link(Handle, Name, settled, ?MANAGEMENT_NODE_ADDRESS,
MaxMessageSize, DeliveryCount, Credit, 0)
|| Handle := #management_link{
name = Name,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit} <- Links].
info_incoming_links(Links) ->
[info_incoming_link(Handle, Name, SndSettleMode, TargetAddress, MaxMessageSize,
DeliveryCount, Credit, maps:size(IncomingUnconfirmedMap))
|| Handle := #incoming_link{
name = Name,
snd_settle_mode = SndSettleMode,
target_address = TargetAddress,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit,
incoming_unconfirmed_map = IncomingUnconfirmedMap} <- Links].
info_incoming_link(Handle, LinkName, SndSettleMode, TargetAddress,
MaxMessageSize, DeliveryCount, Credit, UnconfirmedMessages) ->
[{handle, Handle},
{link_name, LinkName},
{snd_settle_mode, SndSettleMode},
{target_address, TargetAddress},
{max_message_size, MaxMessageSize},
{delivery_count, DeliveryCount},
{credit, Credit},
{unconfirmed_messages, UnconfirmedMessages}].
info_outgoing_management_links(Links) ->
[info_outgoing_link(Handle, Name, ?MANAGEMENT_NODE_ADDRESS, <<>>,
true, MaxMessageSize, DeliveryCount, Credit)
|| Handle := #management_link{
name = Name,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit} <- Links].
info_outgoing_links(Links) ->
[begin
{DeliveryCount, Credit} = case ClientFlowCtl of
#client_flow_ctl{delivery_count = DC,
credit = C} ->
{DC, C};
credit_api_v1 ->
{'', ''}
end,
info_outgoing_link(Handle, Name, SourceAddress, QueueName#resource.name,
SendSettled, MaxMessageSize, DeliveryCount, Credit)
end
|| Handle := #outgoing_link{
name = Name,
source_address = SourceAddress,
queue_name = QueueName,
max_message_size = MaxMessageSize,
send_settled = SendSettled,
client_flow_ctl = ClientFlowCtl} <- Links].
info_outgoing_link(Handle, LinkName, SourceAddress, QueueNameBin, SendSettled,
MaxMessageSize, DeliveryCount, Credit) ->
[{handle, Handle},
{link_name, LinkName},
{source_address, SourceAddress},
{queue_name, QueueNameBin},
{send_settled, SendSettled},
{max_message_size, MaxMessageSize},
{delivery_count, DeliveryCount},
{credit, Credit}].
unwrap_simple_type(V = {list, _}) ->
V;

View File

@ -35,7 +35,7 @@
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
lookup/1, count/0]).
lookup/1, lookup/2, count/0]).
-export([count_local_tracked_items_in_vhost/1,
count_local_tracked_items_of_user/1]).
@ -233,8 +233,8 @@ lookup(Name, [Node | Nodes]) when Node == node() ->
end;
lookup(Name, [Node | Nodes]) ->
case rabbit_misc:rpc_call(Node, ?MODULE, lookup, [Name, [Node]]) of
[] -> lookup(Name, Nodes);
[Row] -> Row
not_found -> lookup(Name, Nodes);
Row = #tracked_connection{} -> Row
end.
lookup_internal(Name, Node) ->

View File

@ -22,7 +22,7 @@ define PROJECT_APP_EXTRA_KEYS
endef
DEPS = rabbit_common rabbit amqp_client cowboy cowlib rabbitmq_web_dispatch rabbitmq_management_agent oauth2_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper amqp10_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper rabbitmq_amqp_client
LOCAL_DEPS += ranch ssl crypto public_key
# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.

View File

@ -48,6 +48,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
@ -182,6 +183,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
@ -361,6 +363,7 @@ def all_srcs(name = "all_srcs"):
"priv/www/js/tmpl/queues.ejs",
"priv/www/js/tmpl/rate-options.ejs",
"priv/www/js/tmpl/registry.ejs",
"priv/www/js/tmpl/sessions-list.ejs",
"priv/www/js/tmpl/status.ejs",
"priv/www/js/tmpl/topic-permissions.ejs",
"priv/www/js/tmpl/user.ejs",
@ -407,6 +410,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",

View File

@ -46,10 +46,21 @@ dispatcher_add(function(sammy) {
});
sammy.get('#/connections/:name', function() {
var name = esc(this.params['name']);
render({'connection': {path: '/connections/' + name,
options: {ranges: ['data-rates-conn']}},
'channels': '/connections/' + name + '/channels'},
'connection', '#/connections');
var connectionPath = '/connections/' + name;
var reqs = {
'connection': {
path: connectionPath,
options: { ranges: ['data-rates-conn'] }
}
};
// First, get the connection details to check the protocol
var connectionDetails = JSON.parse(sync_get(connectionPath));
if (connectionDetails.protocol === 'AMQP 1-0') {
reqs['sessions'] = connectionPath + '/sessions';
} else {
reqs['channels'] = connectionPath + '/channels';
}
render(reqs, 'connection', '#/connections');
});
sammy.del('#/connections', function() {
var options = {headers: {

View File

@ -586,8 +586,34 @@ var HELP = {
</dl> ',
'container-id':
'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">open</a> frame.'
'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">open</a> frame.',
'incoming-links':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-links">Links</a> where the client is the sender/publisher and RabbitMQ is the receiver of messages.',
'outgoing-links':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-links">Links</a> where the client is the receiver/consumer and RabbitMQ is the sender of messages.',
'target-address':
'The "address" field of the link <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-target">target</a>.',
'source-address':
'The "address" field of the link <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-source">source</a>.',
'amqp-source-queue':
'The client receives messages from this queue.',
'amqp-unconfirmed-messages':
'Number of messages that have been sent to queues but have not been confirmed by all queues.',
'snd-settle-mode':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-sender-settle-mode">Sender Settle Mode</a>',
'sender-settles':
'"true" if the sender sends all deliveries settled to the receiver. "false" if the sender sends all deliveries initially unsettled to the receiver.',
'outgoing-unsettled-deliveries':
'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.'
};
///////////////////////////////////////////////////////////////////////////

View File

@ -84,6 +84,17 @@
</div>
</div>
<% if (connection.protocol === 'AMQP 1-0') { %>
<div class="section">
<h2 class="updatable" >Sessions (<%=(sessions.length)%>)</h2>
<div class="hider updatable">
<%= format('sessions-list', {'sessions': sessions}) %>
</div>
</div>
<% } else { %>
<div class="section">
<h2 class="updatable" >Channels (<%=(channels.length)%>) </h2>
<div class="hider updatable">
@ -91,6 +102,8 @@
</div>
</div>
<% } %>
<% if (connection.ssl) { %>
<div class="section">
<h2>SSL</h2>

View File

@ -0,0 +1,112 @@
<% if (sessions.length > 0) { %>
<table class="list">
<thead>
<tr>
<th>Channel number</th>
<th>handle-max</th>
<th>next-incoming-id</th>
<th>incoming-window</th>
<th>next-outgoing-id</th>
<th>remote-incoming-window</th>
<th>remote-outgoing-window</th>
<th>Outgoing unsettled deliveries <span class="help" id="outgoing-unsettled-deliveries"></span></th>
</tr>
</thead>
<tbody>
<%
for (var i = 0; i < sessions.length; i++) {
var session = sessions[i];
%>
<tr>
<td class="c"><%= fmt_string(session.channel_number) %></td>
<td class="c"><%= fmt_string(session.handle_max) %></td>
<td class="c"><%= fmt_string(session.next_incoming_id) %></td>
<td class="c"><%= fmt_string(session.incoming_window) %></td>
<td class="c"><%= fmt_string(session.next_outgoing_id) %></td>
<td class="c"><%= fmt_string(session.remote_incoming_window) %></td>
<td class="c"><%= fmt_string(session.remote_outgoing_window) %></td>
<td class="c"><%= fmt_string(session.outgoing_unsettled_deliveries) %></td>
</tr>
<% if (session.incoming_links.length > 0) { %>
<tr>
<td colspan="8">
<p>Incoming Links (<%=(session.incoming_links.length)%>) <span class="help" id="incoming-links"></span></p>
<table class="list">
<thead>
<tr>
<th>Link handle</th>
<th>Link name</th>
<th>Target address <span class="help" id="target-address"></span></th>
<th>snd-settle-mode <span class="help" id="snd-settle-mode"></span></th>
<th>max-message-size (bytes)</th>
<th>delivery-count</th>
<th>link-credit</th>
<th>Unconfirmed messages <span class="help" id="amqp-unconfirmed-messages"></span></th>
</tr>
</thead>
<tbody>
<%
for (var j = 0; j < session.incoming_links.length; j++) {
var in_link = session.incoming_links[j];
%>
<tr>
<td class="c"><%= fmt_string(in_link.handle) %></td>
<td class="c"><%= fmt_string(in_link.link_name) %></td>
<td class="c"><%= fmt_string(in_link.target_address) %></td>
<td class="c"><%= fmt_string(in_link.snd_settle_mode) %></td>
<td class="c"><%= fmt_string(in_link.max_message_size) %></td>
<td class="c"><%= fmt_string(in_link.delivery_count) %></td>
<td class="c"><%= fmt_string(in_link.credit) %></td>
<td class="c"><%= fmt_string(in_link.unconfirmed_messages) %></td>
</tr>
<% } %>
</tbody>
</table>
</td>
</tr>
<% } %>
<% if (session.outgoing_links.length > 0) { %>
<tr>
<td colspan="8">
<p>Outgoing Links (<%=(session.outgoing_links.length)%>) <span class="help" id="outgoing-links"></span></p>
<table class="list">
<thead>
<tr>
<th>Link handle</th>
<th>Link name</th>
<th>Source address <span class="help" id="source-address"></span></th>
<th>Source queue <span class="help" id="amqp-source-queue"></span></th>
<th>Sender settles <span class="help" id="sender-settles"></span></th>
<th>max-message-size (bytes)</th>
<th>delivery-count</th>
<th>link-credit</th>
</tr>
</thead>
<tbody>
<%
for (var k = 0; k < session.outgoing_links.length; k++) {
var out_link = session.outgoing_links[k];
%>
<tr>
<td class="c"><%= fmt_string(out_link.handle) %></td>
<td class="c"><%= fmt_string(out_link.link_name) %></td>
<td class="c"><%= fmt_string(out_link.source_address) %></td>
<td class="c"><%= fmt_string(out_link.queue_name) %></td>
<td class="c"><%= fmt_boolean(out_link.send_settled) %></td>
<td class="c"><%= fmt_string(out_link.max_message_size) %></td>
<td class="c"><%= fmt_string(out_link.delivery_count) %></td>
<td class="c"><%= fmt_string(out_link.credit) %></td>
</tr>
<% } %>
</tbody>
</table>
</td>
</tr>
<% } %>
<% } %>
</tbody>
</table>
<% } else { %>
<p>No sessions</p>
<% } %>

View File

@ -132,6 +132,7 @@ dispatcher() ->
{"/connections/:connection", rabbit_mgmt_wm_connection, []},
{"/connections/username/:username", rabbit_mgmt_wm_connection_user_name, []},
{"/connections/:connection/channels", rabbit_mgmt_wm_connection_channels, []},
{"/connections/:connection/sessions", rabbit_mgmt_wm_connection_sessions, []},
{"/channels", rabbit_mgmt_wm_channels, []},
{"/channels/:channel", rabbit_mgmt_wm_channel, []},
{"/consumers", rabbit_mgmt_wm_consumers, []},

View File

@ -0,0 +1,91 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_mgmt_wm_connection_sessions).
-export([init/2, to_json/2, content_types_provided/2, is_authorized/2]).
-export([resource_exists/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
%%--------------------------------------------------------------------
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
content_types_provided(ReqData, Context) ->
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
resource_exists(ReqData, Context) ->
case conn(ReqData) of
not_found ->
{false, ReqData, Context};
_Conn ->
{true, ReqData, Context}
end.
to_json(ReqData, Context) ->
Conn = conn(ReqData),
case proplists:get_value(protocol, Conn) of
{1, 0} ->
ConnPid = proplists:get_value(pid, Conn),
try rabbit_amqp_reader:info(ConnPid, [session_pids]) of
[{session_pids, Pids}] ->
rabbit_mgmt_util:reply_list(session_infos(Pids),
["channel_number"],
ReqData,
Context)
catch Type:Reason0 ->
Reason = unicode:characters_to_binary(
lists:flatten(
io_lib:format(
"failed to get sessions for connection ~p: ~s ~tp",
[ConnPid, Type, Reason0]))),
rabbit_mgmt_util:internal_server_error(Reason, ReqData, Context)
end;
_ ->
rabbit_mgmt_util:bad_request(<<"connection does not use AMQP 1.0">>,
ReqData,
Context)
end.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_user(ReqData, Context, conn(ReqData)).
%%--------------------------------------------------------------------
conn(Req) ->
case rabbit_connection_tracking:lookup(rabbit_mgmt_util:id(connection, Req)) of
#tracked_connection{name = Name,
pid = Pid,
protocol = Protocol,
username = Username} ->
[{name, Name},
{pid, Pid},
{protocol, Protocol},
{user, Username}];
not_found ->
not_found
end.
session_infos(Pids) ->
lists:filtermap(
fun(Pid) ->
case rabbit_amqp_session:info(Pid) of
{ok, Infos} ->
{true, Infos};
{error, Reason} ->
rabbit_log:warning("failed to get infos for session ~p: ~tp",
[Pid, Reason]),
false
end
end, Pids).

View File

@ -200,8 +200,10 @@ all_tests() -> [
qq_status_test,
list_deprecated_features_test,
list_used_deprecated_features_test,
connections_test_amqpl,
connections_test_amqp,
connections_amqpl,
connections_amqp,
amqp_sessions,
amqpl_sessions,
enable_plugin_amqp
].
@ -239,7 +241,7 @@ finish_init(Group, Config) ->
merge_app_env(Config1).
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
{ok, _} = application:ensure_all_started(rabbitmq_amqp_client),
Config.
end_per_suite(Config) ->
@ -979,7 +981,7 @@ topic_permissions_test(Config) ->
http_delete(Config, "/vhosts/myvhost2", {group, '2xx'}),
passed.
connections_test_amqpl(Config) ->
connections_amqpl(Config) ->
{Conn, _Ch} = open_connection_and_channel(Config),
LocalPort = local_port(Conn),
Path = binary_to_list(
@ -1012,7 +1014,7 @@ connections_test_amqpl(Config) ->
passed.
%% Test that AMQP 1.0 connection can be listed and closed via the rabbitmq_management plugin.
connections_test_amqp(Config) ->
connections_amqp(Config) ->
Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
User = <<"guest">>,
@ -1069,6 +1071,123 @@ connections_test_amqp(Config) ->
eventually(?_assertEqual([], http_get(Config, "/connections")), 10, 5),
?assertEqual(0, length(rpc(Config, rabbit_amqp1_0, list_local, []))).
%% Test that AMQP 1.0 sessions and links can be listed via the rabbitmq_management plugin.
amqp_sessions(Config) ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
User = <<"guest">>,
OpnConf = #{address => ?config(rmq_hostname, Config),
port => Port,
container_id => <<"my container">>,
sasl => {plain, User, <<"guest">>}},
{ok, C} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, C, opened}} -> ok
after 5000 -> ct:fail(opened_timeout)
end,
{ok, Session1} = amqp10_client:begin_session_sync(C),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(
Session1, <<"my link pair">>),
QName = <<"my queue">>,
{ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
{ok, Sender} = amqp10_client:attach_sender_link_sync(
Session1,
<<"my sender">>,
rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"my key">>)),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session1,
<<"my receiver">>,
rabbitmq_amqp_address:queue(QName)),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver, 5000, never),
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
[Connection] = http_get(Config, "/connections"),
ConnectionName = maps:get(name, Connection),
Path = "/connections/" ++ binary_to_list(uri_string:quote(ConnectionName)) ++ "/sessions",
[Session] = http_get(Config, Path),
?assertMatch(
#{channel_number := 0,
handle_max := HandleMax,
next_incoming_id := NextIncomingId,
incoming_window := IncomingWindow,
next_outgoing_id := NextOutgoingId,
remote_incoming_window := RemoteIncomingWindow,
remote_outgoing_window := RemoteOutgoingWindow,
outgoing_unsettled_deliveries := 0,
incoming_links := [#{handle := 0,
link_name := <<"my link pair">>,
target_address := <<"/management">>,
delivery_count := DeliveryCount1,
credit := Credit1,
snd_settle_mode := <<"settled">>,
max_message_size := IncomingMaxMsgSize,
unconfirmed_messages := 0},
#{handle := 2,
link_name := <<"my sender">>,
target_address := <<"/exchanges/amq.direct/my%20key">>,
delivery_count := DeliveryCount2,
credit := Credit2,
snd_settle_mode := <<"mixed">>,
max_message_size := IncomingMaxMsgSize,
unconfirmed_messages := 0}],
outgoing_links := [#{handle := 1,
link_name := <<"my link pair">>,
source_address := <<"/management">>,
queue_name := <<>>,
delivery_count := DeliveryCount3,
credit := 0,
max_message_size := <<"unlimited">>,
send_settled := true},
#{handle := 3,
link_name := <<"my receiver">>,
source_address := <<"/queues/my%20queue">>,
queue_name := <<"my queue">>,
delivery_count := DeliveryCount4,
credit := 5000,
max_message_size := <<"unlimited">>,
send_settled := true}]
} when is_integer(HandleMax) andalso
is_integer(NextIncomingId) andalso
is_integer(IncomingWindow) andalso
is_integer(NextOutgoingId) andalso
is_integer(RemoteIncomingWindow) andalso
is_integer(RemoteOutgoingWindow) andalso
is_integer(Credit1) andalso
is_integer(Credit2) andalso
is_integer(IncomingMaxMsgSize) andalso
is_integer(DeliveryCount1) andalso
is_integer(DeliveryCount2) andalso
is_integer(DeliveryCount3) andalso
is_integer(DeliveryCount4),
Session),
{ok, _Session2} = amqp10_client:begin_session_sync(C),
Sessions = http_get(Config, Path),
?assertEqual(2, length(Sessions)),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:close_connection(C).
%% Test that GET /connections/:name/sessions returns
%% 400 Bad Request for non-AMQP 1.0 connections.
amqpl_sessions(Config) ->
{Conn, _Ch} = open_connection_and_channel(Config),
LocalPort = local_port(Conn),
Path = binary_to_list(
rabbit_mgmt_format:print(
"/connections/127.0.0.1%3A~w%20-%3E%20127.0.0.1%3A~w/sessions",
[LocalPort, amqp_port(Config)])),
ok = await_condition(
fun() ->
http_get(Config, Path, 400),
true
end).
%% Test that AMQP 1.0 connection can be listed if the rabbitmq_management plugin gets enabled
%% after the connection was established.
enable_plugin_amqp(Config) ->

View File

@ -948,6 +948,7 @@ rabbitmq_management:
- rabbit_mgmt_wm_cluster_name
- rabbit_mgmt_wm_connection
- rabbit_mgmt_wm_connection_channels
- rabbit_mgmt_wm_connection_sessions
- rabbit_mgmt_wm_connection_user_name
- rabbit_mgmt_wm_connections
- rabbit_mgmt_wm_connections_vhost

View File

@ -11,6 +11,32 @@ This feature:
* adds the ability to RabbitMQ to have multiple concurrent clients each consuming only a subset of messages while maintaining message order, and
* reduces network traffic between RabbitMQ and clients by only dispatching those messages that the clients are actually interested in.
### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation
[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation.
This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.
### OAuth 2.0 Token Renewal on AMQP 1.0 Connections
[PR #12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) introduces support for OAuth 2.0 token renewal on AMQP 1.0 connections.
This feature allows clients to set a new token proactively before the current one [expires](/docs/oauth2#token-expiration), ensuring uninterrupted connectivity.
If a client does not set a new token before the existing one expires, RabbitMQ will automatically close the AMQP 1.0 connection.
### Metrics for AMQP 1.0 Connections
[PR #12638](https://github.com/rabbitmq/rabbitmq-server/pull/12638) exposes the following AMQP 1.0 connection metrics in the RabbitMQ Management UI and the [/metrics/per-object](https://www.rabbitmq.com/docs/prometheus#per-object-endpoint) Prometheus endpoint:
* Bytes received and sent
* Reductions
* Garbage collections
* Number of channels/sessions
These metrics have already been emitted for AMQP 0.9.1 connections prior to RabbitMQ 4.1.
### AMQP 1.0 Sessions and Links in the Management UI
[PR #12670](https://github.com/rabbitmq/rabbitmq-server/pull/12670) displays detailed AMQP 1.0 session and link information on the Connection page of the Management UI including:
* Link names
* Link target and source addresses
* Link flow control state
* Session flow control state
* Number of unconfirmed and unacknowledged messages
### Prometheus histogram for message sizes
[PR #12342](https://github.com/rabbitmq/rabbitmq-server/pull/12342) exposes a Prometheus histogram for message sizes received by RabbitMQ.
@ -38,26 +64,6 @@ The introduction of required feature flags several minor versions ago showed the
See the [full GitHub project](https://github.com/orgs/rabbitmq/projects/4/views/1) for the complete list of improvements and fixes.
## New Features
### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation
[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation.
This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.
### OAuth 2.0 Token Renewal on AMQP 1.0 Connections
[PR #12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) introduces support for OAuth 2.0 token renewal on AMQP 1.0 connections.
This feature allows clients to set a new token proactively before the current one [expires](/docs/oauth2#token-expiration), ensuring uninterrupted connectivity.
If a client does not set a new token before the existing one expires, RabbitMQ will automatically close the AMQP 1.0 connection.
### Metrics for AMQP 1.0 Connections
[PR #12638](https://github.com/rabbitmq/rabbitmq-server/pull/12638) exposes the following AMQP 1.0 connection metrics in the RabbitMQ Management UI and the [/metrics/per-object](https://www.rabbitmq.com/docs/prometheus#per-object-endpoint) Prometheus endpoint:
* Bytes received and sent
* Reductions
* Garbage collections
* Number of channels/sessions
These metrics have already been emitted for AMQP 0.9.1 connections prior to RabbitMQ 4.1.
## Potential incompatibilities
* The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB).