Display AMQP filters in Management UI

## What?

This commit displays effective filters of AMQP receivers in the Management UI.
There is a new column `Filters` for outgoing links.

Solves #13429

 ## Why?

This allows validating if the desired filters set by the receiver are
actually in place by the server.
In addition, it's convenient for a developer to check any filter values
including SQL filter expressions.

 ## How?

The session process stores the the formatted and effective filters in
its state.

The Management UI displays a box containing the filter name. This way
the table for the outgoing links is kept concise. Hovering with the
mouse over a box will show additionally the descriptor and the actual
filter-value/definition.
This commit is contained in:
David Ansari 2025-06-26 16:08:42 +02:00
parent 93db480bc4
commit 23c67304c9
4 changed files with 171 additions and 53 deletions

View File

@ -213,6 +213,7 @@
dynamic :: boolean(),
send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(),
filter :: list(),
%% When feature flag rabbitmq_4.0.0 becomes required,
%% the following 2 fields should be deleted.
@ -1487,6 +1488,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
dynamic = default(Source#'v1_0.source'.dynamic, false),
send_settled = SndSettled,
max_message_size = MaxMessageSize,
filter = format_filter(EffectiveFilter),
credit_api_version = CreditApiVsn,
delivery_count = DeliveryCount,
client_flow_ctl = ClientFlowCtl,
@ -3984,7 +3986,7 @@ info_incoming_link(Handle, LinkName, SndSettleMode, TargetAddress,
info_outgoing_management_links(Links) ->
[info_outgoing_link(Handle, Name, ?MANAGEMENT_NODE_ADDRESS, <<>>,
true, MaxMessageSize, DeliveryCount, Credit)
true, MaxMessageSize, [], DeliveryCount, Credit)
|| Handle := #management_link{
name = Name,
max_message_size = MaxMessageSize,
@ -4001,28 +4003,49 @@ info_outgoing_links(Links) ->
{'', ''}
end,
info_outgoing_link(Handle, Name, SourceAddress, QueueName#resource.name,
SendSettled, MaxMessageSize, DeliveryCount, Credit)
SendSettled, MaxMessageSize, Filter, DeliveryCount, Credit)
end
|| Handle := #outgoing_link{
name = Name,
source_address = SourceAddress,
queue_name = QueueName,
max_message_size = MaxMessageSize,
send_settled = SendSettled,
max_message_size = MaxMessageSize,
filter = Filter,
client_flow_ctl = ClientFlowCtl} <- Links].
info_outgoing_link(Handle, LinkName, SourceAddress, QueueNameBin, SendSettled,
MaxMessageSize, DeliveryCount, Credit) ->
MaxMessageSize, Filter, DeliveryCount, Credit) ->
[{handle, Handle},
{link_name, LinkName},
{source_address, SourceAddress},
{queue_name, QueueNameBin},
{send_settled, SendSettled},
{max_message_size, MaxMessageSize},
{filter, Filter},
{delivery_count, DeliveryCount},
{credit, Credit}].
format_filter(undefined) ->
[];
format_filter({map, KVList}) ->
[[{name, Name},
{descriptor, Descriptor},
{value, format_filter_value(Value)}]
|| {{symbol, Name}, {described, {_Type, Descriptor}, Value}} <- KVList].
format_filter_value({list, List}) ->
lists:map(fun format_filter_value/1, List);
format_filter_value({map, KVList}) ->
[[{key, Key},
{value, format_filter_value(Val)}]
|| {{_T, Key}, Val} <- KVList, is_binary(Key)];
format_filter_value({_Type, Val}) ->
Val;
format_filter_value(Val) ->
Val.
unwrap_simple_type(V = {list, _}) ->
V;
unwrap_simple_type(V = {map, _}) ->

View File

@ -617,7 +617,10 @@ var HELP = {
'"true" if the sender sends all deliveries settled to the receiver. "false" if the sender sends all deliveries initially unsettled to the receiver.',
'outgoing-unsettled-deliveries':
'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.'
'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.',
'amqp-filter':
'Filters are predicates that define which messages RabbitMQ sends to the receiver. Each filter in the <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-filter-set">Filter Set</a> has a name displayed in the boxes below. Hovering over a box will display the filter descriptor and the filter definition.'
};
///////////////////////////////////////////////////////////////////////////

View File

