Support SQL filter expressions for streams

## What?

This commit allows AMQP 1.0 clients to define SQL-like filter expressions when consuming from streams, enabling server-side message filtering.
RabbitMQ will only dispatch messages that match the provided filter expression, reducing network traffic and client-side processing overhead.
SQL filter expressions are a more powerful alternative to the [AMQP Property Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions) introduced in RabbitMQ 4.1.

SQL filter expressions are based on the [JMS message selector syntax](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax) and support:
* Comparison operators (`=`, `<>`, `>`, `<`, `>=`, `<=`)
* Logical operators (`AND`, `OR`, `NOT`)
* Arithmetic operators (`+`, `-`, `*`, `/`)
* Special operators (`BETWEEN`, `LIKE`, `IN`, `IS NULL`)
* Access to the properties and application-properties sections, and to the `priority` field of the header section

**Examples**

Simple expression:

```sql
header.priority > 4
```

Complex expression:

```sql
order_type IN ('premium', 'express') AND
total_amount BETWEEN 100 AND 5000 AND
(customer_region LIKE 'EU-%' OR customer_region = 'US-CA') AND
properties.creation-time >= 1750772279000 AND
NOT cancelled
```

Like AMQP property filter expressions, SQL filter expressions can be
combined with Bloom filters. Combining both allows for highly customisable
expressions (SQL) and extremely fast evaluation (Bloom filter) if only a
subset of the chunks need to be read from disk.

## Why?

Compared to AMQP property filter expressions, SQL filter expressions provide the following advantage:
* High expressiveness and flexibility in defining the filter

As with AMQP property filter expressions, the following advantages apply:
* No false positives (unlike Bloom filters)
* Multiple concurrent clients can attach to the same stream, each consuming
  only a specific subset of messages while preserving message order.
* Low network overhead as only messages that match the filter are
  transferred to the client
* Likewise, lower resource usage (CPU and memory) on clients since they
  don't need to deserialise messages that they are not interested in.
* If the SQL expression is simple, even the broker will save resources
  because it doesn't need to serialise and send messages that the client
  isn't interested in.

## How?

### JMS Message Selector Syntax vs. AMQP Extension Spec

The AMQP Filter Expressions Version 1.0 extension Working Draft 09 defines SQL Filter Expressions in Section 6.
This spec differs from the JMS message selector spec. Neither is a subset of the other. We can choose to follow either.
However, I think it makes most sense to follow the JMS spec because:
* The JMS spec is better defined
* The JMS spec is far more widespread than the AMQP Working Draft spec. (A slight variation of the AMQP Working
  Draft is used by Azure Service Bus: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter)
* The JMS spec is mostly simpler (partly because it matches on simple types only)
* This will allow for a single SQL parser in RabbitMQ for both AMQP clients consuming from a stream and possibly in future for JMS clients consuming from queues or topics.

<details>
<summary>AMQP extension spec vs JMS spec</summary>

AMQP

`!=` is a synonym for `<>`

JMS

defines only `<>`

Conclusion

`<>` is sufficient

AMQP

Strings can be tested for “greater than”

“both operands are of type string or of type symbol (any combination is permitted) and

the lexicographical rank of the left operand is greater than the lexicographical rank of the right operand”

JMS

“String and Boolean comparison is restricted to = and <>.”

Conclusion

The JMS behaviour is sufficient.

AMQP

`IN <set-expression>`

set-expression can contain non-string literals

JMS:

set-expression can contain only string literals

Conclusion

The JMS behaviour is sufficient.

AMQP

EXISTS predicate to check for composite types

JMS

Only simple types

Conclusion

We want to match on simple types only, that is, only against values in the application-properties section, the properties section, and the priority field of the header section.

AMQP:

Modulo operator %

Conclusion

JMS doesn't define the modulo operator. Let's start without it.

We can decide in future to add support since it can actually be useful, for example for two receivers who want to process every other message.

AMQP:

The ‘+’ operator can concatenate string and symbol values

Conclusion

Such string concatenation isn't defined in JMS. We don't need it.

AMQP:

Define NAN and INF

JMS:

“Approximate literals use the Java floating-point literal syntax.”

Examples include "7."

Conclusion

We can go with the JMS spec given that it needs to be implemented anyway
for JMS support.
Scientific notation is supported in both the AMQP spec and the JMS spec.

AMQP

String literals can be surrounded by single or double quotation marks

JMS

A string literal is enclosed in single quotes

Conclusion

Supporting single quotes is good enough.

AMQP

“A binary constant is a string of pairs of hexadecimal digits prefixed by ‘0x’ that are not enclosed in quotation marks”

Conclusion

JMS doesn't support binary constants. We can start without binary constants.

Matching against binary values is still supported if these binary values can be expressed as UTF-8 strings.

AMQP

Functions DATE, UTC, SUBSTRING, LOWER, UPPER, LEFT, RIGHT

Vendor specific functions

Conclusion

JMS doesn't define such functions. We can start without those functions.

AMQP

`<field><array_element_reference>`

`<field>‘.’<composite_type_reference>`

to access map and array elements

Conclusion

Same as above:

We want to match on simple types only, that is, only against values in the application-properties section, the properties section, and the priority field of the header section.

AMQP

allows for delimited identifiers

JMS

Java identifier part characters

Conclusion

We can go with the Java identifiers extending the allowed characters by
`.` and `-` to reference field names such as `properties.group-id`.

JMS:

BETWEEN operator

Conclusion

The BETWEEN operator isn't supported in the AMQP spec. Let's support it as a convenience since it's already available in JMS.

</details>

### Filter Name

The client provides a filter with name `sql-filter` instead of name
`jms-selector` so that RabbitMQ can differentiate between JMS clients and
other native AMQP 1.0 clients using SQL expressions. This way, we can also
optionally extend the SQL grammar in future.
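
As a rough illustration of what such a filter looks like on the Erlang client side, the sketch below builds a filter map with a single entry named `sql-filter`. It assumes the `#filter{}` record and the `apache.org:selector-filter:string` descriptor introduced elsewhere in this commit; the module name `sql_filter_sketch` and the function `sql_filter/1` are hypothetical, and the record is redefined locally only to keep the example self-contained.

```erlang
-module(sql_filter_sketch).
-export([sql_filter/1]).

%% Local copy of the #filter{} record that this commit adds to the
%% Erlang AMQP 1.0 client; redefined here only for self-containment.
-record(filter, {descriptor :: binary() | non_neg_integer(),
                 value :: term()}).

%% Hypothetical helper: wraps an SQL expression in a filter map
%% whose single key is the filter name "sql-filter".
-spec sql_filter(binary()) -> #{binary() => #filter{}}.
sql_filter(Expression) when is_binary(Expression) ->
    #{<<"sql-filter">> =>
          #filter{descriptor = <<"apache.org:selector-filter:string">>,
                  value = {utf8, Expression}}}.
```

How the resulting map is handed to the client's attach functions is not shown here and is left to the client API.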

### Identifiers

JMS message selectors allow identifiers to contain some well-known JMS headers that map to well-known AMQP fields, for example:
```erl
jms_header_to_amqp_field_name(<<"JMSDeliveryMode">>) -> durable;
jms_header_to_amqp_field_name(<<"JMSPriority">>) -> priority;
jms_header_to_amqp_field_name(<<"JMSMessageID">>) -> message_id;
jms_header_to_amqp_field_name(<<"JMSTimestamp">>) -> creation_time;
jms_header_to_amqp_field_name(<<"JMSCorrelationID">>) -> correlation_id;
jms_header_to_amqp_field_name(<<"JMSType">>) -> subject;
%% amqp-bindmap-jms-v1.0-wd10 § 3.2.2 JMS-defined ’JMSX’ Properties
jms_header_to_amqp_field_name(<<"JMSXUserID">>) -> user_id;
jms_header_to_amqp_field_name(<<"JMSXGroupID">>) -> group_id;
jms_header_to_amqp_field_name(<<"JMSXGroupSeq">>) -> group_sequence;
```

This commit performs a similar mapping from `header.` and `properties.` prefixed identifiers to field names in the AMQP header and properties sections.
The only field in the AMQP header section that can be filtered on is `priority`, referenced by the identifier `header.priority`.

By default, as described in the AMQP extension spec, if an identifier is not prefixed, it refers to a key in the application-properties section.

Hence, all identifiers prefixed with `header.` or `properties.` have special meanings and MUST be avoided by applications unless they want to refer to those specific fields.
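
The prefix rules above can be sketched as a small classifier. This is a simplified illustration, not the implementation in this commit: the module `identifier_sketch` and the function `classify/1` are hypothetical, and the real code additionally maps known field names to atoms and rejects unknown `properties.` fields.

```erlang
-module(identifier_sketch).
-export([classify/1]).

%% Hypothetical helper: decides which message section an identifier refers to.
-spec classify(binary()) ->
    {header, priority} |
    {properties, binary()} |
    {application_properties, binary()} |
    {unsupported, binary()}.
classify(<<"header.priority">>) ->
    %% The only header field that can be filtered on.
    {header, priority};
classify(<<"header.", _/binary>> = Identifier) ->
    {unsupported, Identifier};
classify(<<"properties.", Field/binary>>) ->
    %% Field is the remaining name, for example <<"group-id">>.
    {properties, Field};
classify(Key) when is_binary(Key) ->
    %% Unprefixed identifiers refer to application-properties keys.
    {application_properties, Key}.
```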

Azure Service Bus uses the `sys.` and `user.` prefixes for well known field names and arbitrary application-provided keys, respectively.

### SQL lexer, parser and evaluator

This commit implements the SQL lexer and parser in files `rabbit_jms_selector_lexer.xrl` and
`rabbit_jms_selector_parser.yrl`, respectively.

Advantages:
* Both the definitions in the lexer and the grammar in the parser are defined **declaratively**.
* In total, the entire SQL syntax and grammar is defined in only 240 lines.
* Therefore, lexer and parser are simple to maintain.

The idea of this commit is to use the same lexer and parser for native AMQP clients consuming
from streams (this commit) as for JMS clients (in the future).
Everything that differs between native AMQP clients and JMS clients is then handled after
the Abstract Syntax Tree (AST) has been created by the parser.

For example, this commit transforms the AST specifically for native AMQP clients
by mapping `properties.` prefixed identifiers (field names) to atoms.
A JMS client's mapping from `JMS` prefixed headers can transform the AST
differently.

Likewise, this commit transforms the AST to compile a regex for complex LIKE
expressions when consuming from a stream while a future version
might not want to compile a regex when consuming from quorum queues.

Module `rabbit_jms_ast` provides such AST helper functions.

The lexer and parser are not performance critical as this work happens
upon receivers attaching to the stream.

The evaluator however is performance critical as message evaluation
happens on the hot path.

### LIKE expressions

The evaluator has been optimised to only compile a regex when necessary.
If the LIKE expression-value contains no wildcard or only a single `%`
wildcard, Erlang pattern matching is used as it's more efficient.
Since `_` can match any UTF-8 character, a regex will be compiled with
the `[unicode]` option.
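
A minimal sketch of this idea follows, assuming patterns without escape characters. The module `like_sketch` and its functions are hypothetical and simpler than the code in this commit: the sketch only reports `regex` for patterns that would need a compiled regex, and matches the remaining patterns with binary pattern matching.

```erlang
-module(like_sketch).
-export([classify/1, match/2]).

%% Classifies a LIKE pattern (assumed to contain no escape character).
-spec classify(binary()) ->
    {exact, binary()} | {prefix_suffix, binary(), binary()} | regex.
classify(Pattern) when is_binary(Pattern) ->
    HasUnderscore = binary:match(Pattern, <<"_">>) =/= nomatch,
    case binary:split(Pattern, <<"%">>, [global]) of
        _ when HasUnderscore -> regex;                  % `_` needs a regex
        [Exact] -> {exact, Exact};                      % no wildcard
        [Prefix, Suffix] -> {prefix_suffix, Prefix, Suffix}; % single %
        _ -> regex                                      % two or more %
    end.

%% Matches a subject against a classified, non-regex pattern.
-spec match(binary(), {exact, binary()} |
                      {prefix_suffix, binary(), binary()}) -> boolean().
match(Subject, {exact, Exact}) ->
    Subject =:= Exact;
match(Subject, {prefix_suffix, Prefix, Suffix}) ->
    PSize = byte_size(Prefix),
    SSize = byte_size(Suffix),
    %% Bytes covered by the % wildcard; negative if prefix and suffix
    %% would have to overlap, in which case there is no match.
    Skip = byte_size(Subject) - PSize - SSize,
    case Skip >= 0 of
        true ->
            case Subject of
                <<Prefix:PSize/binary, _:Skip/binary, Suffix:SSize/binary>> ->
                    true;
                _ ->
                    false
            end;
        false ->
            false
    end.
```

For example, `classify(<<"EU-%">>)` yields a prefix match with an empty suffix, while `classify(<<"a_c%">>)` yields `regex`.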

### Filter errors

Any error upon a receiver attaching to a stream causes the filter to
not become active. RabbitMQ will log a warning describing the reason and
will omit the named filter in its attach reply frame. The client library is
responsible for detaching the link as explained in the AMQP spec:
> The receiving endpoint sets its desired filter, the sending endpoint sets the filter actually in place
(including any filters defaulted at the node). The receiving endpoint MUST check that the filter in
place meets its needs and take responsibility for detaching if it does not.

This applies to lexer and parser errors.

Errors during message evaluation will result in an unknown value.
Conditional operators on unknown are described in the JMS spec. If the
entire selector condition is unknown, the message does not match, and
will therefore not be delivered to the client.
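
The three-valued logic described above can be sketched as follows, using `undefined` to represent the unknown value. The module `three_valued_sketch` and its function names are hypothetical; the sketch only illustrates the truth tables from the JMS spec and the rule that an unknown result does not match.

```erlang
-module(three_valued_sketch).
-export([and3/2, or3/2, not3/1, matches/1]).

-type tri() :: true | false | undefined.

%% AND: false dominates, otherwise unknown propagates.
-spec and3(tri(), tri()) -> tri().
and3(false, _) -> false;
and3(_, false) -> false;
and3(true, true) -> true;
and3(_, _) -> undefined.

%% OR: true dominates, otherwise unknown propagates.
-spec or3(tri(), tri()) -> tri().
or3(true, _) -> true;
or3(_, true) -> true;
or3(false, false) -> false;
or3(_, _) -> undefined.

%% NOT: unknown stays unknown.
-spec not3(tri()) -> tri().
not3(true) -> false;
not3(false) -> true;
not3(_) -> undefined.

%% A message matches only if the whole selector evaluates to true.
-spec matches(tri()) -> boolean().
matches(Result) -> Result =:= true.
```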

## Clients

Support for passing the SQL expression from app to broker is provided by
the Java client in https://github.com/rabbitmq/rabbitmq-amqp-java-client/pull/216
David Ansari 2025-06-10 16:43:14 +02:00
parent 268ff69556
commit 93db480bc4
28 changed files with 6115 additions and 149 deletions


@ -0,0 +1,11 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-record(filter, {
descriptor :: binary() | non_neg_integer(),
value :: term()
}).


@ -7,7 +7,7 @@
-module(amqp10_client).
-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-export([open_connection/1,


@ -9,7 +9,7 @@
-behaviour(gen_statem).
-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").


@ -8,7 +8,7 @@
-behaviour(gen_statem).
-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-ifdef(TEST).


@ -9,6 +9,7 @@
-behaviour(gen_statem).
-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
@ -86,7 +87,7 @@
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
% http://www.amqp.org/specification/1.0/filters
-type filter() :: #{binary() => binary() | map() | list(binary())}.
-type filter() :: #{binary() => #filter{} | binary() | map() | list(binary())}.
-type max_message_size() :: undefined | non_neg_integer().
-type footer_opt() :: crc32 | adler32.
@ -781,29 +782,39 @@ translate_filters(Filters)
when map_size(Filters) =:= 0 ->
undefined;
translate_filters(Filters) ->
{map,
maps:fold(
fun
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
{map, lists:map(
fun({Name, #filter{descriptor = Desc,
value = V}})
when is_binary(Name) ->
Descriptor = if is_binary(Desc) -> {symbol, Desc};
is_integer(Desc) -> {ulong, Desc}
end,
{{symbol, Name}, {described, Descriptor, V}};
({<<"apache.org:legacy-amqp-headers-binding:map">> = K, V})
when is_map(V) ->
%% special case conversion
Key = sym(K),
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
(K, V, Acc) when is_binary(K) ->
%% try treat any filter value generically
Key = sym(K),
Value = filter_value_type(V),
[{Key, {described, Key, Value}} | Acc]
end, [], Filters)}.
Val = translate_legacy_amqp_headers_binding(V),
{Key, {described, Key, Val}};
({K, V})
when is_binary(K) ->
Key = {symbol, K},
Val = filter_value_type(V),
{Key, {described, Key, Val}}
end, maps:to_list(Filters))}.
filter_value_type(V) when is_binary(V) ->
filter_value_type(V)
when is_binary(V) ->
%% this is clearly not always correct
{utf8, V};
filter_value_type(V)
when is_integer(V) andalso V >= 0 ->
{uint, V};
filter_value_type(VList) when is_list(VList) ->
filter_value_type(VList)
when is_list(VList) ->
{list, [filter_value_type(V) || V <- VList]};
filter_value_type({T, _} = V) when is_atom(T) ->
filter_value_type({T, _} = V)
when is_atom(T) ->
%% looks like an already tagged type, just pass it through
V.
@ -1507,16 +1518,17 @@ translate_filters_selector_filter_test() ->
} = translate_filters(#{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}).
translate_filters_multiple_filters_test() ->
{map,
[
{{symbol, <<"apache.org:selector-filter:string">>},
{map, Actual} = translate_filters(
#{
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
}),
Expected = [{{symbol, <<"apache.org:selector-filter:string">>},
{described, {symbol, <<"apache.org:selector-filter:string">>},
{utf8, <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}}},
{{symbol, <<"apache.org:legacy-amqp-direct-binding:string">>},
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}
]
} = translate_filters(#{
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
}).
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}],
ActualSorted = lists:sort(Actual),
ExpectedSorted = lists:sort(Expected),
ExpectedSorted = ActualSorted.
-endif.


@ -16,7 +16,7 @@
recv_amqp_header_step/1
]).
-include("src/amqp10_client.hrl").
-include("src/amqp10_client_internal.hrl").
start(Port) ->
{ok, LSock} = gen_tcp:listen(Port, [binary, {packet, 0}, {active, false}]),


@ -0,0 +1,31 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% A filter with this name contains a JMS message selector.
%% We use the same name as sent by the Qpid JMS client in
%% https://github.com/apache/qpid-jms/blob/2.7.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java#L75
-define(FILTER_NAME_JMS, <<"jms-selector">>).
%% A filter with this name contains an SQL expression.
%% In the current version, such a filter must comply with the JMS message selector syntax.
%% However, we use a name other than "jms-selector" in case we want to extend the allowed syntax
%% in the future, for example allowing for some of the extended grammar described in
%% §6 "SQL Filter Expressions" of
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
-define(FILTER_NAME_SQL, <<"sql-filter">>).
%% SQL-based filtering syntax
%% These descriptors are defined in
%% https://www.amqp.org/specification/1.0/filters
-define(DESCRIPTOR_NAME_SELECTOR_FILTER, <<"apache.org:selector-filter:string">>).
-define(DESCRIPTOR_CODE_SELECTOR_FILTER, 16#0000468C00000004).
%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).
-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).


@ -1,15 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).
-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).


@ -258,7 +258,7 @@ define ct_master.erl
endef
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_jms_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
@ -363,6 +363,9 @@ ifdef TRACE_SUPERVISOR2
RMQ_ERLC_OPTS += -DTRACE_SUPERVISOR2=true
endif
# https://www.erlang.org/doc/apps/parsetools/leex.html#file/2
export ERL_COMPILER_OPTIONS := deterministic
# --------------------------------------------------------------------
# Documentation.
# --------------------------------------------------------------------


@ -45,7 +45,7 @@
-type str() :: atom() | string() | binary().
-type internal_ann_key() :: atom().
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
-type x_ann_value() :: str() | number() | TaggedValue :: tuple() | [x_ann_value()].
-type protocol() :: module().
-type annotations() :: #{internal_ann_key() => term(),
x_ann_key() => x_ann_value()}.
@ -76,8 +76,7 @@
-type property_value() :: undefined |
string() |
binary() |
integer() |
float() |
number() |
boolean().
-type tagged_value() :: {uuid, binary()} |
{utf8, binary()} |
@ -155,9 +154,9 @@ init(Proto, Data, Anns) ->
-spec init(protocol(), term(), annotations(), environment()) -> state().
init(Proto, Data, Anns0, Env) ->
{ProtoData, ProtoAnns} = Proto:init(Data),
Anns1 = case map_size(Env) =:= 0 of
true -> Anns0;
false -> Anns0#{env => Env}
Anns1 = case map_size(Env) of
0 -> Anns0;
_ -> Anns0#{env => Env}
end,
Anns2 = maps:merge(ProtoAnns, Anns1),
Anns = ensure_received_at_timestamp(Anns2),

deps/rabbit/src/rabbit_amqp_filter.erl

@ -0,0 +1,24 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(rabbit_amqp_filter).
-export([eval/2]).
-type expression() :: undefined |
{property, rabbit_amqp_filter_prop:parsed_expressions()} |
{jms, rabbit_amqp_filter_jms:parsed_expression()}.
-export_type([expression/0]).
-spec eval(expression(), mc:state()) -> boolean().
eval(undefined, _Mc) ->
%% A receiver without filter wants all messages.
true;
eval({property, Expr}, Mc) ->
rabbit_amqp_filter_prop:eval(Expr, Mc);
eval({jms, Expr}, Mc) ->
rabbit_amqp_filter_jms:eval(Expr, Mc).


@ -0,0 +1,476 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(rabbit_amqp_filter_jms).
-feature(maybe_expr, enable).
-include_lib("amqp10_common/include/amqp10_filter.hrl").
-type parsed_expression() :: {ApplicationProperties :: boolean(),
rabbit_jms_ast:ast()}.
-export_type([parsed_expression/0]).
-export([parse/1,
eval/2]).
%% [filtex-v1.0-wd09 7.1]
-define(MAX_EXPRESSION_LENGTH, 4096).
-define(MAX_TOKENS, 200).
%% defined in both AMQP and JMS
-define(DEFAULT_MSG_PRIORITY, 4).
-define(IS_CONTROL_CHAR(C), C < 32 orelse C =:= 127).
-spec parse(tuple()) ->
{ok, parsed_expression()} | error.
parse({described, Descriptor, {utf8, JmsSelector}}) ->
maybe
ok ?= check_descriptor(Descriptor),
{ok, String} ?= jms_selector_to_list(JmsSelector),
ok ?= check_length(String),
{ok, Tokens} ?= tokenize(String, JmsSelector),
ok ?= check_token_count(Tokens, JmsSelector),
{ok, Ast0} ?= parse(Tokens, JmsSelector),
{ok, Ast} ?= transform_ast(Ast0, JmsSelector),
AppProps = has_binary_identifier(Ast),
{ok, {AppProps, Ast}}
end.
%% Evaluates a parsed JMS message selector expression.
-spec eval(parsed_expression(), mc:state()) -> boolean().
eval({ApplicationProperties, Ast}, Msg) ->
State = case ApplicationProperties of
true ->
AppProps = mc:routing_headers(Msg, []),
{AppProps, Msg};
false ->
Msg
end,
%% "a selector that evaluates to true matches;
%% a selector that evaluates to false or unknown does not match."
eval0(Ast, State) =:= true.
%% Literals
eval0({Type, Value}, _Msg)
when Type =:= integer orelse
Type =:= float orelse
Type =:= string orelse
Type =:= boolean ->
Value;
%% Identifier lookup
eval0({identifier, Key}, State) when is_binary(Key) ->
{AppProps, _Msg} = State,
maps:get(Key, AppProps, undefined);
eval0({identifier, FieldName}, State) when is_atom(FieldName) ->
Msg = case mc:is(State) of
true ->
State;
false ->
{_AppProps, Mc} = State,
Mc
end,
get_field_value(FieldName, Msg);
%% Logical operators
%%
%% Table 3-4 in
%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#null-values
eval0({'and', Expr1, Expr2}, Msg) ->
case eval0(Expr1, Msg) of
true ->
case eval0(Expr2, Msg) of
true -> true;
false -> false;
_Unknown -> undefined
end;
false ->
% Short-circuit
false;
_Unknown ->
case eval0(Expr2, Msg) of
false -> false;
_ -> undefined
end
end;
%% Table 3-5 in
%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#null-values
eval0({'or', Expr1, Expr2}, Msg) ->
case eval0(Expr1, Msg) of
true ->
%% Short-circuit
true;
false ->
case eval0(Expr2, Msg) of
true -> true;
false -> false;
_Unknown -> undefined
end;
_Unknown ->
case eval0(Expr2, Msg) of
true -> true;
_ -> undefined
end
end;
%% Table 3-6 in
%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#null-values
eval0({'not', Expr}, Msg) ->
case eval0(Expr, Msg) of
true -> false;
false -> true;
_Unknown -> undefined
end;
%% Comparison operators
eval0({Op, Expr1, Expr2}, Msg)
when Op =:= '=' orelse
Op =:= '<>' orelse
Op =:= '>' orelse
Op =:= '<' orelse
Op =:= '>=' orelse
Op =:= '<=' ->
compare(Op, eval0(Expr1, Msg), eval0(Expr2, Msg));
%% Arithmetic operators
eval0({Op, Expr1, Expr2}, Msg)
when Op =:= '+' orelse
Op =:= '-' orelse
Op =:= '*' orelse
Op =:= '/' ->
arithmetic(Op, eval0(Expr1, Msg), eval0(Expr2, Msg));
%% Unary operators
eval0({unary_plus, Expr}, Msg) ->
Val = eval0(Expr, Msg),
case is_number(Val) of
true -> Val;
false -> undefined
end;
eval0({unary_minus, Expr}, Msg) ->
Val = eval0(Expr, Msg),
case is_number(Val) of
true -> -Val;
false -> undefined
end;
%% Special operators
eval0({'between', Expr, From, To}, Msg) ->
Value = eval0(Expr, Msg),
FromVal = eval0(From, Msg),
ToVal = eval0(To, Msg),
between(Value, FromVal, ToVal);
eval0({'in', Expr, ValueList}, Msg) ->
Value = eval0(Expr, Msg),
is_in(Value, ValueList);
eval0({'is_null', Expr}, Msg) ->
eval0(Expr, Msg) =:= undefined;
eval0({'like', Expr, {pattern, Pattern}}, Msg) ->
Subject = eval0(Expr, Msg),
case is_binary(Subject) of
true ->
like(Subject, Pattern);
false ->
%% "If identifier of a LIKE or NOT LIKE operation is NULL,
%% the value of the operation is unknown."
undefined
end.
%% "Comparison or arithmetic with an unknown value always yields an unknown value."
compare(_Op, Left, Right) when Left =:= undefined orelse Right =:= undefined ->
undefined;
%% "Only like type values can be compared.
%% One exception is that it is valid to compare exact numeric values and approximate numeric values.
%% String and Boolean comparison is restricted to = and <>."
compare('=', Left, Right) ->
Left == Right;
compare('<>', Left, Right) ->
Left /= Right;
compare('>', Left, Right) when is_number(Left) andalso is_number(Right) ->
Left > Right;
compare('<', Left, Right) when is_number(Left) andalso is_number(Right) ->
Left < Right;
compare('>=', Left, Right) when is_number(Left) andalso is_number(Right) ->
Left >= Right;
compare('<=', Left, Right) when is_number(Left) andalso is_number(Right) ->
Left =< Right;
compare(_, _, _) ->
%% "If the comparison of non-like type values is attempted,
%% the value of the operation is false."
false.
arithmetic(_Op, Left, Right) when Left =:= undefined orelse Right =:= undefined ->
undefined;
arithmetic('+', Left, Right) when is_number(Left) andalso is_number(Right) ->
Left + Right;
arithmetic('-', Left, Right) when is_number(Left) andalso is_number(Right) ->
Left - Right;
arithmetic('*', Left, Right) when is_number(Left) andalso is_number(Right) ->
Left * Right;
arithmetic('/', Left, Right) when is_number(Left) andalso is_number(Right) andalso Right /= 0 ->
Left / Right;
arithmetic(_, _, _) ->
undefined.
between(Value, From, To)
when Value =:= undefined orelse
From =:= undefined orelse
To =:= undefined ->
undefined;
between(Value, From, To)
when is_number(Value) andalso
is_number(From) andalso
is_number(To) ->
From =< Value andalso Value =< To;
between(_, _, _) ->
%% BETWEEN requires arithmetic expressions
%% "a string cannot be used in an arithmetic expression"
false.
is_in(undefined, _) ->
%% "If identifier of an IN or NOT IN operation is NULL,
%% the value of the operation is unknown."
undefined;
is_in(Value, List) ->
lists:member(Value, List).
like(Subject, {exact, Pattern}) ->
Subject =:= Pattern;
like(Subject, {prefix, PrefixSize, Prefix}) ->
case Subject of
<<Prefix:PrefixSize/binary, _/binary>> ->
true;
_ ->
false
end;
like(Subject, {suffix, SuffixSize, Suffix}) ->
case Subject of
<<_:(byte_size(Subject) - SuffixSize)/binary, Suffix:SuffixSize/binary>> ->
true;
_ ->
false
end;
like(Subject,{{prefix, PrefixSize, _} = Prefix,
{suffix, SuffixSize, _} = Suffix}) ->
byte_size(Subject) >= PrefixSize + SuffixSize andalso
like(Subject, Prefix) andalso
like(Subject, Suffix);
like(Subject, CompiledRe)
when element(1, CompiledRe) =:= re_pattern ->
try re:run(Subject, CompiledRe, [{capture, none}]) of
match ->
true;
_ ->
false
catch error:badarg ->
%% This branch is hit if Subject is not a UTF-8 string.
undefined
end.
get_field_value(priority, Msg) ->
case mc:priority(Msg) of
undefined ->
?DEFAULT_MSG_PRIORITY;
P ->
P
end;
get_field_value(creation_time, Msg) ->
mc:timestamp(Msg);
get_field_value(Name, Msg) ->
case mc:property(Name, Msg) of
{_Type, Val} ->
Val;
undefined ->
undefined
end.
check_descriptor({symbol, ?DESCRIPTOR_NAME_SELECTOR_FILTER}) ->
ok;
check_descriptor({ulong, ?DESCRIPTOR_CODE_SELECTOR_FILTER}) ->
ok;
check_descriptor(_) ->
error.
jms_selector_to_list(JmsSelector) ->
case unicode:characters_to_list(JmsSelector) of
String when is_list(String) ->
{ok, String};
Error ->
rabbit_log:warning("JMS message selector ~p is not UTF-8 encoded: ~p",
[JmsSelector, Error]),
error
end.
check_length(String)
when length(String) > ?MAX_EXPRESSION_LENGTH ->
rabbit_log:warning("JMS message selector length ~b exceeds maximum length ~b",
[length(String), ?MAX_EXPRESSION_LENGTH]),
error;
check_length(_) ->
ok.
tokenize(String, JmsSelector) ->
case rabbit_jms_selector_lexer:string(String) of
{ok, Tokens, _EndLocation} ->
{ok, Tokens};
{error, {_Line, _Mod, ErrDescriptor}, _Location} ->
rabbit_log:warning("failed to scan JMS message selector '~ts': ~tp",
[JmsSelector, ErrDescriptor]),
error
end.
check_token_count(Tokens, JmsSelector)
when length(Tokens) > ?MAX_TOKENS ->
rabbit_log:warning("JMS message selector '~ts' with ~b tokens exceeds token limit ~b",
[JmsSelector, length(Tokens), ?MAX_TOKENS]),
error;
check_token_count(_, _) ->
ok.
parse(Tokens, JmsSelector) ->
case rabbit_jms_selector_parser:parse(Tokens) of
{error, Reason} ->
rabbit_log:warning("failed to parse JMS message selector '~ts': ~p",
[JmsSelector, Reason]),
error;
Ok ->
Ok
end.
transform_ast(Ast0, JmsSelector) ->
try rabbit_jms_ast:map(
fun({identifier, Ident})
when is_binary(Ident) ->
{identifier, rabbit_amqp_util:section_field_name_to_atom(Ident)};
({'like', _Ident, _Pattern, _Escape} = Node) ->
transform_pattern_node(Node);
(Node) ->
Node
end, Ast0) of
Ast ->
{ok, Ast}
catch {unsupported_field, Name} ->
rabbit_log:warning(
"identifier ~ts in JMS message selector ~tp is unsupported",
[Name, JmsSelector]),
error;
{invalid_pattern, Reason} ->
rabbit_log:warning(
"failed to parse LIKE pattern for JMS message selector ~tp: ~tp",
[JmsSelector, Reason]),
error
end.
has_binary_identifier(Ast) ->
rabbit_jms_ast:search(fun({identifier, Val}) ->
is_binary(Val);
(_Node) ->
false
end, Ast).
%% If the Pattern contains no wildcard or a single % wildcard,
%% we will optimise message evaluation by using Erlang pattern matching.
%% Otherwise, we will match with a regex. Even though we compile regexes,
%% they are slower compared to Erlang pattern matching.
transform_pattern_node({Op, Ident, Pattern, Escape}) ->
Pat = transform_pattern(Pattern, Escape),
{Op, Ident, {pattern, Pat}}.
transform_pattern(Pattern, Escape) ->
case scan_wildcards(Pattern, Escape) of
{none, Chars} ->
{exact, unicode:characters_to_binary(Chars)};
{single_percent, Chars, PercentPos} ->
single_percent(Chars, PercentPos);
regex ->
Re = jms_pattern_to_regex(Pattern, Escape, []),
case re:compile("^" ++ Re ++ "$", [unicode]) of
{ok, CompiledRe} ->
CompiledRe;
{error, Reason} ->
throw({invalid_pattern, Reason})
end
end.
scan_wildcards(Pattern, Escape) ->
scan_wildcards_1(Pattern, Escape, [], -1).
scan_wildcards_1([], _, Acc, -1) ->
{none, lists:reverse(Acc)};
scan_wildcards_1([], _, Acc, PctPos) ->
{single_percent, lists:reverse(Acc), PctPos};
scan_wildcards_1([EscapeChar | Rest], EscapeChar, Acc, PctPos) ->
case Rest of
[] ->
throw({invalid_pattern, invalid_escape_at_end});
[NextChar | Rest1] ->
scan_wildcards_1(Rest1, EscapeChar, [check_char(NextChar) | Acc], PctPos)
end;
scan_wildcards_1([$_ | _Rest], _, _, _) ->
regex;
scan_wildcards_1([$% | Rest], Escape, Acc, -1) ->
%% This is the 1st % character.
Pos = length(Acc),
scan_wildcards_1(Rest, Escape, Acc, Pos);
scan_wildcards_1([$% | _], _, _, _) ->
%% This is the 2nd % character.
regex;
scan_wildcards_1([Char | Rest], Escape, Acc, PctPos) ->
scan_wildcards_1(Rest, Escape, [check_char(Char) | Acc], PctPos).
single_percent(Chars, 0) ->
%% % at start - suffix match
Bin = unicode:characters_to_binary(Chars),
{suffix, byte_size(Bin), Bin};
single_percent(Chars, Pos) when length(Chars) =:= Pos ->
%% % at end - prefix match
Bin = unicode:characters_to_binary(Chars),
{prefix, byte_size(Bin), Bin};
single_percent(Chars, Pos) ->
%% % in middle - prefix and suffix match
{Prefix, Suffix} = lists:split(Pos, Chars),
PrefixBin = unicode:characters_to_binary(Prefix),
SuffixBin = unicode:characters_to_binary(Suffix),
{{prefix, byte_size(PrefixBin), PrefixBin},
{suffix, byte_size(SuffixBin), SuffixBin}}.
jms_pattern_to_regex([], _Escape, Acc) ->
lists:reverse(Acc);
jms_pattern_to_regex([EscapeChar | Rest], EscapeChar, Acc) ->
case Rest of
[] ->
throw({invalid_pattern, invalid_escape_at_end});
[NextChar | Rest1] ->
jms_pattern_to_regex(Rest1, EscapeChar, escape_regex_char(NextChar) ++ Acc)
end;
jms_pattern_to_regex([$% | Rest], Escape, Acc) ->
%% % matches any sequence of characters (0 or more)
jms_pattern_to_regex(Rest, Escape, [$*, $. | Acc]);
jms_pattern_to_regex([$_ | Rest], Escape, Acc) ->
%% _ matches exactly one character
jms_pattern_to_regex(Rest, Escape, [$. | Acc]);
jms_pattern_to_regex([Char | Rest], Escape, Acc) ->
jms_pattern_to_regex(Rest, Escape, escape_regex_char(Char) ++ Acc).
%% Escape user provided characters that have special meaning in Erlang regex.
escape_regex_char(Char0) ->
Char = check_char(Char0),
case lists:member(Char, ".\\|()[]{}^$*+?#") of
true ->
[Char, $\\];
false ->
[Char]
end.
%% Let's disallow control characters in the user provided pattern.
check_char(C) when ?IS_CONTROL_CHAR(C) ->
throw({invalid_pattern, {prohibited_control_character, C}});
check_char(C) ->
C.
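
The `{exact, _}`, `{prefix, _, _}` and `{suffix, _, _}` shapes returned by `transform_pattern/2` above can be evaluated with plain binary matching instead of a regex. The evaluation code is not part of this hunk, so the following is only a minimal sketch of how these shapes could be matched against a message value. The module name `like_sketch` and the function `match/2` are hypothetical and do not appear in the commit.

```erlang
-module(like_sketch).
-export([match/2]).

%% Hypothetical evaluator for the pattern shapes built by transform_pattern/2.
%% Subject is the (binary) message value the LIKE pattern is applied to.

%% No wildcard: the value must equal the pattern.
match({exact, Pat}, Subject) ->
    Subject =:= Pat;
%% 'abc%': the value must start with Pat (Size is byte_size(Pat)).
match({prefix, Size, Pat}, Subject) ->
    case Subject of
        <<Pat:Size/binary, _/binary>> -> true;
        _ -> false
    end;
%% '%abc': the value must end with Pat.
match({suffix, Size, Pat}, Subject) ->
    byte_size(Subject) >= Size andalso
        binary:part(Subject, byte_size(Subject), -Size) =:= Pat;
%% 'ab%cd': prefix and suffix must both match without overlapping.
match({{prefix, PSize, _} = Prefix, {suffix, SSize, _} = Suffix}, Subject) ->
    byte_size(Subject) >= PSize + SSize andalso
        match(Prefix, Subject) andalso
        match(Suffix, Subject).
```

For the pattern `'EU-%'` from the commit message, `transform_pattern/2` yields `{prefix, 3, <<"EU-">>}`, so `like_sketch:match({prefix, 3, <<"EU-">>}, <<"EU-west">>)` takes the second clause and never touches the regex engine.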

@ -5,13 +5,14 @@
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% §4: Property Filter Expressions
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
-module(rabbit_amqp_filtex).
-module(rabbit_amqp_filter_prop).
-include_lib("amqp10_common/include/amqp10_filtex.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
-export([validate/1,
filter/2]).
-export([parse/1,
eval/2]).
%% "Impose a limit on the complexity of each filter expression."
%% [filtex-v1.0-wd09 7.1]
@ -20,38 +21,38 @@
-type simple_type() :: number() | binary() | atom().
-type affix() :: {suffix, non_neg_integer(), binary()} |
{prefix, non_neg_integer(), binary()}.
-type filter_expression_value() :: simple_type() | affix().
-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}, ...]} |
{application_properties, [{binary(), filter_expression_value()}, ...]}.
-type filter_expressions() :: [filter_expression()].
-export_type([filter_expressions/0]).
-type parsed_expression_value() :: simple_type() | affix().
-type parsed_expression() :: {properties, [{FieldName :: atom(), parsed_expression_value()}, ...]} |
{application_properties, [{binary(), parsed_expression_value()}, ...]}.
-type parsed_expressions() :: [parsed_expression()].
-export_type([parsed_expressions/0]).
-spec validate(tuple()) ->
{ok, filter_expression()} | error.
validate({described, Descriptor, {map, KVList}})
-spec parse(tuple()) ->
{ok, parsed_expression()} | error.
parse({described, Descriptor, {map, KVList}})
when KVList =/= [] andalso
length(KVList) =< ?MAX_FILTER_FIELDS ->
try validate0(Descriptor, KVList)
try parse0(Descriptor, KVList)
catch throw:{?MODULE, _, _} ->
error
end;
validate(_) ->
parse(_) ->
error.
-spec filter(filter_expressions(), mc:state()) ->
-spec eval(parsed_expressions(), mc:state()) ->
boolean().
filter(Filters, Mc) ->
eval(Filters, Mc) ->
%% "A message will pass through a filter-set if and only if
%% it passes through each of the named filters." [3.5.8]
lists:all(fun(Filter) ->
filter0(Filter, Mc)
filter(Filter, Mc)
end, Filters).
%%%%%%%%%%%%%%%%
%%% Internal %%%
%%%%%%%%%%%%%%%%
filter0({properties, KVList}, Mc) ->
filter({properties, KVList}, Mc) ->
%% "The filter evaluates to true if all properties enclosed in the filter expression
%% match the respective properties in the message."
%% [filtex-v1.0-wd09 4.2.4]
@ -60,7 +61,7 @@ filter0({properties, KVList}, Mc) ->
Val = unwrap(TaggedVal),
match_simple_type(RefVal, Val)
end, KVList);
filter0({application_properties, KVList}, Mc) ->
filter({application_properties, KVList}, Mc) ->
AppProps = mc:routing_headers(Mc, []),
%% "The filter evaluates to true if all properties enclosed in the filter expression
%% match the respective entries in the application-properties section in the message."
@ -113,56 +114,56 @@ match_simple_type(RefVal, Val) ->
%% and the values are equal when treated as a floating-point"
RefVal == Val.
validate0(Descriptor, KVList) when
parse0(Descriptor, KVList) when
Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER} ->
validate_props(KVList, []);
validate0(Descriptor, KVList) when
parse_props(KVList, []);
parse0(Descriptor, KVList) when
Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER} ->
validate_app_props(KVList, []);
validate0(_, _) ->
parse_app_props(KVList, []);
parse0(_, _) ->
error.
validate_props([], Acc) ->
parse_props([], Acc) ->
{ok, {properties, lists:reverse(Acc)}};
validate_props([{{symbol, <<"message-id">>}, TaggedVal} | Rest], Acc) ->
parse_props([{{symbol, <<"message-id">>}, TaggedVal} | Rest], Acc) ->
case parse_message_id(TaggedVal) of
{ok, Val} ->
validate_props(Rest, [{message_id, Val} | Acc]);
parse_props(Rest, [{message_id, Val} | Acc]);
error ->
error
end;
validate_props([{{symbol, <<"user-id">>}, {binary, Val}} | Rest], Acc) ->
validate_props(Rest, [{user_id, Val} | Acc]);
validate_props([{{symbol, <<"to">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{to, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"subject">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{subject, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"reply-to">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{reply_to, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"correlation-id">>}, TaggedVal} | Rest], Acc) ->
parse_props([{{symbol, <<"user-id">>}, {binary, Val}} | Rest], Acc) ->
parse_props(Rest, [{user_id, Val} | Acc]);
parse_props([{{symbol, <<"to">>}, {utf8, Val}} | Rest], Acc) ->
parse_props(Rest, [{to, parse_string_modifier_prefix(Val)} | Acc]);
parse_props([{{symbol, <<"subject">>}, {utf8, Val}} | Rest], Acc) ->
parse_props(Rest, [{subject, parse_string_modifier_prefix(Val)} | Acc]);
parse_props([{{symbol, <<"reply-to">>}, {utf8, Val}} | Rest], Acc) ->
parse_props(Rest, [{reply_to, parse_string_modifier_prefix(Val)} | Acc]);
parse_props([{{symbol, <<"correlation-id">>}, TaggedVal} | Rest], Acc) ->
case parse_message_id(TaggedVal) of
{ok, Val} ->
validate_props(Rest, [{correlation_id, Val} | Acc]);
parse_props(Rest, [{correlation_id, Val} | Acc]);
error ->
error
end;
validate_props([{{symbol, <<"content-type">>}, {symbol, Val}} | Rest], Acc) ->
validate_props(Rest, [{content_type, Val} | Acc]);
validate_props([{{symbol, <<"content-encoding">>}, {symbol, Val}} | Rest], Acc) ->
validate_props(Rest, [{content_encoding, Val} | Acc]);
validate_props([{{symbol, <<"absolute-expiry-time">>}, {timestamp, Val}} | Rest], Acc) ->
validate_props(Rest, [{absolute_expiry_time, Val} | Acc]);
validate_props([{{symbol, <<"creation-time">>}, {timestamp, Val}} | Rest], Acc) ->
validate_props(Rest, [{creation_time, Val} | Acc]);
validate_props([{{symbol, <<"group-id">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{group_id, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"group-sequence">>}, {uint, Val}} | Rest], Acc) ->
validate_props(Rest, [{group_sequence, Val} | Acc]);
validate_props([{{symbol, <<"reply-to-group-id">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{reply_to_group_id, parse_string_modifier_prefix(Val)} | Acc]);
validate_props(_, _) ->
parse_props([{{symbol, <<"content-type">>}, {symbol, Val}} | Rest], Acc) ->
parse_props(Rest, [{content_type, Val} | Acc]);
parse_props([{{symbol, <<"content-encoding">>}, {symbol, Val}} | Rest], Acc) ->
parse_props(Rest, [{content_encoding, Val} | Acc]);
parse_props([{{symbol, <<"absolute-expiry-time">>}, {timestamp, Val}} | Rest], Acc) ->
parse_props(Rest, [{absolute_expiry_time, Val} | Acc]);
parse_props([{{symbol, <<"creation-time">>}, {timestamp, Val}} | Rest], Acc) ->
parse_props(Rest, [{creation_time, Val} | Acc]);
parse_props([{{symbol, <<"group-id">>}, {utf8, Val}} | Rest], Acc) ->
parse_props(Rest, [{group_id, parse_string_modifier_prefix(Val)} | Acc]);
parse_props([{{symbol, <<"group-sequence">>}, {uint, Val}} | Rest], Acc) ->
parse_props(Rest, [{group_sequence, Val} | Acc]);
parse_props([{{symbol, <<"reply-to-group-id">>}, {utf8, Val}} | Rest], Acc) ->
parse_props(Rest, [{reply_to_group_id, parse_string_modifier_prefix(Val)} | Acc]);
parse_props(_, _) ->
error.
parse_message_id({ulong, Val}) ->
@ -176,13 +177,13 @@ parse_message_id({utf8, Val}) ->
parse_message_id(_) ->
error.
validate_app_props([], Acc) ->
parse_app_props([], Acc) ->
{ok, {application_properties, lists:reverse(Acc)}};
validate_app_props([{{utf8, Key}, {utf8, String}} | Rest], Acc) ->
validate_app_props(Rest, [{Key, parse_string_modifier_prefix(String)} | Acc]);
validate_app_props([{{utf8, Key}, TaggedVal} | Rest], Acc) ->
validate_app_props(Rest, [{Key, unwrap(TaggedVal)} | Acc]);
validate_app_props(_, _) ->
parse_app_props([{{utf8, Key}, {utf8, String}} | Rest], Acc) ->
parse_app_props(Rest, [{Key, parse_string_modifier_prefix(String)} | Acc]);
parse_app_props([{{utf8, Key}, TaggedVal} | Rest], Acc) ->
parse_app_props(Rest, [{Key, unwrap(TaggedVal)} | Acc]);
parse_app_props(_, _) ->
error.
%% [filtex-v1.0-wd09 4.1.1]

@ -14,6 +14,7 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
-include("rabbit_amqp.hrl").
-include("mc.hrl").
@ -3187,10 +3188,10 @@ parse_attach_properties({map, KVList}) ->
end.
parse_filter(undefined) ->
{undefined, [], []};
{undefined, undefined, []};
parse_filter({map, DesiredKVList}) ->
{EffectiveKVList, ConsumerFilter, ConsumerArgs} =
lists:foldr(fun parse_filters/2, {[], [], []}, DesiredKVList),
lists:foldr(fun parse_filters/2, {[], undefined, []}, DesiredKVList),
{{map, EffectiveKVList}, ConsumerFilter, ConsumerArgs}.
parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-offset-spec">>}, Value}},
@ -3200,7 +3201,9 @@ parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-
%% 0.9.1 uses second based timestamps
Arg = {<<"x-stream-offset">>, timestamp, Ts div 1000},
{[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]};
{utf8, Spec} ->
{Type, Spec}
when Type =:= utf8 orelse
Type =:= symbol ->
%% next, last, first and "10m" etc
Arg = {<<"x-stream-offset">>, longstr, Spec},
{[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]};
@ -3242,19 +3245,44 @@ parse_filters({Symbol = {symbol, <<"rabbitmq:stream-", _/binary>>}, Value}, Acc)
false ->
Acc
end;
parse_filters(Filter = {{symbol, ?FILTER_NAME_SQL}, Value},
Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) ->
case ConsumerFilter of
undefined ->
case rabbit_amqp_filter_jms:parse(Value) of
{ok, ParsedSql} ->
{[Filter | EffectiveFilters], {jms, ParsedSql}, ConsumerArgs};
error ->
Acc
end;
_ ->
%% SQL filter expression is mutually exclusive with AMQP property filter expression.
Acc
end;
parse_filters(Filter = {{symbol, _Key}, Value},
Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) ->
case rabbit_amqp_filtex:validate(Value) of
{ok, FilterExpression = {FilterType, _}} ->
case proplists:is_defined(FilterType, ConsumerFilter) of
case rabbit_amqp_filter_prop:parse(Value) of
{ok, ParsedExpression = {Section, _}} ->
case ConsumerFilter of
undefined ->
{[Filter | EffectiveFilters],
{property, [ParsedExpression]},
ConsumerArgs};
{property, ParsedExpressions} ->
case proplists:is_defined(Section, ParsedExpressions) of
true ->
%% For now, let's prohibit multiple top level filters of the same type
%% (properties or application-properties). There should be no use case.
%% In future, we can allow multiple times the same top level grouping
%% filter expression type (all/any/not).
%% Let's prohibit multiple top level filters of the
%% same section (properties or application-properties).
Acc;
false ->
{[Filter | EffectiveFilters], [FilterExpression | ConsumerFilter], ConsumerArgs}
{[Filter | EffectiveFilters],
{property, [ParsedExpression | ParsedExpressions]},
ConsumerArgs}
end;
{jms, _} ->
%% SQL filter expression is mutually exclusive with
%% AMQP property filter expressions.
Acc
end;
error ->
Acc
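
After this change the consumer filter accumulator in `parse_filters/2` is one of `undefined`, `{property, [ParsedExpression]}` or `{jms, ParsedSql}`. The sketch below models only that accumulation rule in isolation, assuming the filters have already been parsed. The module `filter_acc_sketch`, its functions and the placeholder values are hypothetical; the real code also tracks the effective filter list and consumer arguments.

```erlang
-module(filter_acc_sketch).
-export([add/2, fold/1]).

%% add(ParsedFilter, ConsumerFilter) -> ConsumerFilter
%% The first filter seen decides the kind: SQL or property.
add({jms, ParsedSql}, undefined) ->
    {jms, ParsedSql};
add({property, Expr}, undefined) ->
    {property, [Expr]};
%% Further property filters are accepted once per section.
add({property, {Section, _} = Expr}, {property, Exprs} = Acc) ->
    case proplists:is_defined(Section, Exprs) of
        true -> Acc;
        false -> {property, [Expr | Exprs]}
    end;
%% Anything else conflicts with what was accepted before and is ignored:
%% a second SQL filter, SQL after property, or property after SQL.
add(_Conflicting, Acc) ->
    Acc.

%% Like parse_filter/1, fold from the right, starting with no filter.
fold(ParsedFilters) ->
    lists:foldr(fun add/2, undefined, ParsedFilters).
```

Because the fold runs from the right, the filter that appears last in the list is seen first. In this sketch, `fold([{jms, sql}, {property, {properties, a}}])` therefore keeps the property filter and drops the SQL one.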

@ -8,16 +8,38 @@
-module(rabbit_amqp_util).
-include("rabbit_amqp.hrl").
-export([protocol_error/3,
capabilities/1]).
-export([section_field_name_to_atom/1,
capabilities/1,
protocol_error/3]).
-spec protocol_error(term(), io:format(), [term()]) ->
no_return().
protocol_error(Condition, Msg, Args) ->
Description = unicode:characters_to_binary(lists:flatten(io_lib:format(Msg, Args))),
Reason = #'v1_0.error'{condition = Condition,
description = {utf8, Description}},
exit(Reason).
-type header_field_name() :: priority.
-type properties_field_name() :: message_id | user_id | to | subject | reply_to |
correlation_id | content_type | content_encoding |
absolute_expiry_time | creation_time | group_id |
group_sequence | reply_to_group_id.
-type field_name() :: header_field_name() | properties_field_name().
-export_type([field_name/0]).
-spec section_field_name_to_atom(binary()) -> field_name() | binary().
section_field_name_to_atom(<<"header.priority">>) -> priority;
%% ttl, first-acquirer, and delivery-count are unsupported
%% because setting a JMS message selector on these fields is invalid.
section_field_name_to_atom(<<"header.", _/binary>> = Bin) -> throw({unsupported_field, Bin});
section_field_name_to_atom(<<"properties.message-id">>) -> message_id;
section_field_name_to_atom(<<"properties.user-id">>) -> user_id;
section_field_name_to_atom(<<"properties.to">>) -> to;
section_field_name_to_atom(<<"properties.subject">>) -> subject;
section_field_name_to_atom(<<"properties.reply-to">>) -> reply_to;
section_field_name_to_atom(<<"properties.correlation-id">>) -> correlation_id;
section_field_name_to_atom(<<"properties.content-type">>) -> content_type;
section_field_name_to_atom(<<"properties.content-encoding">>) -> content_encoding;
section_field_name_to_atom(<<"properties.absolute-expiry-time">>) -> absolute_expiry_time;
section_field_name_to_atom(<<"properties.creation-time">>) -> creation_time;
section_field_name_to_atom(<<"properties.group-id">>) -> group_id;
section_field_name_to_atom(<<"properties.group-sequence">>) -> group_sequence;
section_field_name_to_atom(<<"properties.reply-to-group-id">>) -> reply_to_group_id;
section_field_name_to_atom(<<"properties.", _/binary>> = Bin) -> throw({unsupported_field, Bin});
section_field_name_to_atom(Other) -> Other.
-spec capabilities([binary()]) ->
undefined | {array, symbol, [{symbol, binary()}]}.
@ -26,3 +48,11 @@ capabilities([]) ->
capabilities(Capabilities) ->
Caps = [{symbol, C} || C <- Capabilities],
{array, symbol, Caps}.
-spec protocol_error(term(), io:format(), [term()]) ->
no_return().
protocol_error(Condition, Msg, Args) ->
Description = unicode:characters_to_binary(lists:flatten(io_lib:format(Msg, Args))),
Reason = #'v1_0.error'{condition = Condition,
description = {utf8, Description}},
exit(Reason).

deps/rabbit/src/rabbit_jms_ast.erl

@ -0,0 +1,110 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% Helper functions operating on the Abstract Syntax Tree (AST)
%% as returned by rabbit_jms_selector_parser:parse/1
-module(rabbit_jms_ast).
-export([search/2,
map/2]).
-type ast() :: tuple().
-export_type([ast/0]).
-spec search(fun((term()) -> boolean()), ast()) -> boolean().
search(Pred, Node) ->
case Pred(Node) of
true ->
true;
false ->
case Node of
{Op, Arg} when is_atom(Op) ->
search(Pred, Arg);
{Op, Arg1, Arg2} when is_atom(Op) ->
search(Pred, Arg1) orelse
search(Pred, Arg2);
{Op, Arg1, Arg2, Arg3} when is_atom(Op) ->
search(Pred, Arg1) orelse
search(Pred, Arg2) orelse
search(Pred, Arg3);
_Other ->
false
end
end.
-spec map(fun((tuple()) -> tuple()), ast()) ->
ast().
map(Fun, Ast) when is_function(Fun, 1) ->
map_1(Ast, Fun).
map_1(Pattern, _Fun) when element(1, Pattern) =:= pattern ->
Pattern;
map_1(Node, Fun) when is_atom(element(1, Node)) ->
map_2(Fun(Node), Fun);
map_1(Other, _Fun) ->
Other.
map_2({Op, Arg1}, Fun) ->
{Op, map_1(Arg1, Fun)};
map_2({Op, Arg1, Arg2}, Fun) ->
{Op, map_1(Arg1, Fun), map_1(Arg2, Fun)};
map_2({Op, Arg1, Arg2, Arg3}, Fun) ->
{Op, map_1(Arg1, Fun), map_1(Arg2, Fun), map_1(Arg3, Fun)}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
has_binary_identifier_test() ->
false = has_binary_identifier("TRUE"),
true = has_binary_identifier("user_key_1 <> 'fake'"),
false = has_binary_identifier("properties.subject = 'fake'"),
false = has_binary_identifier("NOT properties.group-id = 'test'"),
false = has_binary_identifier("properties.group-sequence IS NULL"),
false = has_binary_identifier("properties.group-sequence IS NOT NULL"),
true = has_binary_identifier("NOT user_key = 'test'"),
true = has_binary_identifier("custom_field IS NULL"),
false = has_binary_identifier("properties.group-id = 'g1' AND header.priority > 5"),
false = has_binary_identifier("properties.group-sequence * 10 < 100"),
false = has_binary_identifier("properties.creation-time >= 12345 OR properties.subject = 'test'"),
true = has_binary_identifier("user_key = 'g1' AND header.priority > 5"),
true = has_binary_identifier("header.priority > 5 and user_key = 'g1'"),
true = has_binary_identifier("custom_metric * 10 < 100"),
true = has_binary_identifier("properties.creation-time >= 12345 OR user_data = 'test'"),
false = has_binary_identifier("properties.group-sequence BETWEEN 1 AND 10"),
true = has_binary_identifier("user_score BETWEEN 1 AND 10"),
false = has_binary_identifier("properties.group-id LIKE 'group_%' ESCAPE '!'"),
true = has_binary_identifier("user_tag LIKE 'group_%' ESCAPE '!'"),
false = has_binary_identifier("properties.group-id IN ('g1', 'g2', 'g3')"),
true = has_binary_identifier("user_category IN ('g1', 'g2', 'g3')"),
false = has_binary_identifier(
"(properties.group-sequence + 1) * 2 <= 100 AND " ++
"(properties.group-id LIKE 'prod_%' OR header.priority BETWEEN 5 AND 10)"),
true = has_binary_identifier(
"(properties.group-sequence + 1) * 2 <= 100 AND " ++
"(user_value LIKE 'prod_%' OR properties.absolute-expiry-time BETWEEN 5 AND 10)"),
ok.
has_binary_identifier(Selector) ->
{ok, Tokens, _EndLocation} = rabbit_jms_selector_lexer:string(Selector),
{ok, Ast0} = rabbit_jms_selector_parser:parse(Tokens),
Ast = map(fun({identifier, Ident}) when is_binary(Ident) ->
{identifier, rabbit_amqp_util:section_field_name_to_atom(Ident)};
(Node) ->
Node
end, Ast0),
search(fun({identifier, Val}) ->
is_binary(Val);
(_Node) ->
false
end, Ast).
-endif.
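
The helpers in `rabbit_jms_ast` rely on every AST node being a tuple whose first element is an atom, followed by one to three arguments. As a rough illustration of that shape, the sketch below walks an AST the same way `search/2` does but returns the identifiers it finds instead of a boolean. The module `ast_idents_sketch` and `identifiers/1` are hypothetical and not part of the commit.

```erlang
-module(ast_idents_sketch).
-export([identifiers/1]).

%% Collect the values of all {identifier, _} nodes, left to right.
identifiers({identifier, Name}) ->
    [Name];
identifiers({Op, Arg}) when is_atom(Op) ->
    identifiers(Arg);
identifiers({Op, Arg1, Arg2}) when is_atom(Op) ->
    identifiers(Arg1) ++ identifiers(Arg2);
identifiers({Op, Arg1, Arg2, Arg3}) when is_atom(Op) ->
    identifiers(Arg1) ++ identifiers(Arg2) ++ identifiers(Arg3);
%% Leaves such as numbers, binaries, atoms, and lists of strings.
identifiers(_Other) ->
    [].
```

The AST below is written by hand, following the grammar actions for `user_key = 'g1' AND header.priority > 5` after identifiers were mapped through `section_field_name_to_atom/1`:

```erlang
Ast = {'and',
       {'=', {identifier, <<"user_key">>}, {string, <<"g1">>}},
       {'>', {identifier, priority}, {integer, 5}}},
[<<"user_key">>, priority] = ast_idents_sketch:identifiers(Ast).
```

The remaining binary, `<<"user_key">>`, is the kind of identifier that `has_binary_identifier/1` reports, since only section field names are turned into atoms.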

File diff suppressed because it is too large

@ -0,0 +1,102 @@
%%% This is the definitions file for JMS message selectors:
%%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector
%%%
%%% To manually generate the scanner file rabbit_jms_selector_lexer.erl run:
%%% leex:file("rabbit_jms_selector_lexer.xrl", [deterministic]).
Definitions.
WHITESPACE = [\s\t\f\n\r]
DIGIT = [0-9]
INT = {DIGIT}+
% Approximate numeric literal with a decimal
FLOAT = ({DIGIT}+\.{DIGIT}*|\.{DIGIT}+)([eE][\+\-]?{INT})?
% Approximate numeric literal in scientific notation without a decimal
EXPONENT = {DIGIT}+[eE][\+\-]?{DIGIT}+
% We extend the allowed JMS identifier syntax with '.' and '-' even though
% these two characters return false for Character.isJavaIdentifierPart()
% to allow identifiers such as properties.group-id
IDENTIFIER = [a-zA-Z_$][a-zA-Z0-9_$.\-]*
STRING = '([^']|'')*'
Rules.
{WHITESPACE}+ : skip_token.
% Logical operators (case insensitive)
[aA][nN][dD] : {token, {'AND', TokenLine}}.
[oO][rR] : {token, {'OR', TokenLine}}.
[nN][oO][tT] : {token, {'NOT', TokenLine}}.
% Special operators (case insensitive)
[bB][eE][tT][wW][eE][eE][nN] : {token, {'BETWEEN', TokenLine}}.
[lL][iI][kK][eE] : {token, {'LIKE', TokenLine}}.
[iI][nN] : {token, {'IN', TokenLine}}.
[iI][sS] : {token, {'IS', TokenLine}}.
[nN][uU][lL][lL] : {token, {'NULL', TokenLine}}.
[eE][sS][cC][aA][pP][eE] : {token, {'ESCAPE', TokenLine}}.
% Boolean literals (case insensitive)
[tT][rR][uU][eE] : {token, {boolean, TokenLine, true}}.
[fF][aA][lL][sS][eE] : {token, {boolean, TokenLine, false}}.
% Comparison operators
= : {token, {'=', TokenLine}}.
<> : {token, {'<>', TokenLine}}.
>= : {token, {'>=', TokenLine}}.
<= : {token, {'<=', TokenLine}}.
> : {token, {'>', TokenLine}}.
< : {token, {'<', TokenLine}}.
% Arithmetic operators
\+ : {token, {'+', TokenLine}}.
- : {token, {'-', TokenLine}}.
\* : {token, {'*', TokenLine}}.
/ : {token, {'/', TokenLine}}.
% Parentheses and comma
\( : {token, {'(', TokenLine}}.
\) : {token, {')', TokenLine}}.
, : {token, {',', TokenLine}}.
% Literals
{INT} : {token, {integer, TokenLine, list_to_integer(TokenChars)}}.
{FLOAT} : {token, {float, TokenLine, list_to_float(to_float(TokenChars))}}.
{EXPONENT} : {token, {float, TokenLine, parse_scientific_notation(TokenChars)}}.
{STRING} : {token, {string, TokenLine, process_string(TokenChars)}}.
{IDENTIFIER} : {token, {identifier, TokenLine, unicode:characters_to_binary(TokenChars)}}.
% Catch any other characters as errors
. : {error, {illegal_character, TokenChars}}.
Erlang code.
%% "Approximate literals use the Java floating-point literal syntax."
to_float([$. | _] = Chars) ->
%% . Digits [ExponentPart]
"0" ++ Chars;
to_float(Chars) ->
%% Digits . [Digits] [ExponentPart]
case lists:last(Chars) of
$. ->
Chars ++ "0";
_ ->
Chars1 = string:lowercase(Chars),
Chars2 = string:replace(Chars1, ".e", ".0e"),
lists:flatten(Chars2)
end.
parse_scientific_notation(Chars) ->
Str = string:lowercase(Chars),
{Before, After0} = lists:splitwith(fun(C) -> C =/= $e end, Str),
[$e | After] = After0,
Base = list_to_integer(Before),
Exp = list_to_integer(After),
Base * math:pow(10, Exp).
process_string(Chars) ->
%% remove surrounding quotes
Chars1 = lists:sublist(Chars, 2, length(Chars) - 2),
Bin = unicode:characters_to_binary(Chars1),
process_escaped_quotes(Bin).
process_escaped_quotes(Binary) ->
binary:replace(Binary, <<"''">>, <<"'">>, [global]).
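
JMS approximate literals follow the Java floating-point syntax, which allows forms such as `5.`, `.5` and `1.E3` that `list_to_float/1` rejects. The sketch below repeats the normalisation idea of `to_float/1` above in a standalone module so it can be tried in an Erlang shell. The module name `float_literal_sketch` and the function `parse/1` are hypothetical.

```erlang
-module(float_literal_sketch).
-export([parse/1]).

%% Turn a Java-style floating-point literal into an Erlang float by
%% adding the digits that list_to_float/1 requires around the '.'.
parse([$. | _] = Chars) ->
    %% ".5" -> "0.5"
    list_to_float("0" ++ Chars);
parse(Chars) ->
    case lists:last(Chars) of
        $. ->
            %% "5." -> "5.0"
            list_to_float(Chars ++ "0");
        _ ->
            %% "1.E3" -> "1.0e3"; literals such as "12.5" pass through.
            Lower = string:lowercase(Chars),
            Fixed = string:replace(Lower, ".e", ".0e"),
            list_to_float(lists:flatten(Fixed))
    end.
```

The input is assumed to have already matched the `FLOAT` definition, so it always contains a `.` and at least one digit.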

File diff suppressed because it is too large

@ -0,0 +1,140 @@
%%% This is the grammar file for JMS message selectors:
%%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector
%%%
%%% To manually generate the parser file rabbit_jms_selector_parser.erl run:
%%% yecc:file("rabbit_jms_selector_parser.yrl", [deterministic]).
Nonterminals
selector
conditional_expr
comparison_expr
logical_expr
additive_expr
multiplicative_expr
unary_expr
primary
literal
identifier_expr
string_list
string_item
between_expr
in_expr
like_expr
is_null_expr.
Terminals
integer float boolean string identifier
'=' '<>' '>' '<' '>=' '<='
'+' '-' '*' '/'
'AND' 'OR' 'NOT'
'BETWEEN' 'LIKE' 'IN' 'IS' 'NULL' 'ESCAPE'
'(' ')' ','.
Rootsymbol selector.
%% operator precedences (lowest to highest)
Left 100 'OR'.
Left 200 'AND'.
Nonassoc 300 '=' '<>' '>' '<' '>=' '<='.
Left 400 '+' '-'.
Left 500 '*' '/'.
Unary 600 'NOT'.
%% "A selector is a conditional expression"
selector -> conditional_expr : '$1'.
%% Conditional expressions
conditional_expr -> logical_expr : '$1'.
%% Logical expressions
logical_expr -> logical_expr 'AND' logical_expr : {'and', '$1', '$3'}.
logical_expr -> logical_expr 'OR' logical_expr : {'or', '$1', '$3'}.
logical_expr -> 'NOT' logical_expr : {'not', '$2'}.
logical_expr -> comparison_expr : '$1'.
%% Comparison expressions
comparison_expr -> additive_expr '=' additive_expr : {'=', '$1', '$3'}.
comparison_expr -> additive_expr '<>' additive_expr : {'<>', '$1', '$3'}.
comparison_expr -> additive_expr '>' additive_expr : {'>', '$1', '$3'}.
comparison_expr -> additive_expr '<' additive_expr : {'<', '$1', '$3'}.
comparison_expr -> additive_expr '>=' additive_expr : {'>=', '$1', '$3'}.
comparison_expr -> additive_expr '<=' additive_expr : {'<=', '$1', '$3'}.
comparison_expr -> between_expr : '$1'.
comparison_expr -> like_expr : '$1'.
comparison_expr -> in_expr : '$1'.
comparison_expr -> is_null_expr : '$1'.
comparison_expr -> additive_expr : '$1'.
%% BETWEEN expression
between_expr -> additive_expr 'BETWEEN' additive_expr 'AND' additive_expr : {'between', '$1', '$3', '$5'}.
between_expr -> additive_expr 'NOT' 'BETWEEN' additive_expr 'AND' additive_expr : {'not', {'between', '$1', '$4', '$6'}}.
%% LIKE expression
like_expr -> additive_expr 'LIKE' string :
{'like', '$1', process_like_pattern('$3'), no_escape}.
like_expr -> additive_expr 'LIKE' string 'ESCAPE' string :
{'like', '$1', process_like_pattern('$3'), process_escape_char('$5')}.
like_expr -> additive_expr 'NOT' 'LIKE' string :
{'not', {'like', '$1', process_like_pattern('$4'), no_escape}}.
like_expr -> additive_expr 'NOT' 'LIKE' string 'ESCAPE' string :
{'not', {'like', '$1', process_like_pattern('$4'), process_escape_char('$6')}}.
%% IN expression
in_expr -> additive_expr 'IN' '(' string_list ')' : {'in', '$1', lists:uniq('$4')}.
in_expr -> additive_expr 'NOT' 'IN' '(' string_list ')' : {'not', {'in', '$1', lists:uniq('$5')}}.
string_list -> string_item : ['$1'].
string_list -> string_item ',' string_list : ['$1'|'$3'].
string_item -> string : extract_value('$1').
%% IS NULL expression
is_null_expr -> identifier_expr 'IS' 'NULL' : {'is_null', '$1'}.
is_null_expr -> identifier_expr 'IS' 'NOT' 'NULL' : {'not', {'is_null', '$1'}}.
%% Arithmetic expressions
additive_expr -> additive_expr '+' multiplicative_expr : {'+', '$1', '$3'}.
additive_expr -> additive_expr '-' multiplicative_expr : {'-', '$1', '$3'}.
additive_expr -> multiplicative_expr : '$1'.
multiplicative_expr -> multiplicative_expr '*' unary_expr : {'*', '$1', '$3'}.
multiplicative_expr -> multiplicative_expr '/' unary_expr : {'/', '$1', '$3'}.
multiplicative_expr -> unary_expr : '$1'.
%% Handle unary operators through grammar structure instead of precedence
unary_expr -> '+' primary : {unary_plus, '$2'}.
unary_expr -> '-' primary : {unary_minus, '$2'}.
unary_expr -> primary : '$1'.
%% Primary expressions
primary -> '(' conditional_expr ')' : '$2'.
primary -> literal : '$1'.
primary -> identifier_expr : '$1'.
%% Identifiers (header fields or property references)
identifier_expr -> identifier :
{identifier, extract_value('$1')}.
%% Literals
literal -> integer : {integer, extract_value('$1')}.
literal -> float : {float, extract_value('$1')}.
literal -> string : {string, extract_value('$1')}.
literal -> boolean : {boolean, extract_value('$1')}.
Erlang code.
extract_value({_Token, _Line, Value}) -> Value.
process_like_pattern({string, Line, Value}) ->
case unicode:characters_to_list(Value) of
L when is_list(L) ->
L;
_ ->
return_error(Line, "pattern-value in LIKE must be valid Unicode")
end.
process_escape_char({string, Line, Value}) ->
case unicode:characters_to_list(Value) of
[SingleChar] ->
SingleChar;
_ ->
return_error(Line, "ESCAPE must be a single-character string literal")
end.

@ -133,7 +133,7 @@
consumer_tag := rabbit_types:ctag(),
exclusive_consume => boolean(),
args => rabbit_framing:amqp_table(),
filter => rabbit_amqp_filtex:filter_expressions(),
filter => rabbit_amqp_filter:expression(),
ok_msg := term(),
acting_user := rabbit_types:username()}.
-type cancel_reason() :: cancel | remove.

@ -93,7 +93,7 @@
%% were part of an uncompressed sub batch, and are buffered in
%% reversed order until the consumer has more credits to consume them.
buffer_msgs_rev = [] :: [rabbit_amqqueue:qmsg()],
filter :: rabbit_amqp_filtex:filter_expressions(),
filter :: rabbit_amqp_filter:expression(),
reader_options :: map()}).
-record(stream_client, {stream_id :: string(),
@ -358,7 +358,7 @@ consume(Q, Spec, #stream_client{} = QState0)
%% begins sending
maybe_send_reply(ChPid, OkMsg),
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
Filter = maps:get(filter, Spec, []),
Filter = maps:get(filter, Spec, undefined),
begin_stream(QState, ConsumerTag, OffsetSpec, Mode,
AckRequired, Filter, filter_spec(Args));
{error, Reason} ->
@ -1319,7 +1319,7 @@ entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName},
Mc = mc_amqp:init_from_stream(Entry, #{?ANN_EXCHANGE => <<>>,
?ANN_ROUTING_KEYS => [QName],
<<"x-stream-offset">> => Offset}),
case rabbit_amqp_filtex:filter(Filter, Mc) of
case rabbit_amqp_filter:eval(Filter, Mc) of
true ->
{Name, LocalPid, Offset, false, Mc};
false ->

@ -5,12 +5,14 @@
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% Test suite for
%% Test suite for §4 Property Filter Expressions of
%% AMQP Filter Expressions Version 1.0 Working Draft 09
-module(amqp_filtex_SUITE).
%% filtering from a stream.
-module(amqp_filter_prop_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp10_common/include/amqp10_filtex.hrl").
-include_lib("amqp10_client/include/amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-compile([nowarn_export_all,
@ -53,9 +55,7 @@ init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:merge_app_env(
Config, {rabbit, [{quorum_tick_interval, 1000},
{stream_tick_interval, 1000}
]}).
Config, {rabbit, [{stream_tick_interval, 1000}]}).
end_per_suite(Config) ->
Config.
@@ -148,8 +148,10 @@ properties_section(Config) ->
{{symbol, <<"group-sequence">>}, {uint, 16#ff_ff_ff_ff}},
{{symbol, <<"reply-to-group-id">>}, {utf8, <<"other group ID">>}}
],
Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}},
Filter1 = #{<<"from start">> => #filter{descriptor = <<"rabbitmq:stream-offset-spec">>,
value = {symbol, <<"first">>}},
<<"props">> => #filter{descriptor = ?DESCRIPTOR_NAME_PROPERTIES_FILTER,
value = {map, PropsFilter1}}},
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
settled, configuration, Filter1),


@@ -0,0 +1,441 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% Test suite for SQL expressions filtering from a stream.
-module(amqp_filter_sql_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp10_client/include/amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-compile([nowarn_export_all,
export_all]).
-import(rabbit_ct_broker_helpers,
[rpc/4]).
-import(rabbit_ct_helpers,
[eventually/1]).
-import(amqp_utils,
[init/1,
connection_config/1,
flush/1,
wait_for_credit/1,
wait_for_accepts/1,
send_messages/3,
detach_link_sync/1,
end_session_sync/1,
close_connection_sync/1]).
all() ->
[
{group, cluster_size_1}
].
groups() ->
[
{cluster_size_1, [shuffle],
[
multiple_sections,
filter_few_messages_from_many,
sql_and_bloom_filter,
invalid_filter
]}
].
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:merge_app_env(
Config, {rabbit, [{stream_tick_interval, 1000}]}).
end_per_suite(Config) ->
Config.
init_per_group(_Group, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodename_suffix, Suffix}]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
%% Assert that every testcase cleaned up.
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
%% Wait for sessions to terminate before starting the next test case.
eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
multiple_sections(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair, Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
Now = erlang:system_time(millisecond),
To = rabbitmq_amqp_address:exchange(<<"some exchange">>, <<"routing key">>),
ReplyTo = rabbitmq_amqp_address:queue(<<"some queue">>),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_headers(
#{priority => 200},
amqp10_msg:set_properties(
#{message_id => {ulong, 999},
user_id => <<"guest">>,
to => To,
subject => <<"🐇"/utf8>>,
reply_to => ReplyTo,
correlation_id => <<"corr-123">>,
content_type => <<"text/plain">>,
content_encoding => <<"some encoding">>,
absolute_expiry_time => Now + 100_000,
creation_time => Now,
group_id => <<"my group ID">>,
group_sequence => 16#ff_ff_ff_ff,
reply_to_group_id => <<"other group ID">>},
amqp10_msg:set_application_properties(
#{<<"k1">> => -3,
<<"k2">> => false,
<<"k3">> => true,
<<"k4">> => <<"hey👋"/utf8>>},
amqp10_msg:new(<<"t2">>, <<"m2">>))))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{group_id => <<"my group ID">>},
amqp10_msg:set_application_properties(
#{<<"k1">> => -4},
amqp10_msg:new(<<"t3">>, <<"m3">>)))),
ok = wait_for_accepts(3),
ok = detach_link_sync(Sender),
flush(sent),
Filter1 = filter(<<"k1 <= -3">>),
{ok, R1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
settled, configuration, Filter1),
ok = amqp10_client:flow_link_credit(R1, 10, never, true),
receive {amqp10_msg, R1, R1M2} ->
?assertEqual([<<"m2">>], amqp10_msg:body(R1M2))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, R1, R1M3} ->
?assertEqual([<<"m3">>], amqp10_msg:body(R1M3))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
ok = assert_credit_exhausted(R1, ?LINE),
ok = detach_link_sync(R1),
Filter2 = filter(
<<"header.priority = 200 AND "
"properties.message-id = 999 AND "
"properties.user-id = 'guest' AND "
"properties.to LIKE '/exch_nges/some=%20exchange/rout%' ESCAPE '=' AND "
"properties.subject = '🐇' AND "
"properties.reply-to LIKE '/queues/some%' AND "
"properties.correlation-id IN ('corr-345', 'corr-123') AND "
"properties.content-type = 'text/plain' AND "
"properties.content-encoding = 'some encoding' AND "
"properties.absolute-expiry-time > 0 AND "
"properties.creation-time > 0 AND "
"properties.group-id IS NOT NULL AND "
"properties.group-sequence = 4294967295 AND "
"properties.reply-to-group-id = 'other group ID' AND "
"k1 < 0 AND "
"NOT k2 AND "
"k3 AND "
"k4 NOT LIKE 'hey' AND "
"k5 IS NULL"
/utf8>>),
{ok, R2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
settled, configuration, Filter2),
ok = amqp10_client:flow_link_credit(R2, 10, never, true),
receive {amqp10_msg, R2, R2M2} ->
?assertEqual([<<"m2">>], amqp10_msg:body(R2M2))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
ok = assert_credit_exhausted(R2, ?LINE),
ok = detach_link_sync(R2),
Filter3 = filter(<<"absent IS NULL">>),
{ok, R3} = amqp10_client:attach_receiver_link(
Session, <<"receiver 3">>, Address,
settled, configuration, Filter3),
ok = amqp10_client:flow_link_credit(R3, 10, never, true),
receive {amqp10_msg, R3, R3M1} ->
?assertEqual([<<"m1">>], amqp10_msg:body(R3M1))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, R3, R3M2} ->
?assertEqual([<<"m2">>], amqp10_msg:body(R3M2))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, R3, R3M3} ->
?assertEqual([<<"m3">>], amqp10_msg:body(R3M3))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
ok = assert_credit_exhausted(R3, ?LINE),
ok = detach_link_sync(R3),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
%% Filter a small subset from many messages.
%% We test here that flow control still works correctly.
filter_few_messages_from_many(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
{Connection, Session, LinkPair} = init(Config),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair, Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{group_id => <<"my group ID">>},
amqp10_msg:new(<<"t1">>, <<"first msg">>))),
ok = send_messages(Sender, 1000, false),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{group_id => <<"my group ID">>},
amqp10_msg:new(<<"t2">>, <<"last msg">>))),
ok = wait_for_accepts(1002),
ok = detach_link_sync(Sender),
flush(sent),
%% Our filter should cause us to receive only the first and
%% last message out of the 1002 messages in the stream.
Filter = filter(<<"properties.group-id is not null">>),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address,
unsettled, configuration, Filter),
ok = amqp10_client:flow_link_credit(Receiver, 2, never, true),
receive {amqp10_msg, Receiver, M1} ->
?assertEqual([<<"first msg">>], amqp10_msg:body(M1)),
ok = amqp10_client:accept_msg(Receiver, M1)
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver, M2} ->
?assertEqual([<<"last msg">>], amqp10_msg:body(M2)),
ok = amqp10_client:accept_msg(Receiver, M2)
after 30000 -> ct:fail({missing_msg, ?LINE})
end,
ok = assert_credit_exhausted(Receiver, ?LINE),
ok = detach_link_sync(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
%% Test that SQL and Bloom filters can be used together.
sql_and_bloom_filter(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{notify_with_performative => true},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair, Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_message_annotations(
#{<<"x-stream-filter-value">> => <<"v1">>},
amqp10_msg:set_headers(
#{priority => 12},
amqp10_msg:set_properties(
#{subject => <<"v1">>},
amqp10_msg:new(<<"t1">>, <<"msg">>))))),
receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = detach_link_sync(Sender),
flush(sent),
Filter = filter(<<"properties.subject = 'v1' AND header.priority > 10">>),
DesiredFilter = maps:put(<<"my bloom filter">>,
#filter{descriptor = <<"rabbitmq:stream-filter">>,
value = {utf8, <<"v1">>}},
Filter),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address,
unsettled, configuration, DesiredFilter),
receive {amqp10_event,
{link, Receiver,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter}}}}}} ->
DesiredFilterNames = lists:sort(maps:keys(DesiredFilter)),
ActualFilterNames = lists:sort([Name || {{symbol, Name}, _} <- ActualFilter]),
?assertEqual(DesiredFilterNames, ActualFilterNames)
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver, 1, never),
receive {amqp10_msg, Receiver, M1} ->
?assertEqual([<<"msg">>], amqp10_msg:body(M1)),
ok = amqp10_client:accept_msg(Receiver, M1)
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
ok = detach_link_sync(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
invalid_filter(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{notify_with_performative => true},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair, Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
%% Trigger a lexer error.
Filter1 = #{?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SELECTOR_FILTER,
value = {utf8, <<"@#$%^&">>}}},
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
unsettled, configuration, Filter1),
receive {amqp10_event,
{link, Receiver1,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter1}}}}}} ->
%% RabbitMQ should exclude this filter in its reply attach frame because
%% "the sending endpoint [RabbitMQ] sets the filter actually in place".
?assertMatch([], ActualFilter1)
after 9000 ->
ct:fail({missing_event, ?LINE})
end,
ok = detach_link_sync(Receiver1),
%% Trigger a parser error. We use allowed tokens here, but the grammar is incorrect.
Filter2 = #{?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SELECTOR_FILTER,
value = {utf8, <<"FALSE FALSE">>}}},
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
unsettled, configuration, Filter2),
receive {amqp10_event,
{link, Receiver2,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter2}}}}}} ->
?assertMatch([], ActualFilter2)
after 9000 ->
ct:fail({missing_event, ?LINE})
end,
ok = detach_link_sync(Receiver2),
%% SQL filtering should be mutually exclusive with AMQP property filtering
PropsFilter = [{{symbol, <<"subject">>}, {utf8, <<"some subject">>}}],
Filter3 = #{<<"prop name">> => #filter{descriptor = ?DESCRIPTOR_NAME_PROPERTIES_FILTER,
value = {map, PropsFilter}},
?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SELECTOR_FILTER,
value = {utf8, <<"TRUE">>}}},
{ok, Receiver3} = amqp10_client:attach_receiver_link(
Session, <<"receiver 3">>, Address,
unsettled, configuration, Filter3),
receive {amqp10_event,
{link, Receiver3,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter3}}}}}} ->
%% We expect only one of the two filters to be actually in place.
?assertMatch([_], ActualFilter3)
after 9000 ->
ct:fail({missing_event, ?LINE})
end,
ok = detach_link_sync(Receiver3),
%% Send invalid UTF-8 in the SQL expression.
InvalidUTF8 = <<255>>,
Filter4 = #{?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SELECTOR_FILTER,
value = {utf8, InvalidUTF8}}},
{ok, Receiver4} = amqp10_client:attach_receiver_link(
Session, <<"receiver 4">>, Address,
unsettled, configuration, Filter4),
receive {amqp10_event,
{link, Receiver4,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter4}}}}}} ->
?assertMatch([], ActualFilter4)
after 9000 ->
ct:fail({missing_event, ?LINE})
end,
ok = detach_link_sync(Receiver4),
%% Send invalid descriptor
Filter5 = #{?FILTER_NAME_SQL => #filter{descriptor = <<"apache.org:invalid:string">>,
value = {utf8, <<"TRUE">>}}},
{ok, Receiver5} = amqp10_client:attach_receiver_link(
Session, <<"receiver 5">>, Address,
unsettled, configuration, Filter5),
receive {amqp10_event,
{link, Receiver5,
{attached, #'v1_0.attach'{
source = #'v1_0.source'{filter = {map, ActualFilter5}}}}}} ->
?assertMatch([], ActualFilter5)
after 9000 ->
ct:fail({missing_event, ?LINE})
end,
ok = detach_link_sync(Receiver5),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = close_connection_sync(Connection).
filter(String)
when is_binary(String) ->
#{<<"from start">> => #filter{descriptor = <<"rabbitmq:stream-offset-spec">>,
value = {symbol, <<"first">>}},
?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_NAME_SELECTOR_FILTER,
value = {utf8, String}}}.
assert_credit_exhausted(Receiver, Line) ->
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 9000 -> ct:fail({missing_credit_exhausted, Line})
end.

deps/rabbit/test/amqp_jms_unit_SUITE.erl

@@ -0,0 +1,892 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(amqp_jms_unit_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
%%%===================================================================
%%% Common Test callbacks
%%%===================================================================
all() ->
[
{group, tests}
].
groups() ->
[{tests, [shuffle],
[
logical_operators,
comparison_operators,
arithmetic_operators,
string_comparison,
like_operator,
in_operator,
between_operator,
null_handling,
literals,
scientific_notation,
precedence_and_parentheses,
type_handling,
complex_expressions,
case_sensitivity,
whitespace_handling,
identifier_rules,
header_section,
properties_section,
multiple_sections,
parse_errors
]
}].
%%%===================================================================
%%% Test cases
%%%===================================================================
logical_operators(_Config) ->
%% Basic logical operators
true = match("country = 'UK' AND weight = 5", app_props()),
true = match("'UK' = country AND 5 = weight", app_props()),
true = match("country = 'France' OR weight < 6", app_props()),
true = match("NOT country = 'France'", app_props()),
false = match("country = 'UK' AND weight > 5", app_props()),
false = match("missing AND premium", app_props()),
false = match("active AND absent", app_props()),
false = match("NOT absent", app_props()),
false = match("premium OR absent", app_props()),
true = match("absent OR active", app_props()),
true = match("active OR absent", app_props()),
%% The JMS spec isn't very clear on whether the following should match.
%% Option 1:
%% The conditional expression is invalid because percentage
%% is an identifier returning an integer instead of a boolean.
%% Therefore, arguably the conditional expression is invalid and should not match.
%% Option 2:
%% This integer could be interpreted as UNKNOWN such that
%% "UNKNOWN OR TRUE" evaluates to TRUE as per Table 3-5.
%% Qpid Broker-J and ActiveMQ Artemis implement option 2.
%% That's why we also expect option 2 here.
true = match("percentage OR active", app_props()),
true = match("active OR percentage", app_props()),
%% See tables 3-4 and 3-6 in
%% https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#null-values
%% NOT (Unknown AND True) = NOT Unknown = Unknown
false = match("NOT (absent IN ('v1', 'v2') AND active)", app_props()),
%% NOT (Unknown AND Unknown) = NOT Unknown = Unknown
false = match("NOT (absent IN ('v1', 'v2') AND absent LIKE 'v3')", app_props()),
%% NOT (Unknown AND False) = NOT False = True
true = match("NOT (absent IN ('v1', 'v2') AND premium)", app_props()),
%% NOT (True AND Unknown) = NOT Unknown = Unknown
false = match("NOT (active AND absent IN ('v1', 'v2'))", app_props()),
%% NOT (True AND False) = NOT False = True
true = match("NOT (active AND premium)", app_props()),
%% NOT (Unknown OR False) = NOT Unknown = Unknown
false = match("NOT (absent IN ('v1', 'v2') OR premium)", app_props()),
%% NOT (Unknown OR Unknown) = NOT Unknown = Unknown
false = match("NOT (absent IN ('v1', 'v2') OR absent LIKE 'v3')", app_props()),
%% NOT (Unknown OR True) = NOT True = False
false = match("NOT (absent IN ('v1', 'v2') OR active)", app_props()),
%% NOT (NOT (Unknown OR True)) = NOT (NOT True) = NOT False = True
true = match("NOT (NOT (absent IN ('v1', 'v2') OR active))", app_props()),
%% NOT (False OR Unknown) = NOT Unknown = Unknown
false = match("NOT (premium OR absent IN ('v1', 'v2'))", app_props()),
%% NOT (NOT (False OR Unknown)) = NOT (NOT Unknown) = NOT Unknown = Unknown
false = match("NOT (NOT (premium OR absent IN ('v1', 'v2')))", app_props()),
%% Compound logical expressions
true = match("country = 'UK' AND (weight > 3 OR price < 20)", app_props()),
true = match("NOT (country = 'France' OR country = 'Germany')", app_props()),
false = match("country = 'UK' AND NOT active = TRUE", app_props()),
true = match("(country = 'US' OR country = 'UK') AND (weight > 2 AND weight < 10)", app_props()).
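The NOT/AND/OR cases in `logical_operators` follow SQL three-valued logic: an expression evaluates to true, false, or unknown, and a message is selected only when the result is true. The module below is a minimal, hypothetical sketch of those truth tables (the module name `three_valued_sketch` and the atom `unknown` are illustrative assumptions, not taken from the RabbitMQ implementation).

```erlang
-module(three_valued_sketch).
-export([and3/2, or3/2, not3/1, selects/1]).

%% Truth values are the atoms true, false and unknown (an assumption
%% made for this sketch only).

%% AND: false dominates, then unknown.
and3(false, _) -> false;
and3(_, false) -> false;
and3(true, true) -> true;
and3(_, _) -> unknown.

%% OR: true dominates, then unknown.
or3(true, _) -> true;
or3(_, true) -> true;
or3(false, false) -> false;
or3(_, _) -> unknown.

%% NOT leaves unknown untouched.
not3(true) -> false;
not3(false) -> true;
not3(unknown) -> unknown.

%% A message is selected only if the whole expression is true;
%% unknown is treated like false at the top level.
selects(true) -> true;
selects(_) -> false.
```

For example, `selects(not3(and3(unknown, true)))` is `false` because `NOT Unknown` stays unknown, whereas `selects(not3(and3(unknown, false)))` is `true` because `Unknown AND False` is already false.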
comparison_operators(_Config) ->
%% Equality
true = match("country = 'UK'", app_props()),
false = match("country = 'US'", app_props()),
%% Inequality
true = match("country <> 'US'", app_props()),
false = match("country <> 'UK'", app_props()),
%% Greater than
true = match("weight > 3", app_props()),
false = match("weight > 5", app_props()),
%% Less than
true = match("weight < 10", app_props()),
false = match("weight < 5", app_props()),
%% Greater than or equal
true = match("weight >= 5", app_props()),
true = match("weight >= 4", app_props()),
false = match("weight >= 6", app_props()),
%% "Only like type values can be compared. One exception is that it is
%% valid to compare exact numeric values and approximate numeric values"
true = match("weight >= 5.0", app_props()),
true = match("weight >= 4.99", app_props()),
false = match("weight >= 5.01", app_props()),
true = match("price >= 10.5", app_props()),
false = match("price >= 10.51", app_props()),
true = match("price >= 10.4", app_props()),
true = match("price >= 10", app_props()),
false = match("price >= 11", app_props()),
%% Less than or equal
true = match("weight <= 5", app_props()),
true = match("weight <= 6", app_props()),
false = match("weight <= 4", app_props()),
true = match("price <= 10.6", app_props()),
false = match("price <= 10", app_props()),
%% "String and Boolean comparison is restricted to = and <>."
%% "If the comparison of non-like type values is attempted, the value of the operation is false."
true = match("active = true", app_props()),
true = match("premium = false", app_props()),
false = match("premium <> false", app_props()),
false = match("premium >= 'false'", app_props()),
false = match("premium <= 'false'", app_props()),
false = match("premium >= 0", app_props()),
false = match("premium <= 0", app_props()),
false = match("country >= 'UK'", app_props()),
false = match("country > 'UA'", app_props()),
false = match("country >= 'UA'", app_props()),
false = match("country < 'UA'", app_props()),
false = match("country <= 'UA'", app_props()),
false = match("country < 'UL'", app_props()),
false = match("country < true", app_props()),
false = match("weight = '5'", app_props()),
false = match("weight >= '5'", app_props()),
false = match("weight <= '5'", app_props()),
false = match("country > 1", app_props()),
false = match("country < 1", app_props()).
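The expectations in `comparison_operators` can be summarised as a small set of typing rules: integers and floats compare freely with each other, strings and booleans support only `=` and `<>`, any other combination yields false, and an absent value yields unknown. The following is a rough sketch of such rules. It assumes values are already decoded to Erlang numbers, binaries, and booleans, uses `undefined` for an absent value, and is not the code RabbitMQ uses.

```erlang
-module(compare_sketch).
-export([compare/3]).

%% Op is one of '=', '<>', '>', '<', '>=', '<='.
%% undefined stands for an absent (NULL) value in this sketch.
compare(_Op, undefined, _) -> unknown;
compare(_Op, _, undefined) -> unknown;
%% Exact and approximate numerics are comparable with every operator.
compare(Op, L, R) when is_number(L), is_number(R) -> numeric(Op, L, R);
%% Strings and booleans are restricted to = and <>.
compare('=', L, R) when is_binary(L), is_binary(R) -> L =:= R;
compare('<>', L, R) when is_binary(L), is_binary(R) -> L =/= R;
compare('=', L, R) when is_boolean(L), is_boolean(R) -> L =:= R;
compare('<>', L, R) when is_boolean(L), is_boolean(R) -> L =/= R;
%% Non-like types, or an ordering operator on strings or booleans.
compare(_Op, _, _) -> false.

numeric('=', L, R) -> L == R;
numeric('<>', L, R) -> L /= R;
numeric('>', L, R) -> L > R;
numeric('<', L, R) -> L < R;
numeric('>=', L, R) -> L >= R;
numeric('<=', L, R) -> L =< R.
```

With these rules, `compare('>=', 5, 4.99)` is `true`, `compare('=', 5, <<"5">>)` is `false`, and `compare('>', undefined, 0)` is `unknown`.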
arithmetic_operators(_Config) ->
%% Addition
true = match("weight + 5 = 10", app_props()),
true = match("price + 4.5 = 15", app_props()),
%% Subtraction
true = match("weight - 2 = 3", app_props()),
true = match("price - 0.5 = 10", app_props()),
%% Multiplication
true = match("weight * 2 = 10", app_props()),
true = match("quantity * price * discount = 262.5", app_props()),
%% Division
true = match("weight / 2 = 2.5", app_props()),
true = match("price / 2 = 5.25", app_props()),
true = match("quantity / 10 = 10", app_props()),
true = match("quantity / 10 = 10.000", app_props()),
%% Nested arithmetic
true = match("(weight + 5) * 2 = 20", app_props()),
true = match("price / (weight - 3) = 5.25", app_props()),
%% Unary operators
true = match("+temperature = -5", app_props()),
true = match("-temperature = 5", app_props()),
true = match("+weight = 5", app_props()),
true = match("-weight = -5", app_props()),
true = match("6 + -weight = 1", app_props()),
true = match("6 - +weight = 1", app_props()),
true = match("6 + +weight = 11", app_props()),
true = match("6 - -weight = 11", app_props()),
true = match("+(-weight) = -5", app_props()),
true = match("-(+weight) = -5", app_props()),
true = match("-(-weight) = 5", app_props()),
true = match("+(+weight) = 5", app_props()),
false = match("+weight", app_props()),
%% Unary operators followed by identifiers with non-numeric values are invalid
%% and should therefore not match.
false = match("+city", app_props()),
false = match("+city = 'London'", app_props()),
false = match("-absent", app_props()),
%% "Comparison or arithmetic with an unknown value always yields an unknown value."
false = match("absent + 4 = 5", app_props()),
false = match("2 * absent = 0", app_props()).
string_comparison(_Config) ->
%% "Two strings are equal if and only if they contain the same sequence of characters."
false = match("country = '🇬🇧'", app_props()),
true = match("country = '🇬🇧'", [{{utf8, <<"country">>}, {utf8, <<"🇬🇧"/utf8>>}}]),
%% "A string literal is enclosed in single quotes, with an included
%% single quote represented by doubled single quote"
true = match("'UK''s' = 'UK''s'", app_props()),
true = match("country = 'UK''s'", [{{utf8, <<"country">>}, {utf8, <<"UK's">>}}]),
true = match("country = '🇬🇧''s'", [{{utf8, <<"country">>}, {utf8, <<"🇬🇧's"/utf8>>}}]),
true = match("country = ''", [{{utf8, <<"country">>}, {utf8, <<>>}}]),
true = match("country = ''''", [{{utf8, <<"country">>}, {utf8, <<$'>>}}]).
like_operator(_Config) ->
%% Basic LIKE operations
true = match("description LIKE '%test%'", app_props()),
true = match("description LIKE 'This is a %'", app_props()),
true = match("description LIKE '%a test message'", app_props()),
true = match("description LIKE 'T_i% a %e_%e_sa%'", app_props()),
false = match("description LIKE 'T_i% a %e_%e_sa'", app_props()),
false = match("description LIKE 'is a test message'", app_props()),
true = match("country LIKE 'UK'", app_props()),
true = match("country LIKE 'U_'", app_props()),
true = match("country LIKE '_K'", app_props()),
true = match("country LIKE 'UK%'", app_props()),
true = match("country LIKE '%UK'", app_props()),
true = match("country LIKE 'U%K'", app_props()),
false = match("country LIKE 'US'", app_props()),
false = match("country LIKE '_UK'", app_props()),
false = match("country LIKE 'UK_'", app_props()),
false = match("country LIKE 'U_K'", app_props()),
false = match("city LIKE 'New%'", app_props()),
true = match("key LIKE 'a%a'", [{{utf8, <<"key">>}, {utf8, <<"aa">>}}]),
false = match("key LIKE 'a%a'", [{{utf8, <<"key">>}, {utf8, <<"a">>}}]),
%% identifier with empty string value
Empty = [{{utf8, <<"empty">>}, {utf8, <<"">>}}],
true = match("empty LIKE ''", Empty),
true = match("empty LIKE '%'", Empty),
true = match("empty LIKE '%%'", Empty),
true = match("empty LIKE '%%%'", Empty),
false = match("empty LIKE 'x'", Empty),
false = match("empty LIKE '%x'", Empty),
false = match("empty LIKE '%x%'", Empty),
false = match("empty LIKE '_'", Empty),
%% LIKE operations with UTF8
Utf8 = [{{utf8, <<"food">>}, {utf8, <<"car🥕rot"/utf8>>}}],
true = match("food LIKE 'car🥕rot'", Utf8),
true = match("food LIKE 'car_rot'", Utf8),
true = match("food LIKE '___🥕___'", Utf8),
true = match("food LIKE '%🥕%'", Utf8),
true = match("food LIKE '%_🥕_%'", Utf8),
true = match("food LIKE '_%🥕%_'", Utf8),
false = match("food LIKE 'car__rot'", Utf8),
false = match("food LIKE 'carrot'", Utf8),
false = match("food LIKE 'car🥕to'", Utf8),
false = match("invalid_utf8 LIKE '%a'", [{{utf8, <<"invalid_utf8">>}, {binary, <<0, 1, 128>>}}]),
false = match("invalid_utf8 LIKE '_a'", [{{utf8, <<"invalid_utf8">>}, {binary, <<255>>}}]),
true = match("key LIKE '_#.\\|()[]{} ^$*+?%'", [{{utf8, <<"key">>}, {utf8, <<"##.\\|()[]{} ^$*+???">>}}]),
true = match("key LIKE '##.\\|()[]{} ^$*+???'", [{{utf8, <<"key">>}, {utf8, <<"##.\\|()[]{} ^$*+???">>}}]),
%% Escape character
true = match("key LIKE 'z_%' ESCAPE 'z'", [{{utf8, <<"key">>}, {utf8, <<"_foo">>}}]),
false = match("key LIKE 'z_%' ESCAPE 'z'", [{{utf8, <<"key">>}, {utf8, <<"foo">>}}]),
true = match("key LIKE '$_%' ESCAPE '$'", [{{utf8, <<"key">>}, {utf8, <<"_foo">>}}]),
false = match("key LIKE '$_%' ESCAPE '$'", [{{utf8, <<"key">>}, {utf8, <<"foo">>}}]),
true = match("key LIKE '_$%' ESCAPE '$'", [{{utf8, <<"key">>}, {utf8, <<"5%">>}}]),
true = match("key LIKE '_$%' ESCAPE '$'", [{{utf8, <<"key">>}, {utf8, <<"🥕%"/utf8>>}}]),
true = match("key LIKE '🍰@%🥕' ESCAPE '@'", [{{utf8, <<"key">>}, {utf8, <<"🍰%🥕"/utf8>>}}]),
false = match("key LIKE '🍰@%🥕' ESCAPE '@'", [{{utf8, <<"key">>}, {utf8, <<"🍰other🥕"/utf8>>}}]),
false = match("key LIKE '🍰@%🥕' ESCAPE '@'", [{{utf8, <<"key">>}, {utf8, <<"🍰🥕"/utf8>>}}]),
true = match("key LIKE '!_#.\\|()[]{} ^$*+?!%' ESCAPE '!'", [{{utf8, <<"key">>}, {utf8, <<"_#.\\|()[]{} ^$*+?%">>}}]),
false = match("key LIKE '!_#.\\|()[]{} ^$*+?!%' ESCAPE '!'", [{{utf8, <<"key">>}, {utf8, <<"##.\\|()[]{} ^$*+?%">>}}]),
true = match("product_id LIKE 'ABC\\_%' ESCAPE '\\'", app_props()),
false = match("product_id LIKE 'ABC\\%' ESCAPE '\\'", app_props()),
false = match("product_id LIKE 'ABC\\_\\%' ESCAPE '\\'", app_props()),
true = match("product_id LIKE 'ABC🥕_123' ESCAPE '🥕'", app_props()),
false = match("product_id LIKE 'ABC🥕%123' ESCAPE '🥕'", app_props()),
%% NOT LIKE
true = match("country NOT LIKE 'US'", app_props()),
false = match("country NOT LIKE 'U_'", app_props()),
false = match("country NOT LIKE '%U%'", app_props()),
false = match("country NOT LIKE 'U%K'", app_props()),
true = match("country NOT LIKE 'U%S'", app_props()),
true = match("country NOT LIKE 'z_🇬🇧' ESCAPE 'z'", [{{utf8, <<"country">>}, {utf8, <<"a🇬🇧"/utf8>>}}]),
false = match("country NOT LIKE 'z_🇬🇧' ESCAPE 'z'", [{{utf8, <<"country">>}, {utf8, <<"_🇬🇧"/utf8>>}}]),
%% "If identifier of a LIKE or NOT LIKE operation is NULL, the value of the operation is unknown."
false = match("absent LIKE '%'", app_props()),
false = match("absent NOT LIKE '%'", app_props()),
false = match("missing LIKE '%'", app_props()),
false = match("missing NOT LIKE '%'", app_props()),
%% Combined with other operators
true = match("description LIKE '%test%' AND country = 'UK'", app_props()),
true = match("(city LIKE 'Paris') OR (description LIKE '%test%')", app_props()),
true = match("city LIKE 'Paris' OR description LIKE '%test%'", app_props()).
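The `like_operator` cases exercise three pattern features: `%` matches any sequence of characters (including none), `_` matches exactly one character, and the character after the `ESCAPE` character is taken literally. One way to sketch these semantics is a recursive matcher over Unicode code points. The module below is a hypothetical illustration (the name `like_sketch` and the `none` default are assumptions); it is not how RabbitMQ implements `LIKE`, and its backtracking on `%` can be slow for adversarial patterns.

```erlang
-module(like_sketch).
-export([like/2, like/3]).

%% like(Subject, Pattern) without an escape character.
like(Subject, Pattern) ->
    like(Subject, Pattern, none).

%% Escape is a Unicode code point (for example $\\) or the atom none.
%% Invalid UTF-8 on either side never matches.
like(Subject, Pattern, Escape) when is_binary(Subject), is_binary(Pattern) ->
    case {unicode:characters_to_list(Subject),
          unicode:characters_to_list(Pattern)} of
        {S, P} when is_list(S), is_list(P) -> match(S, P, Escape);
        _InvalidUtf8 -> false
    end.

%% Escaped character: must match literally.
match([C | S], [Esc, C | P], Esc) -> match(S, P, Esc);
match(_, [Esc, _ | _], Esc) -> false;
%% '%': match zero characters, or consume one and try again.
match(S, [$% | P], Esc) ->
    match(S, P, Esc) orelse (S =/= [] andalso match(tl(S), [$% | P], Esc));
%% '_': exactly one character (one code point).
match([_ | S], [$_ | P], Esc) -> match(S, P, Esc);
%% Ordinary character.
match([C | S], [C | P], Esc) -> match(S, P, Esc);
match([], [], _) -> true;
match(_, _, _) -> false.
```

Because the matcher works on code points rather than bytes, `_` consumes a multi-byte character such as an emoji as a single unit, which is the behaviour the UTF-8 cases in the suite expect.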
in_operator(_Config) ->
AppPropsUtf8 = [{{utf8, <<"country">>}, {utf8, <<"🇬🇧"/utf8>>}}],
%% Basic IN operations
true = match("country IN ('US', 'UK', 'France')", app_props()),
true = match("country IN ('UK')", app_props()),
true = match("country IN ('🇫🇷', '🇬🇧')", AppPropsUtf8),
false = match("country IN ('US', 'France')", app_props()),
%% NOT IN
true = match("country NOT IN ('US', 'France', 'Germany')", app_props()),
true = match("country NOT IN ('🇬🇧')", app_props()),
false = match("country NOT IN ('🇫🇷', '🇬🇧')", AppPropsUtf8),
false = match("country NOT IN ('US', 'UK', 'France')", app_props()),
%% Combined with other operators
true = match("country IN ('UK', 'US') AND weight > 3", app_props()),
true = match("city IN ('Berlin', 'Paris') OR country IN ('UK', 'US')", app_props()),
%% "If identifier of an IN or NOT IN operation is NULL, the value of the operation is unknown."
false = match("missing IN ('UK', 'US')", app_props()),
false = match("absent IN ('UK', 'US')", app_props()),
false = match("missing NOT IN ('UK', 'US')", app_props()),
false = match("absent NOT IN ('UK', 'US')", app_props()).
between_operator(_Config) ->
%% Basic BETWEEN operations
true = match("weight BETWEEN 3 AND 7", app_props()),
true = match("weight BETWEEN 5 AND 7", app_props()),
true = match("weight BETWEEN 3 AND 5", app_props()),
false = match("weight BETWEEN 6 AND 10", app_props()),
true = match("price BETWEEN 10 AND 11", app_props()),
true = match("price BETWEEN 10 AND 10.5", app_props()),
false = match("price BETWEEN -1 AND 10", app_props()),
false = match("score BETWEEN tiny_value AND quantity", app_props()),
true = match("score BETWEEN -tiny_value AND quantity", app_props()),
%% NOT BETWEEN
true = match("weight NOT BETWEEN 6 AND 10", app_props()),
false = match("weight NOT BETWEEN 3 AND 7", app_props()),
false = match("weight NOT BETWEEN 3 AND 5", app_props()),
true = match("score NOT BETWEEN tiny_value AND quantity", app_props()),
false = match("score NOT BETWEEN -tiny_value AND quantity", app_props()),
%% Combined with other operators
true = match("weight BETWEEN 4 AND 6 AND country = 'UK'", app_props()),
true = match("(price BETWEEN 20 AND 30) OR (weight BETWEEN 5 AND 6)", app_props()),
%% "a string cannot be used in an arithmetic expression"
false = match("weight BETWEEN 1 AND 'Z'", app_props()),
false = match("country BETWEEN 'A' AND 'Z'", app_props()),
%% "Comparison or arithmetic with an unknown value always yields an unknown value."
false = match("weight BETWEEN absent AND 10", app_props()),
false = match("weight BETWEEN 2 AND absent", app_props()),
false = match("weight BETWEEN absent AND absent", app_props()),
false = match("absent BETWEEN 2 AND 10", app_props()),
false = match("weight NOT BETWEEN absent AND 10", app_props()),
false = match("weight NOT BETWEEN 2 AND absent", app_props()),
false = match("weight NOT BETWEEN absent AND absent", app_props()),
false = match("absent NOT BETWEEN 2 AND 10", app_props()).
null_handling(_Config) ->
%% IS NULL / IS NOT NULL
true = match("missing IS NULL", app_props()),
true = match("absent IS NULL", app_props()),
false = match("country IS NULL", app_props()),
true = match("country IS NOT NULL", app_props()),
false = match("missing IS NOT NULL", app_props()),
false = match("absent IS NOT NULL", app_props()),
true = match("country = 'UK' AND missing IS NULL", app_props()),
true = match("country = 'France' OR weight IS NOT NULL", app_props()),
%% "SQL treats a NULL value as unknown.
%% Comparison or arithmetic with an unknown value always yields an unknown value."
false = match("missing > 0", app_props()),
false = match("0 < missing", app_props()),
false = match("0 > absent", app_props()),
false = match("0 = missing", app_props()),
false = match("missing >= 0", app_props()),
false = match("missing < 0", app_props()),
false = match("missing <= 0", app_props()),
false = match("missing = 0", app_props()),
false = match("missing <> 0", app_props()),
false = match("missing = missing", app_props()),
false = match("absent = absent", app_props()),
false = match("missing AND true", app_props()),
false = match("missing OR false", app_props()).
literals(_Config) ->
%% Exact numeric literals
true = match("5 = 5", app_props()),
true = match("weight = 5", app_props()),
%% "Approximate literals use the Java floating-point literal syntax."
true = match("10.5 = 10.5", app_props()),
true = match("price = 10.5", app_props()),
true = match("5.0 > 4.999", app_props()),
true = match("10 = 10.", app_props()),
true = match("0 = 0.0", app_props()),
true = match("0 = 0.", app_props()),
true = match("0 = .0", app_props()),
true = match("weight = 5.0", app_props()), % int = float
true = match("5. = weight", app_props()), % float = int
%% String literals
true = match("'UK' = 'UK'", app_props()),
true = match("country = 'UK'", app_props()),
%% Boolean literals
true = match("TRUE = TRUE", app_props()),
true = match("active = TRUE", app_props()),
true = match("TRUE", app_props()),
true = match("active", app_props()),
true = match("FALSE = FALSE", app_props()),
true = match("premium = FALSE", app_props()),
false = match("FALSE", app_props()),
false = match("premium", app_props()),
%% Literals in expressions
true = match("weight + 2 > 6", app_props()),
true = match("price * 2 > 20.0", app_props()),
true = match("'UK' <> 'US'", app_props()).
scientific_notation(_Config) ->
%% Basic scientific notation comparisons
true = match("distance = 1.2E6", app_props()),
true = match("distance = 1200000.0", app_props()),
true = match("tiny_value = 3.5E-4", app_props()),
true = match("tiny_value = 0.00035", app_props()),
%% Scientific notation literals in expressions
true = match("1.2E3 = 1200", app_props()),
true = match("5E2 = 500", app_props()),
true = match("5.E2 = 500", app_props()),
true = match("-5E-2 = -0.05", app_props()),
true = match("-5.E-2 = -0.05", app_props()),
true = match(".5E-1 = 0.05", app_props()),
true = match("-.5E-1 = -0.05", app_props()),
true = match("1E0 = 1", app_props()),
%% Arithmetic with scientific notation
true = match("distance / 1.2E5 = 10", app_props()),
true = match("tiny_value * 1E6 = 350", app_props()),
true = match("1.5E2 + 2.5E2 = 400", app_props()),
true = match("3E3 - 2E3 = 1000", app_props()),
%% Comparisons with scientific notation
true = match("distance > 1E6", app_props()),
true = match("tiny_value < 1E-3", app_props()),
true = match("distance BETWEEN 1E6 AND 2E6", app_props()),
%% Mixed numeric formats
true = match("distance / 1200 = 1000", app_props()),
true = match("large_value + tiny_value >= large_value", app_props()),
true = match("large_value + large_value > large_value", app_props()).
precedence_and_parentheses(_Config) ->
%% Arithmetic precedence
true = match("weight + 2 * 3 = 11", app_props()),
true = match("(weight + 2) * 3 = 21", app_props()),
true = match("weight + weight * quantity - -temperature / 2 = 502.5", app_props()),
%% "Logical operators in precedence order: NOT, AND, OR"
true = match("NOT country = 'US' AND weight > 3", app_props()),
true = match("weight > 3 AND NOT country = 'US'", app_props()),
true = match("NOT (country = 'US' AND weight > 3)", app_props()),
true = match("NOT country = 'US' OR country = 'France' AND weight > 3", app_props()),
true = match("country = 'France' AND weight > 3 OR NOT country = 'US'", app_props()),
%% Mixed precedence
true = match("weight * 2 > 5 + 3", app_props()),
true = match("price < 20 OR country = 'US' AND weight > 3", app_props()),
true = match("weight > 3 AND price < 20 OR country = 'US'", app_props()),
false = match("weight > 3 AND (price > 20 OR country = 'US')", app_props()),
%% Complex parentheses nesting
true = match("((weight > 3) AND (price < -1)) OR ((country = 'UK') AND (city = 'London'))", app_props()),
true = match("weight > 3 AND price < -1 OR country = 'UK' AND city = 'London'", app_props()),
true = match("(weight + (price * 2)) > (score + 15)", app_props()).
%% "Only like type values can be compared. One exception is that it is
%% valid to compare exact numeric values and approximate numeric values.
%% If the comparison of non-like type values is attempted, the value of the operation is false."
type_handling(_Config) ->
%% Numeric comparisons
true = match("weight = 5", app_props()), % int = int
true = match("weight = 5.0", app_props()), % int = float
true = match("price = 10.5", app_props()), % float = float
%% String and numeric
false = match("country = 5", app_props()), % string != number
false = match("weight = 'UK'", app_props()), % number != string
%% Boolean comparisons
true = match("active = TRUE", app_props()),
true = match("active <> FALSE", app_props()),
false = match("TRUE = 1", app_props()), % boolean != number
false = match("active = 1", app_props()), % boolean != number
false = match("TRUE = 'TRUE'", app_props()), % boolean != string
false = match("active = 'TRUE'", app_props()), % boolean != string
%% Type-specific operators
true = match("description LIKE '%test%'", app_props()), % LIKE only works on strings
false = match("weight LIKE '5'", app_props()), % LIKE doesn't work on numbers
%% Arithmetic with different types
true = match("weight + price = 15.5", app_props()), % int + float = float
true = match("weight * discount = 1.25", app_props()), % int * float = float
%% Division by zero is undefined
false = match("weight / 0 > 0", app_props()),
false = match("weight / score = 5", app_props()),
false = match("0 / 0 = 0", app_props()),
false = match("0 / 0.0 = 0", app_props()),
false = match("0 / 0. = 0", app_props()),
false = match("-1 / 0 = 0", app_props()),
false = match("score / score = score", app_props()),
true = match("0.0 / 1 = 0", app_props()),
%% Type incompatibility
false = match("country + weight = 'UK5'", app_props()), % can't add string and number
false = match("active + premium = 1", app_props()). % can't add booleans
complex_expressions(_Config) ->
true = match(
"country = 'UK' AND price > 10.0 AND (weight BETWEEN 4 AND 6) AND description LIKE '%test%'",
app_props()
),
true = match(
"(country IN ('UK', 'US') OR city = 'London') AND (weight * 2 >= 10) AND NOT premium",
app_props()
),
true = match(
"price * quantity * (1 - discount) > 500",
app_props()
),
true = match(
"country = 'UK' AND (city = 'London' OR description LIKE '%test%') AND" ++
"(weight > 3 OR premium = TRUE) AND price <= 20",
app_props()
),
true = match(
"percentage >= 0 AND percentage <= 100 AND weight + temperature = 0",
app_props()
),
true = match(
"((country = 'UK' OR country = 'US') AND (city IN ('London', 'New York', 'Paris'))) OR " ++
"(price * (1 - discount) < 10.0 AND quantity > 50 AND description LIKE '%test%') OR " ++
"(active = TRUE AND premium = FALSE AND (weight BETWEEN 4 AND 10))",
app_props()
).
%% "Predefined selector literals and operator names are [...] case insensitive."
%% "Identifiers are case sensitive."
case_sensitivity(_Config) ->
AppProps = app_props(),
%% 1. Test that operators and literals are case insensitive
true = match("country = 'UK' AnD weight = 5", AppProps),
true = match("country = 'UK' and weight = 5", AppProps),
true = match("country = 'France' Or weight < 6", AppProps),
true = match("country = 'France' or weight < 6", AppProps),
true = match("NoT country = 'France'", AppProps),
true = match("not country = 'France'", AppProps),
true = match("weight BeTwEeN 3 AnD 7", AppProps),
true = match("weight between 3 AnD 7", AppProps),
true = match("description LiKe '%test%'", AppProps),
true = match("description like '%test%'", AppProps),
true = match("country In ('US', 'UK', 'France')", AppProps),
true = match("country in ('US', 'UK', 'France')", AppProps),
true = match("missing Is NuLl", AppProps),
true = match("missing is null", AppProps),
true = match("active = TrUe", AppProps),
true = match("active = true", AppProps),
true = match("premium = FaLsE", AppProps),
true = match("premium = false", AppProps),
true = match("distance = 1.2e6", app_props()),
true = match("tiny_value = 3.5e-4", app_props()),
true = match("3 = 3e0", app_props()),
true = match("3 = 3e-0", app_props()),
true = match("300 = 3e2", app_props()),
true = match("0.03 = 3e-2", app_props()),
%% 2. Test that identifiers are case sensitive
AppPropsCaseSensitiveKeys = AppProps ++ [{{utf8, <<"COUNTRY">>}, {utf8, <<"France">>}},
{{utf8, <<"Weight">>}, {uint, 10}}],
true = match("country = 'UK'", AppPropsCaseSensitiveKeys),
true = match("COUNTRY = 'France'", AppPropsCaseSensitiveKeys),
true = match("Weight = 10", AppPropsCaseSensitiveKeys),
false = match("COUNTRY = 'UK'", AppPropsCaseSensitiveKeys),
false = match("country = 'France'", AppPropsCaseSensitiveKeys),
false = match("weight = 10", AppPropsCaseSensitiveKeys),
false = match("WEIGHT = 5", AppPropsCaseSensitiveKeys),
true = match(
"country = 'UK' aNd COUNTRY = 'France' and (weight Between 4 AnD 6) AND Weight = 10",
AppPropsCaseSensitiveKeys
).
%% "Whitespace is the same as that defined for Java:
%% space, horizontal tab, form feed and line terminator."
whitespace_handling(_Config) ->
%% 1. Space
true = match("country = 'UK'", app_props()),
%% 2. Multiple spaces
true = match("country    =    'UK'", app_props()),
%% 3. Horizontal tab (\t)
true = match("country\t=\t'UK'", app_props()),
%% 4. Form feed (\f)
true = match("country\f=\f'UK'", app_props()),
%% 5. Line terminators (\n line feed, \r carriage return)
true = match("country\n=\n'UK'", app_props()),
true = match("country\r=\r'UK'", app_props()),
%% 6. Mixed whitespace
true = match("country \t\f\n\r = \t\f\n\r 'UK'", app_props()),
%% 7. Complex expression with various whitespace
true = match("country\t=\t'UK'\nAND\rweight\f>\t3", app_props()),
%% 8. Ensure whitespace is not required
true = match("country='UK'AND weight=5", app_props()),
%% 9. Whitespace inside string literals should be preserved
true = match("description = 'This is a test message'", app_props()),
%% 10. Whitespace at beginning and end of expression
true = match(" \t\n\r country = 'UK' \t\n\r ", app_props()).
%% "An identifier is an unlimited-length character sequence that must begin with a
%% Java identifier start character; all following characters must be Java identifier
%% part characters. An identifier start character is any character for which the method
%% Character.isJavaIdentifierStart returns true. This includes '_' and '$'. An
%% identifier part character is any character for which the method
%% Character.isJavaIdentifierPart returns true."
identifier_rules(_Config) ->
Identifiers = [<<"simple">>,
<<"a1b2c3">>,
<<"x">>,
<<"_underscore">>,
<<"$dollar">>,
<<"_">>,
<<"$">>,
<<"with_underscore">>,
<<"with$dollar">>,
<<"mixed_$_identifiers_$_123">>],
AppProps = [{{utf8, Id}, {utf8, <<"value">>}} || Id <- Identifiers],
true = match("simple = 'value'", AppProps),
true = match("a1b2c3 = 'value'", AppProps),
true = match("x = 'value'", AppProps),
true = match("_underscore = 'value'", AppProps),
true = match("$dollar = 'value'", AppProps),
true = match("_ = 'value'", AppProps),
true = match("$ = 'value'", AppProps),
true = match("with_underscore = 'value'", AppProps),
true = match("with$dollar = 'value'", AppProps),
true = match("mixed_$_identifiers_$_123 = 'value'", AppProps).
header_section(_Config) ->
Hdr = #'v1_0.header'{priority = {ubyte, 7}},
Ps = #'v1_0.properties'{},
APs = [],
true = match("header.priority > 5", Hdr, Ps, APs),
true = match("header.priority = 7", Hdr, Ps, APs),
false = match("header.priority < 7", Hdr, Ps, APs),
%% Since the default priority is 4 in both AMQP and JMS, we expect the
%% following expression to evaluate to true if matched against a message
%% without an explicit priority level set.
true = match("header.priority = 4", []).
properties_section(_Config) ->
Ps = #'v1_0.properties'{
message_id = {utf8, <<"id-123">>},
user_id = {binary,<<"some user ID">>},
to = {utf8, <<"to some queue">>},
subject = {utf8, <<"some subject">>},
reply_to = {utf8, <<"reply to some topic">>},
correlation_id = {ulong, 789},
content_type = {symbol, <<"text/plain">>},
content_encoding = {symbol, <<"deflate">>},
absolute_expiry_time = {timestamp, 1311999988888},
creation_time = {timestamp, 1311704463521},
group_id = {utf8, <<"some group ID">>},
group_sequence = {uint, 999},
reply_to_group_id = {utf8, <<"other group ID">>}},
APs = [],
true = match("properties.message-id = 'id-123'", Ps, APs),
false = match("'id-123' <> properties.message-id", Ps, APs),
true = match("properties.message-id LIKE 'id-%'", Ps, APs),
true = match("properties.message-id IN ('id-123', 'id-456')", Ps, APs),
true = match("properties.user-id = 'some user ID'", Ps, APs),
true = match("properties.user-id LIKE '%user%'", Ps, APs),
false = match("properties.user-id = 'other user ID'", Ps, APs),
true = match("properties.to = 'to some queue'", Ps, APs),
true = match("properties.to LIKE 'to some%'", Ps, APs),
true = match("properties.to NOT LIKE '%topic'", Ps, APs),
true = match("properties.subject = 'some subject'", Ps, APs),
true = match("properties.subject LIKE '%subject'", Ps, APs),
true = match("properties.subject IN ('some subject', 'other subject')", Ps, APs),
true = match("properties.reply-to = 'reply to some topic'", Ps, APs),
true = match("properties.reply-to LIKE 'reply%topic'", Ps, APs),
false = match("properties.reply-to LIKE 'reply%queue'", Ps, APs),
true = match("properties.correlation-id = 789", Ps, APs),
true = match("500 < properties.correlation-id", Ps, APs),
true = match("properties.correlation-id BETWEEN 700 AND 800", Ps, APs),
false = match("properties.correlation-id < 700", Ps, APs),
true = match("properties.content-type = 'text/plain'", Ps, APs),
true = match("properties.content-type LIKE 'text/%'", Ps, APs),
true = match("properties.content-type IN ('text/plain', 'text/html')", Ps, APs),
true = match("'deflate' = properties.content-encoding", Ps, APs),
false = match("properties.content-encoding = 'gzip'", Ps, APs),
true = match("properties.content-encoding NOT IN ('gzip', 'compress')", Ps, APs),
true = match("properties.absolute-expiry-time = 1311999988888", Ps, APs),
true = match("properties.absolute-expiry-time > 1311999988000", Ps, APs),
true = match("properties.absolute-expiry-time BETWEEN 1311999988000 AND 1311999989000", Ps, APs),
true = match("properties.creation-time = 1311704463521", Ps, APs),
true = match("properties.creation-time < 1311999988888", Ps, APs),
true = match("properties.creation-time NOT BETWEEN 1311999988000 AND 1311999989000", Ps, APs),
true = match("properties.group-id = 'some group ID'", Ps, APs),
true = match("properties.group-id LIKE 'some%ID'", Ps, APs),
false = match("properties.group-id = 'other group ID'", Ps, APs),
true = match("properties.group-sequence = 999", Ps, APs),
true = match("properties.group-sequence >= 999", Ps, APs),
true = match("properties.group-sequence BETWEEN 900 AND 1000", Ps, APs),
false = match("properties.group-sequence > 999", Ps, APs),
true = match("properties.reply-to-group-id = 'other group ID'", Ps, APs),
true = match("properties.reply-to-group-id LIKE '%group ID'", Ps, APs),
true = match("properties.reply-to-group-id <> 'some group ID'", Ps, APs),
true = match("properties.reply-to-group-id IS NOT NULL", Ps, APs),
false = match("properties.reply-to-group-id IS NULL", Ps, APs),
true = match("properties.message-id = 'id-123' and 'some subject' = properties.subject", Ps, APs),
true = match("properties.group-sequence < 500 or properties.correlation-id > 700", Ps, APs),
true = match("(properties.content-type LIKE 'text/%') AND properties.content-encoding = 'deflate'", Ps, APs),
true = match("properties.subject IS NULL", #'v1_0.properties'{}, APs),
false = match("properties.subject IS NOT NULL", #'v1_0.properties'{}, APs).
multiple_sections(_Config) ->
Hdr = #'v1_0.header'{durable = true,
priority = {ubyte, 7}},
Ps = #'v1_0.properties'{
message_id = {utf8, <<"id-123">>},
user_id = {binary,<<"some user ID">>},
to = {utf8, <<"to some queue">>},
subject = {utf8, <<"some subject">>},
reply_to = {utf8, <<"reply to some topic">>},
correlation_id = {ulong, 789},
content_type = {symbol, <<"text/plain">>},
content_encoding = {symbol, <<"deflate">>},
absolute_expiry_time = {timestamp, 1311999988888},
creation_time = {timestamp, 1311704463521},
group_id = {utf8, <<"some group ID">>},
group_sequence = {uint, 999},
reply_to_group_id = {utf8, <<"other group ID">>}},
APs = [{{utf8, <<"key_1">>}, {byte, -1}}],
true = match("-1.0 = key_1 AND 4 < header.priority AND properties.group-sequence > 90", Hdr, Ps, APs),
false = match("-1.0 = key_1 AND 4 < header.priority AND properties.group-sequence < 90", Hdr, Ps, APs).
parse_errors(_Config) ->
%% Parsing a non-UTF-8 encoded message selector should fail.
?assertEqual(error, parse([255])),
%% Invalid token.
?assertEqual(error, parse("!!!")),
%% Invalid grammar.
?assertEqual(error, parse("AND NOT")),
%% An escape character at the end of a pattern should fail because it has nothing to escape.
?assertEqual(error, parse("id LIKE 'pattern*' ESCAPE '*'")),
?assertEqual(error, parse("id LIKE '_pattern*' ESCAPE '*'")),
%% Control characters in user provided pattern shouldn't be allowed.
?assertEqual(error, parse("id LIKE '\n'")),
?assertEqual(error, parse("id LIKE '\r'")),
?assertEqual(error, parse("id LIKE '_\n'")),
?assertEqual(error, parse("id LIKE '%\r'")),
?assertEqual(error, parse("id LIKE '!\r' ESCAPE '!'")),
%% Expressions with more than 4096 characters should be prohibited.
ManyCharacters = lists:append(lists:duplicate(4096, "x")) ++ " IS NULL",
?assertEqual(error, parse(ManyCharacters)),
%% Expression with more than 200 tokens should be prohibited.
ManyTokens = "id IN (" ++ string:join(["'" ++ integer_to_list(N) ++ "'"|| N <- lists:seq(1, 100)], ",") ++ ")",
?assertEqual(error, parse(ManyTokens)),
%% "header." or "properties." prefixed identifiers
%% that do not refer to supported field names are disallowed.
?assertEqual(error, parse("header.invalid")),
?assertEqual(error, parse("properties.invalid")),
ok.
%%%===================================================================
%%% Helpers
%%%===================================================================
app_props() ->
[
%% String or symbolic values
{{utf8, <<"country">>}, {symbol, <<"UK">>}},
{{utf8, <<"city">>}, {utf8, <<"London">>}},
{{utf8, <<"description">>}, {utf8, <<"This is a test message">>}},
{{utf8, <<"currency">>}, {symbol, <<"GBP">>}},
{{utf8, <<"product_id">>}, {symbol, <<"ABC_123">>}},
%% Numeric values
{{utf8, <<"weight">>}, {ushort, 5}},
{{utf8, <<"price">> }, {double, 10.5}},
{{utf8, <<"quantity">>}, {uint, 100}},
{{utf8, <<"discount">>}, {double, 0.25}},
{{utf8, <<"temperature">>}, {int, -5}},
{{utf8, <<"score">>}, {ulong, 0}},
%% Scientific notation values
{{utf8, <<"distance">>}, {float, 1.2E6}}, % 1,200,000
{{utf8, <<"tiny_value">>}, {double, 3.5E-4}}, % 0.00035
{{utf8, <<"large_value">>}, {double, 6.02E23}}, % Avogadro's number
%% Boolean values
{{utf8, <<"active">>}, true},
{{utf8, <<"premium">>}, {boolean, false}},
%% Special cases
{{utf8, <<"missing">>}, null},
{{utf8, <<"percentage">>}, {ubyte, 75}}
].
match(Selector, AppProps) ->
match(Selector, #'v1_0.properties'{}, AppProps).
match(Selector, Props, AppProps) ->
match(Selector, #'v1_0.header'{}, Props, AppProps).
match(Selector, Header, Props, AppProps)
when is_list(AppProps) ->
{ok, ParsedSelector} = parse(Selector),
AP = #'v1_0.application_properties'{content = AppProps},
Body = #'v1_0.amqp_value'{content = {symbol, <<"some message body">>}},
Sections = [Header, Props, AP, Body],
Payload = amqp_encode_bin(Sections),
Mc = mc_amqp:init_from_stream(Payload, #{}),
rabbit_amqp_filter_jms:eval(ParsedSelector, Mc).
parse(Selector) ->
Descriptor = {ulong, ?DESCRIPTOR_CODE_SELECTOR_FILTER},
Filter = {described, Descriptor, {utf8, Selector}},
rabbit_amqp_filter_jms:parse(Filter).
amqp_encode_bin(L) when is_list(L) ->
iolist_to_binary([amqp10_framing:encode_bin(X) || X <- L]).

@ -14,7 +14,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_filtex.hrl").
-include_lib("amqp10_common/include/amqp10_filter.hrl").
all() ->
[{group, tests}].

@ -20,6 +20,39 @@ All AMQP 1.0 client libraries [maintained by Team RabbitMQ](https://www.rabbitmq
## Features
### SQL Filter Expression for Streams
AMQP 1.0 clients can now define SQL-like filter expressions when consuming from streams, enabling server-side message filtering.
RabbitMQ will only dispatch messages that match the provided filter expression, reducing network traffic and client-side processing overhead.
SQL filter expressions are a more powerful alternative to the [AMQP Property Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions) introduced in RabbitMQ 4.1.
SQL filter expressions are based on the [JMS message selector syntax](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax) and support:
* Comparison operators (`=`, `<>`, `>`, `<`, `>=`, `<=`)
* Logical operators (`AND`, `OR`, `NOT`)
* Arithmetic operators (`+`, `-`, `*`, `/`)
* Special operators (`BETWEEN`, `LIKE`, `IN`, `IS NULL`)
* Access to the header (`header.priority`), properties, and application-properties sections
#### Examples
Simple expression:
```sql
header.priority > 4
```
Complex expression:
```sql
order_type IN ('premium', 'express') AND
total_amount BETWEEN 100 AND 5000 AND
(customer_region LIKE 'EU-%' OR customer_region = 'US-CA') AND
properties.creation-time >= 1750772279000 AND
NOT cancelled
```
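One subtlety when writing such expressions: under the JMS selector rules, a comparison or arithmetic operation involving a missing (NULL) property yields *unknown*, and a message is dispatched only if the whole expression evaluates to true. The following is a minimal sketch of those truth tables, assuming a hypothetical standalone module `three_valued_logic` in which the atom `undefined` stands for *unknown*. It only models the semantics and is not the broker's implementation.

```erlang
%% Hypothetical illustration module, not part of RabbitMQ.
%% The atom `undefined` models the SQL "unknown" truth value.
-module(three_valued_logic).
-export([and3/2, or3/2, not3/1, matches/1]).

%% AND: false dominates; true only if both operands are true.
and3(false, _) -> false;
and3(_, false) -> false;
and3(true, true) -> true;
and3(_, _) -> undefined.

%% OR: true dominates; false only if both operands are false.
or3(true, _) -> true;
or3(_, true) -> true;
or3(false, false) -> false;
or3(_, _) -> undefined.

%% NOT: negating unknown stays unknown.
not3(true) -> false;
not3(false) -> true;
not3(undefined) -> undefined.

%% A message matches only if the expression is true;
%% both false and unknown mean "do not dispatch".
matches(true) -> true;
matches(_) -> false.
```

Because negating *unknown* stays *unknown*, an expression such as `some_prop NOT IN ('a', 'b')` does not match messages that lack `some_prop`. Use `IS NULL` or `IS NOT NULL` to test for presence explicitly.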
Pull Request: [#14110](https://github.com/rabbitmq/rabbitmq-server/pull/14110)
### Incoming and Outgoing Message Interceptors for native protocols
Incoming and outgoing messages can now be intercepted on the broker.