rabbitmq-server/deps/rabbit/test/amqp_filtex_SUITE.erl



Support AMQP filter expressions (#12415)

* Support AMQP filter expressions

## What?

This PR implements the following property filter expressions for AMQP clients consuming from streams as defined in [AMQP Filter Expressions Version 1.0 Working Draft 09](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227):
* properties filters [section 4.2.4]
* application-properties filters [section 4.2.5]

String prefix and suffix matching is also supported.

This PR also fixes a bug where RabbitMQ would accept wrong filters. Specifically, prior to this PR the values of the filter-set's map were allowed to be symbols. However, "every value MUST be either null or of a described type which provides the archetype filter."

## Why?

This feature lets multiple concurrent clients each consume only a subset of messages from RabbitMQ while maintaining message order. It also reduces network traffic between RabbitMQ and clients by dispatching only those messages that the clients are actually interested in.

Note that AMQP filter expressions are more fine-grained than the [bloom filter based stream filtering](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) because
* they do not suffer false positives
* the unit of filtering is per-message instead of per-chunk
* matching can be performed on **multiple** values in the properties and application-properties sections
* prefix and suffix matching on the actual values is supported.

AMQP filter expressions and bloom filters can be used together.

## How?

If a filter isn't valid, RabbitMQ ignores the filter. RabbitMQ only replies with filters it actually supports and validated successfully to comply with: "The receiving endpoint sets its desired filter, the sending endpoint [RabbitMQ] sets the filter actually in place (including any filters defaulted at the node)."

* Delete streams test case

The test suite constructed a wrong filter-set. Specifically, the value of the filter-set didn't use a described type as mandated by the spec. Using https://azure.github.io/amqpnetlite/api/Amqp.Types.DescribedValue.html throws errors that the descriptor can't be encoded. Given that this code path is already tested via the amqp_filtex_SUITE, this F# test is therefore deleted.

* Re-introduce the AMQP filter-set bug

Since clients might rely on the wrong filter-set value type, we support the bug behind a deprecated feature flag and gradually remove support for this bug.

* Revert "Delete streams test case"

This reverts commit c95cfeaef74160894050ae51a563bf839384d2d7.
2024-10-07 23:12:26 +08:00
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% Test suite for
%% AMQP Filter Expressions Version 1.0 Working Draft 09
-module(amqp_filtex_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp10_common/include/amqp10_filtex.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").

-compile([nowarn_export_all,
          export_all]).

-import(rabbit_ct_broker_helpers,
        [rpc/4]).
-import(rabbit_ct_helpers,
        [eventually/1]).
-import(amqp_utils,
        [init/1,
         connection_config/1,
         flush/1,
         wait_for_credit/1,
         wait_for_accepts/1,
         send_messages/3,
         detach_link_sync/1,
         end_session_sync/1,
         wait_for_session_end/1,
         close_connection_sync/1]).

all() ->
    [
     {group, cluster_size_1}
    ].

groups() ->
    [
     {cluster_size_1, [shuffle],
      [
       properties_section,
       application_properties_section,
       multiple_sections,
       filter_few_messages_from_many,
       string_modifier
      ]}
    ].

init_per_suite(Config) ->
    {ok, _} = application:ensure_all_started(amqp10_client),
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:merge_app_env(
      Config, {rabbit, [{quorum_tick_interval, 1000},
                        {stream_tick_interval, 1000}
                       ]}).

end_per_suite(Config) ->
    Config.

init_per_group(_Group, Config) ->
    Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
    Config1 = rabbit_ct_helpers:set_config(
                Config, [{rmq_nodename_suffix, Suffix}]),
    rabbit_ct_helpers:run_setup_steps(
      Config1,
      rabbit_ct_broker_helpers:setup_steps() ++
      rabbit_ct_client_helpers:setup_steps()).

end_per_group(_, Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config,
      rabbit_ct_client_helpers:teardown_steps() ++
      rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
    %% Assert that every testcase cleaned up.
    eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
    %% Wait for sessions to terminate before starting the next test case.
    eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).

properties_section(Config) ->
    Stream = atom_to_binary(?FUNCTION_NAME),
    Address = rabbitmq_amqp_address:queue(Stream),

    OpnConf0 = connection_config(Config),
    OpnConf = OpnConf0#{notify_with_performative => true},
    {ok, Connection} = amqp10_client:open_connection(OpnConf),
    {ok, Session} = amqp10_client:begin_session_sync(Connection),
    {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
    {ok, #{}} = rabbitmq_amqp_client:declare_queue(
                  LinkPair,
                  Stream,
                  #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
    {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
    ok = wait_for_credit(Sender),

    Now = erlang:system_time(millisecond),
    To = rabbitmq_amqp_address:exchange(<<"some exchange">>, <<"routing key">>),
    ReplyTo = rabbitmq_amqp_address:queue(<<"some queue">>),

    ok = amqp10_client:send_msg(
           Sender,
           amqp10_msg:set_properties(
             #{message_id => {ulong, 999},
               user_id => <<"guest">>,
               to => To,
               subject => <<"🐇"/utf8>>,
               reply_to => ReplyTo,
               correlation_id => <<"corr-123">>,
               content_type => <<"text/plain">>,
               content_encoding => <<"some encoding">>,
               absolute_expiry_time => Now + 100_000,
               creation_time => Now,
               group_id => <<"my group ID">>,
               group_sequence => 16#ff_ff_ff_ff,
               reply_to_group_id => <<"other group ID">>},
             amqp10_msg:new(<<"t1">>, <<"m1">>))),
    ok = amqp10_client:send_msg(
           Sender,
           amqp10_msg:new(<<"t2">>, <<"m2">>)),
    ok = amqp10_client:send_msg(
           Sender,
           amqp10_msg:set_properties(
             #{group_id => <<"my group ID">>},
             amqp10_msg:new(<<"t3">>, <<"m3">>))),

    ok = wait_for_accepts(3),
    ok = detach_link_sync(Sender),
    flush(sent),

    PropsFilter1 = [
                    {{symbol, <<"message-id">>}, {ulong, 999}},
                    {{symbol, <<"user-id">>}, {binary, <<"guest">>}},
                    {{symbol, <<"subject">>}, {utf8, <<"🐇"/utf8>>}},
                    {{symbol, <<"to">>}, {utf8, To}},
                    {{symbol, <<"reply-to">>}, {utf8, ReplyTo}},
                    {{symbol, <<"correlation-id">>}, {utf8, <<"corr-123">>}},
                    {{symbol, <<"content-type">>}, {symbol, <<"text/plain">>}},
                    {{symbol, <<"content-encoding">>}, {symbol, <<"some encoding">>}},
                    {{symbol, <<"absolute-expiry-time">>}, {timestamp, Now + 100_000}},
                    {{symbol, <<"creation-time">>}, {timestamp, Now}},
                    {{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}},
                    {{symbol, <<"group-sequence">>}, {uint, 16#ff_ff_ff_ff}},
                    {{symbol, <<"reply-to-group-id">>}, {utf8, <<"other group ID">>}}
                   ],
    Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
                ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}},
    {ok, Receiver1} = amqp10_client:attach_receiver_link(
                        Session, <<"receiver 1">>, Address,
                        settled, configuration, Filter1),
    ok = amqp10_client:flow_link_credit(Receiver1, 10, never),
    receive {amqp10_msg, Receiver1, R1M1} ->
                ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1))
    after 30000 -> ct:fail({missing_msg, ?LINE})
    end,
    ok = assert_no_msg_received(?LINE),
    ok = detach_link_sync(Receiver1),

    PropsFilter2 = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}],
    Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
                ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}},
    {ok, Receiver2} = amqp10_client:attach_receiver_link(
                        Session, <<"receiver 2">>, Address,
                        unsettled, configuration, Filter2),
    {ok, R2M1} = amqp10_client:get_msg(Receiver2),
    {ok, R2M2} = amqp10_client:get_msg(Receiver2),
    ok = amqp10_client:accept_msg(Receiver2, R2M1),
    ok = amqp10_client:accept_msg(Receiver2, R2M2),
    ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)),
    ?assertEqual([<<"m3">>], amqp10_msg:body(R2M2)),
    ok = detach_link_sync(Receiver2),

    %% Filter is in place, but no message matches.
    PropsFilter3 = [{{symbol, <<"group-id">>}, {utf8, <<"no match">>}}],
    Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
                ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}},
    {ok, Receiver3} = amqp10_client:attach_receiver_link(
                        Session, <<"receiver 3">>, Address,
                        unsettled, configuration, Filter3),
    receive {amqp10_event, {link, Receiver3, {attached, #'v1_0.attach'{}}}} -> ok
    after 30000 -> ct:fail({missing_event, ?LINE})
    end,
    ok = amqp10_client:flow_link_credit(Receiver3, 10, never),
    ok = assert_no_msg_received(?LINE),
    ok = detach_link_sync(Receiver3),

    %% Wrong type should fail validation in the server.
    %% RabbitMQ should exclude this filter in its reply attach frame because
    %% "the sending endpoint [RabbitMQ] sets the filter actually in place".
    %% Hence, no filter expression is actually in place and we should receive all messages.
    PropsFilter4 = [{{symbol, <<"group-id">>}, {uint, 3}}],
    Filter4 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
                ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}},
    {ok, Receiver4} = amqp10_client:attach_receiver_link(
                        Session, <<"receiver 4">>, Address,
                        unsettled, configuration, Filter4),
    receive {amqp10_event,
             {link, Receiver4,
              {attached, #'v1_0.attach'{
                            source = #'v1_0.source'{filter = {map, ActualFilter}}}}}} ->
                ?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}],
                             ActualFilter)
    after 30000 -> ct:fail({missing_event, ?LINE})
    end,
    {ok, R4M1} = amqp10_client:get_msg(Receiver4),
    {ok, R4M2} = amqp10_client:get_msg(Receiver4),
    {ok, R4M3} = amqp10_client:get_msg(Receiver4),
    ok = amqp10_client:accept_msg(Receiver4, R4M1),
    ok = amqp10_client:accept_msg(Receiver4, R4M2),
    ok = amqp10_client:accept_msg(Receiver4, R4M3),
    ?assertEqual([<<"m1">>], amqp10_msg:body(R4M1)),
    ?assertEqual([<<"m2">>], amqp10_msg:body(R4M2)),
    ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)),
    ok = detach_link_sync(Receiver4),

    {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
    ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
    ok = end_session_sync(Session),
    ok = close_connection_sync(Connection).

application_properties_section(Config) ->
    Stream = atom_to_binary(?FUNCTION_NAME),
    Address = rabbitmq_amqp_address:queue(Stream),
    OpnConf0 = connection_config(Config),
    OpnConf = OpnConf0#{notify_with_performative => true},
    {ok, Connection} = amqp10_client:open_connection(OpnConf),
    {ok, Session} = amqp10_client:begin_session_sync(Connection),
    {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
    {ok, #{}} = rabbitmq_amqp_client:declare_queue(
                  LinkPair,
                  Stream,
                  #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
    {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
    ok = wait_for_credit(Sender),

    ok = amqp10_client:send_msg(
           Sender,
           amqp10_msg:set_application_properties(
             #{<<"k1">> => -2,
               <<"k2">> => 10,
               <<"k3">> => false,
               <<"k4">> => true,
               <<"k5">> => <<"hey">>},
             amqp10_msg:new(<<"t1">>, <<"m1">>))),
    ok = amqp10_client:send_msg(
           Sender,
           amqp10_msg:set_application_properties(
             #{<<"k2">> => 10.1},
             amqp10_msg:new(<<"t2">>, <<"m2">>))),
    ok = amqp10_client:send_msg(
           Sender,
           amqp10_msg:new(<<"t3">>, <<"m3">>)),
    ok = amqp10_client:send_msg(
           Sender,
           amqp10_msg:set_application_properties(
             #{<<"k2">> => 10.0},
             amqp10_msg:new(<<"t4">>, <<"m4">>))),

    ok = wait_for_accepts(4),
    ok = detach_link_sync(Sender),
    flush(sent),

    AppPropsFilter0 = [{{utf8, <<"k5">>}, {symbol, <<"no match">>}}],
    Filter0 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
                ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter0}},
    {ok, Receiver0} = amqp10_client:attach_receiver_link(
                        Session, <<"receiver 0">>, Address,
                        unsettled, configuration, Filter0),
    %% Wait for the attach so the detach command won't fail
    receive {amqp10_event,
             {link, Receiver0, {attached, #'v1_0.attach'{}}}} ->
                ok
    after 30000 -> ct:fail({missing_event, ?LINE})
    end,
ok = amqp10_client:flow_link_credit(Receiver0, 10, never),
ok = assert_no_msg_received(?LINE),
ok = detach_link_sync(Receiver0),
AppPropsFilter1 = [
{{utf8, <<"k1">>}, {int, -2}},
{{utf8, <<"k5">>}, {symbol, <<"hey">>}},
{{utf8, <<"k4">>}, {boolean, true}},
{{utf8, <<"k3">>}, false}
],
Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}},
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
settled, configuration, Filter1),
receive {amqp10_event,
{link, Receiver1,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter1}}}}}} ->
?assertMatch(
{described, _Type, {map, [
{{utf8, <<"k1">>}, {int, -2}},
{{utf8, <<"k5">>}, {symbol, <<"hey">>}},
{{utf8, <<"k4">>}, true},
{{utf8, <<"k3">>}, false}
]}},
proplists:get_value({symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER}, ActualFilter1))
after 30000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver1, 10, never),
receive {amqp10_msg, Receiver1, R1M1} ->
?assertEqual([<<"m1">>], amqp10_msg:body(R1M1))
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
ok = assert_no_msg_received(?LINE),
ok = detach_link_sync(Receiver1),
%% Due to simple type matching [filtex-v1.0-wd09 §4.1.1]
%% we expect integer 10 to also match number 10.0.
AppPropsFilter2 = [{{utf8, <<"k2">>}, {uint, 10}}],
Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter2}},
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
unsettled, configuration, Filter2),
{ok, R2M1} = amqp10_client:get_msg(Receiver2),
{ok, R2M2} = amqp10_client:get_msg(Receiver2),
ok = amqp10_client:accept_msg(Receiver2, R2M1),
ok = amqp10_client:accept_msg(Receiver2, R2M2),
?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)),
?assertEqual([<<"m4">>], amqp10_msg:body(R2M2)),
ok = detach_link_sync(Receiver2),
%% A reference field value of NULL should always match. [filtex-v1.0-wd09 §4.1.1]
AppPropsFilter3 = [{{utf8, <<"k2">>}, null}],
Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter3}},
{ok, Receiver3} = amqp10_client:attach_receiver_link(
Session, <<"receiver 3">>, Address,
unsettled, configuration, Filter3),
{ok, R3M1} = amqp10_client:get_msg(Receiver3),
{ok, R3M2} = amqp10_client:get_msg(Receiver3),
{ok, R3M3} = amqp10_client:get_msg(Receiver3),
ok = amqp10_client:accept_msg(Receiver3, R3M1),
ok = amqp10_client:accept_msg(Receiver3, R3M2),
ok = amqp10_client:accept_msg(Receiver3, R3M3),
?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)),
?assertEqual([<<"m2">>], amqp10_msg:body(R3M2)),
?assertEqual([<<"m4">>], amqp10_msg:body(R3M3)),
ok = detach_link_sync(Receiver3),
%% Wrong type should fail validation in the server.
%% RabbitMQ should exclude this filter in its reply attach frame because
%% "the sending endpoint [RabbitMQ] sets the filter actually in place".
%% Hence, no filter expression is actually in place and we should receive all messages.
AppPropsFilter4 = [{{symbol, <<"k2">>}, {uint, 10}}],
Filter4 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter4}},
{ok, Receiver4} = amqp10_client:attach_receiver_link(
Session, <<"receiver 4">>, Address,
unsettled, configuration, Filter4),
receive {amqp10_event,
{link, Receiver4,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter4}}}}}} ->
?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}],
ActualFilter4)
after 30000 -> ct:fail({missing_event, ?LINE})
end,
{ok, R4M1} = amqp10_client:get_msg(Receiver4),
{ok, R4M2} = amqp10_client:get_msg(Receiver4),
{ok, R4M3} = amqp10_client:get_msg(Receiver4),
{ok, R4M4} = amqp10_client:get_msg(Receiver4),
ok = amqp10_client:accept_msg(Receiver4, R4M1),
ok = amqp10_client:accept_msg(Receiver4, R4M2),
ok = amqp10_client:accept_msg(Receiver4, R4M3),
ok = amqp10_client:accept_msg(Receiver4, R4M4),
?assertEqual([<<"m1">>], amqp10_msg:body(R4M1)),
?assertEqual([<<"m2">>], amqp10_msg:body(R4M2)),
?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)),
?assertEqual([<<"m4">>], amqp10_msg:body(R4M4)),
ok = detach_link_sync(Receiver4),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
%% Test filter expressions matching multiple message sections.
multiple_sections(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
{Connection, Session, LinkPair} = init(Config),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{subject => <<"The Subject">>},
amqp10_msg:new(<<"t1">>, <<"m1">>))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_application_properties(
#{<<"The Key">> => -123},
amqp10_msg:new(<<"t2">>, <<"m2">>))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{subject => <<"The Subject">>},
amqp10_msg:set_application_properties(
#{<<"The Key">> => -123},
amqp10_msg:new(<<"t3">>, <<"m3">>)))),
ok = wait_for_accepts(3),
ok = detach_link_sync(Sender),
flush(sent),
PropsFilter = [{{symbol, <<"subject">>}, {utf8, <<"The Subject">>}}],
Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
unsettled, configuration, Filter1),
{ok, R1M1} = amqp10_client:get_msg(Receiver1),
{ok, R1M3} = amqp10_client:get_msg(Receiver1),
ok = amqp10_client:accept_msg(Receiver1, R1M1),
ok = amqp10_client:accept_msg(Receiver1, R1M3),
?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)),
?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)),
ok = detach_link_sync(Receiver1),
AppPropsFilter = [{{utf8, <<"The Key">>}, {byte, -123}}],
Filter2 = #{?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
unsettled, configuration, Filter2),
{ok, R2M2} = amqp10_client:get_msg(Receiver2),
{ok, R2M3} = amqp10_client:get_msg(Receiver2),
ok = amqp10_client:accept_msg(Receiver2, R2M2),
ok = amqp10_client:accept_msg(Receiver2, R2M3),
?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)),
?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)),
ok = detach_link_sync(Receiver2),
Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter},
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver3} = amqp10_client:attach_receiver_link(
Session, <<"receiver 3">>, Address,
unsettled, configuration, Filter3),
{ok, R3M3} = amqp10_client:get_msg(Receiver3),
ok = amqp10_client:accept_msg(Receiver3, R3M3),
?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)),
ok = detach_link_sync(Receiver3),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
%% Filter a small subset from many messages.
%% We test here that flow control still works correctly.
filter_few_messages_from_many(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
{Connection, Session, LinkPair} = init(Config),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{group_id => <<"my group ID">>},
amqp10_msg:new(<<"t1">>, <<"first msg">>))),
ok = send_messages(Sender, 1000, false),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{group_id => <<"my group ID">>},
amqp10_msg:new(<<"t2">>, <<"last msg">>))),
ok = wait_for_accepts(1002),
ok = detach_link_sync(Sender),
flush(sent),
%% Our filter should cause us to receive only the first and
%% last message out of the 1002 messages in the stream.
PropsFilter = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}],
Filter = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}},
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address,
unsettled, configuration, Filter),
ok = amqp10_client:flow_link_credit(Receiver, 2, never),
receive {amqp10_msg, Receiver, M1} ->
?assertEqual([<<"first msg">>], amqp10_msg:body(M1)),
ok = amqp10_client:accept_msg(Receiver, M1)
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver, M2} ->
?assertEqual([<<"last msg">>], amqp10_msg:body(M2)),
ok = amqp10_client:accept_msg(Receiver, M2)
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
ok = detach_link_sync(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
%% Test the string prefix (&p:) and suffix (&s:) modifiers in filter values.
string_modifier(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
{Connection, Session, LinkPair} = init(Config),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{to => <<"abc 1">>,
reply_to => <<"abc 2">>,
subject => <<"abc 3">>,
group_id => <<"abc 4">>,
reply_to_group_id => <<"abc 5">>,
message_id => {utf8, <<"abc 6">>},
correlation_id => <<"abc 7">>,
group_sequence => 16#ff_ff_ff_ff},
amqp10_msg:set_application_properties(
#{<<"k1">> => <<"abc 8">>,
<<"k2">> => <<"abc 9">>},
amqp10_msg:new(<<"t1">>, <<"m1">>)))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_application_properties(
#{<<"k1">> => <<"abc">>},
amqp10_msg:new(<<"t2">>, <<"m2">>))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{subject => <<"&Hello">>,
reply_to_group_id => <<"xyz 5">>},
amqp10_msg:new(<<"t3">>, <<"m3">>))),
ok = wait_for_accepts(3),
ok = detach_link_sync(Sender),
flush(sent),
PropsFilter1 = [
{{symbol, <<"to">>}, {utf8, <<"&p:abc ">>}},
{{symbol, <<"reply-to">>}, {utf8, <<"&p:abc">>}},
{{symbol, <<"subject">>}, {utf8, <<"&p:ab">>}},
{{symbol, <<"group-id">>}, {utf8, <<"&p:a">>}},
{{symbol, <<"reply-to-group-id">>}, {utf8, <<"&s:5">>}},
{{symbol, <<"correlation-id">>}, {utf8, <<"&s:abc 7">>}},
{{symbol, <<"message-id">>}, {utf8, <<"&p:abc 6">>}}
],
AppPropsFilter1 = [
{{utf8, <<"k1">>}, {utf8, <<"&s: 8">>}},
{{utf8, <<"k2">>}, {utf8, <<"&p:abc ">>}}
],
Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1},
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
settled, configuration, Filter1),
ok = amqp10_client:flow_link_credit(Receiver1, 10, never),
receive {amqp10_msg, Receiver1, R1M1} ->
?assertEqual([<<"m1">>], amqp10_msg:body(R1M1))
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
ok = assert_no_msg_received(?LINE),
ok = detach_link_sync(Receiver1),
%% Same filters as before except for subject which shouldn't match anymore.
PropsFilter2 = lists:keyreplace(
{symbol, <<"subject">>}, 1, PropsFilter1,
{{symbol, <<"subject">>}, {utf8, <<"&s:xxxxxxxxxxxxxx">>}}),
Filter2 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2},
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
settled, configuration, Filter2),
ok = amqp10_client:flow_link_credit(Receiver2, 10, never),
ok = assert_no_msg_received(?LINE),
ok = detach_link_sync(Receiver2),
PropsFilter3 = [{{symbol, <<"reply-to-group-id">>}, {utf8, <<"&s: 5">>}}],
Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver3} = amqp10_client:attach_receiver_link(
Session, <<"receiver 3">>, Address,
settled, configuration, Filter3),
ok = amqp10_client:flow_link_credit(Receiver3, 10, never),
receive {amqp10_msg, Receiver3, R3M1} ->
?assertEqual([<<"m1">>], amqp10_msg:body(R3M1))
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver3, R3M3} ->
?assertEqual([<<"m3">>], amqp10_msg:body(R3M3))
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
ok = detach_link_sync(Receiver3),
%% "&&" is the escape prefix for case-sensitive matching of a string starting with &
PropsFilter4 = [{{symbol, <<"subject">>}, {utf8, <<"&&Hello">>}}],
Filter4 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver4} = amqp10_client:attach_receiver_link(
Session, <<"receiver 4">>, Address,
settled, configuration, Filter4),
{ok, R4M3} = amqp10_client:get_msg(Receiver4),
?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)),
ok = detach_link_sync(Receiver4),
%% Starting the reference field value with & without a valid modifier
%% prefix is invalid.
%% RabbitMQ should exclude this filter in its reply attach frame because
%% "the sending endpoint [RabbitMQ] sets the filter actually in place".
%% Hence, no filter expression is actually in place and we should receive all messages.
PropsFilter5 = [{{symbol, <<"subject">>}, {utf8, <<"&Hello">>}}],
Filter5 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter5},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver5} = amqp10_client:attach_receiver_link(
Session, <<"receiver 5">>, Address,
settled, configuration, Filter5),
{ok, R5M1} = amqp10_client:get_msg(Receiver5),
?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)),
{ok, R5M2} = amqp10_client:get_msg(Receiver5),
?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)),
{ok, R5M3} = amqp10_client:get_msg(Receiver5),
?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)),
ok = detach_link_sync(Receiver5),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
assert_no_msg_received(Line) ->
receive {amqp10_msg, _, _} = Msg ->
ct:fail({received_unexpected_msg, Line, Msg})
after 10 ->
ok
end.
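
%% Illustrative sketch only: this is not part of the original suite and it is
%% not RabbitMQ's server-side implementation. It restates, as plain Erlang,
%% the string modifier prefixes that the test case above exercises:
%%   "&p:" prefix match, "&s:" suffix match, and "&&" as the escape for a
%%   literal leading "&".
%% The name sketch_match_string/2 and the 'invalid' return value are
%% hypothetical. The sketch assumes the suite exports all functions;
%% otherwise the compiler may warn that this function is unused.
-spec sketch_match_string(binary(), binary()) -> boolean() | invalid.
sketch_match_string(<<"&p:", Prefix/binary>>, Value) ->
    %% Prefix match: Value must start with Prefix.
    PrefixSize = byte_size(Prefix),
    case Value of
        <<Prefix:PrefixSize/binary, _/binary>> -> true;
        _ -> false
    end;
sketch_match_string(<<"&s:", Suffix/binary>>, Value) ->
    %% Suffix match: Value must end with Suffix.
    SuffixSize = byte_size(Suffix),
    ValueSize = byte_size(Value),
    ValueSize >= SuffixSize andalso
        binary:part(Value, ValueSize - SuffixSize, SuffixSize) =:= Suffix;
sketch_match_string(<<"&&", Rest/binary>>, Value) ->
    %% Escape: the reference value "&&Hello" stands for the literal "&Hello".
    Value =:= <<"&", Rest/binary>>;
sketch_match_string(<<"&", _/binary>>, _Value) ->
    %% A leading "&" without a valid modifier prefix is an invalid filter.
    invalid;
sketch_match_string(Expected, Value) ->
    %% No modifier prefix: exact, case-sensitive comparison.
    Expected =:= Value.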