@ -6,6 +6,46 @@ function getAddressClass(address) {
function getCreditClass(credit) {
return credit === 0 || credit === '0' ? 'yellow-background' : '';
}
function fmt_amqp_filter(filters) {
if (!filters || filters.length === 0) {
return '';
}
var entries = [];
for (var i = 0; i < filters.length; i++) {
var filter = filters[i];
var formatted_value = fmt_filter_value(filter.value);
var entry = '<abbr title="(descriptor: ' + fmt_escape_html(filter.descriptor) + ') ' +
fmt_escape_html(formatted_value) + '">' +
fmt_escape_html(filter.name) + '</abbr>';
entries.push(entry);
}
return entries.join(' ');
}
function fmt_filter_value(value) {
if (typeof value === 'string') {
return value;
} else if (Array.isArray(value)) {
if (value.length === 0) return '[]';
if (value[0] && value[0].key !== undefined) {
// array of key-value pairs
var props = value.map(function(kv) {
return kv.key + '=' + fmt_filter_value(kv.value);
}).join(', ');
return '{' + props + '}';
} else {
// regular array
return '[' + value.map(fmt_filter_value).join(', ') + ']';
}
} else if (typeof value === 'object' && value !== null) {
return JSON.stringify(value);
} else {
return String(value);
}
}
%>
<% if (sessions.length > 0) { %>
@ -91,6 +131,7 @@ function getCreditClass(credit) {
<th>max-message-size (bytes)</th>
<th>delivery-count</th>
<th>link-credit</th>
<th>Filters<span class="help" id="amqp-filter"></span></th>
</tr>
</thead>
<tbody>
@ -107,6 +148,7 @@ function getCreditClass(credit) {
<td class="c"><%= fmt_string(out_link.max_message_size) %></td>
<td class="c"><%= fmt_string(out_link.delivery_count) %></td>
<td class="c"><%= fmt_string(out_link.credit) %></td>
<td class="c"><%= fmt_amqp_filter(out_link.filter) %></td>
</tr>
<% } %>
</tbody>

View File

@ -8,6 +8,8 @@
-module(rabbit_mgmt_http_SUITE).
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
-include_lib("amqp10_client/include/amqp10_client.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
@ -1127,16 +1129,34 @@ amqp_sessions(Config) ->
{ok, Session1} = amqp10_client:begin_session_sync(C),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(
Session1, <<"my link pair">>),
QName = <<"my queue">>,
{ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
QName = <<"my stream">>,
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}},
{ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
{ok, Sender} = amqp10_client:attach_sender_link_sync(
Session1,
<<"my sender">>,
Session1, <<"my sender">>,
rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"my key">>)),
Filter = #{<<"ts filter">> => #filter{descriptor = <<"rabbitmq:stream-offset-spec">>,
value = {timestamp, 1751023462000}},
<<"bloom filter">> => #filter{descriptor = <<"rabbitmq:stream-filter">>,
value = {list, [{utf8, <<"complaint">>},
{utf8, <<"user1">>}]}},
<<"match filter">> => #filter{descriptor = <<"rabbitmq:stream-match-unfiltered">>,
value = {boolean, true}},
<<"prop filter">> => #filter{descriptor = ?DESCRIPTOR_CODE_PROPERTIES_FILTER,
value = {map, [{{symbol, <<"subject">>},
{utf8, <<"complaint">>}},
{{symbol, <<"user-id">>},
{binary, <<"user1">>}}
]}},
<<"app prop filter">> => #filter{descriptor = ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER,
value = {map, [{{utf8, <<"k1">>}, {int, -4}},
{{utf8, <<"☀️"/utf8>>}, {utf8, <<"🙂"/utf8>>}}
]}}},
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session1,
<<"my receiver">>,
rabbitmq_amqp_address:queue(QName)),
Session1, <<"my receiver">>,
rabbitmq_amqp_address:queue(QName),
settled, none, Filter),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,
@ -1155,54 +1175,84 @@ amqp_sessions(Config) ->
next_outgoing_id := NextOutgoingId,
remote_incoming_window := RemoteIncomingWindow,
remote_outgoing_window := RemoteOutgoingWindow,
outgoing_unsettled_deliveries := 0,
incoming_links := [#{handle := 0,
link_name := <<"my link pair">>,
target_address := <<"/management">>,
delivery_count := DeliveryCount1,
credit := Credit1,
snd_settle_mode := <<"settled">>,
max_message_size := IncomingMaxMsgSize,
unconfirmed_messages := 0},
#{handle := 2,
link_name := <<"my sender">>,
target_address := <<"/exchanges/amq.direct/my%20key">>,
delivery_count := DeliveryCount2,
credit := Credit2,
snd_settle_mode := <<"mixed">>,
max_message_size := IncomingMaxMsgSize,
unconfirmed_messages := 0}],
outgoing_links := [#{handle := 1,
link_name := <<"my link pair">>,
source_address := <<"/management">>,
queue_name := <<>>,
delivery_count := DeliveryCount3,
credit := 0,
max_message_size := <<"unlimited">>,
send_settled := true},
#{handle := 3,
link_name := <<"my receiver">>,
source_address := <<"/queues/my%20queue">>,
queue_name := <<"my queue">>,
delivery_count := DeliveryCount4,
credit := 5000,
max_message_size := <<"unlimited">>,
send_settled := true}]
outgoing_unsettled_deliveries := 0
} when is_integer(HandleMax) andalso
is_integer(NextIncomingId) andalso
is_integer(IncomingWindow) andalso
is_integer(NextOutgoingId) andalso
is_integer(RemoteIncomingWindow) andalso
is_integer(RemoteOutgoingWindow) andalso
is_integer(Credit1) andalso
is_integer(Credit2) andalso
is_integer(IncomingMaxMsgSize) andalso
is_integer(DeliveryCount1) andalso
is_integer(DeliveryCount2) andalso
is_integer(DeliveryCount3) andalso
is_integer(DeliveryCount4),
is_integer(RemoteOutgoingWindow),
Session),
{ok, IncomingLinks} = maps:find(incoming_links, Session),
{ok, OutgoingLinks} = maps:find(outgoing_links, Session),
?assertEqual(2, length(IncomingLinks)),
?assertEqual(2, length(OutgoingLinks)),
?assertMatch([#{handle := 0,
link_name := <<"my link pair">>,
target_address := <<"/management">>,
delivery_count := DeliveryCount1,
credit := Credit1,
snd_settle_mode := <<"settled">>,
max_message_size := IncomingMaxMsgSize,
unconfirmed_messages := 0},
#{handle := 2,
link_name := <<"my sender">>,
target_address := <<"/exchanges/amq.direct/my%20key">>,
delivery_count := DeliveryCount2,
credit := Credit2,
snd_settle_mode := <<"mixed">>,
max_message_size := IncomingMaxMsgSize,
unconfirmed_messages := 0}]
when is_integer(Credit1) andalso
is_integer(Credit2) andalso
is_integer(IncomingMaxMsgSize) andalso
is_integer(DeliveryCount1) andalso
is_integer(DeliveryCount2),
IncomingLinks),
[OutLink1, OutLink2] = OutgoingLinks,
?assertMatch(#{handle := 1,
link_name := <<"my link pair">>,
source_address := <<"/management">>,
queue_name := <<>>,
delivery_count := DeliveryCount3,
credit := 0,
max_message_size := <<"unlimited">>,
send_settled := true}
when is_integer(DeliveryCount3),
OutLink1),
#{handle := 3,
link_name := <<"my receiver">>,
source_address := <<"/queues/my%20stream">>,
queue_name := <<"my stream">>,
delivery_count := DeliveryCount4,
credit := 5000,
max_message_size := <<"unlimited">>,
send_settled := true,
filter := ActualFilter} = OutLink2,
?assert(is_integer(DeliveryCount4)),
ExpectedFilter = [#{name => <<"ts filter">>,
descriptor => <<"rabbitmq:stream-offset-spec">>,
value => 1751023462000},
#{name => <<"bloom filter">>,
descriptor => <<"rabbitmq:stream-filter">>,
value => [<<"complaint">>, <<"user1">>]},
#{name => <<"match filter">>,
descriptor => <<"rabbitmq:stream-match-unfiltered">>,
value => true},
#{name => <<"prop filter">>,
descriptor => ?DESCRIPTOR_CODE_PROPERTIES_FILTER,
value => [#{key => <<"subject">>, value => <<"complaint">>},
#{key => <<"user-id">>, value => <<"user1">>}]},
#{name => <<"app prop filter">>,
descriptor => ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER,
value => [#{key => <<"k1">>, value => -4},
#{key => <<"☀️"/utf8>>, value => <<"🙂"/utf8>>}]}],
?assertEqual(lists:sort(ExpectedFilter),
lists:sort(ActualFilter)),
{ok, _Session2} = amqp10_client:begin_session_sync(C),
Sessions = http_get(Config, Path),
?assertEqual(2, length(Sessions)),