WIP end-to-end spike

This commit is contained in:
David Ansari 2025-04-24 16:41:42 +02:00
parent 8a809b0b8b
commit f74a6e8194
3 changed files with 74 additions and 19 deletions

View File

@ -22,3 +22,14 @@
% [3.2.16]
-define(MESSAGE_FORMAT, 0).
%% SQL-based filtering syntax
%% These descriptors are defined in
%% https://www.amqp.org/specification/1.0/filters
-define(DESCRIPTOR_NAME_SELECTOR_FILTER, <<"apache.org:selector-filter:string">>).
-define(DESCRIPTOR_CODE_SELECTOR_FILTER, 16#0000468C00000004).
%% A filter with this name contains a JMS message selector.
%% We use the same name as Qpid JMS in
%% https://github.com/apache/qpid-jms/blob/2.7.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java#L75
-define(FILTER_NAME_JMS, <<"jms-selector">>).

View File

@ -3242,6 +3242,12 @@ parse_filters({Symbol = {symbol, <<"rabbitmq:stream-", _/binary>>}, Value}, Acc)
false ->
Acc
end;
parse_filters(Filter = {{symbol, ?FILTER_NAME_JMS}, {described, Descriptor, {utf8, JmsSelector}}},
{EffectiveFilters, ConsumerFilter, ConsumerArgs})
when Descriptor =:= {symbol, ?DESCRIPTOR_NAME_SELECTOR_FILTER} orelse
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_SELECTOR_FILTER} ->
Arg = {<<"x-jms-selector">>, longstr, JmsSelector},
{[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]};
parse_filters(Filter = {{symbol, _Key}, Value},
Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) ->
case rabbit_amqp_filtex:validate(Value) of

View File

@ -394,6 +394,60 @@ single_active_consumer_on(Q) ->
_ -> false
end.
consumer_filter(Spec, Args, Q) ->
FilterSpec = maps:get(filter, Spec, []),
JmsSelector = case rabbit_misc:table_lookup(Args, <<"x-jms-selector">>) of
undefined -> undefined;
{longstr, Selector} -> Selector
end,
FilterSpecDefined = FilterSpec =/= [],
JmsSelectorDefined = JmsSelector =/= undefined,
case {FilterSpecDefined, JmsSelectorDefined} of
{false, false} ->
{ok, []};
{true, true} ->
%% AMQP filter expressions and JMS message selectors are mutually exclusive
error;
{true, false} ->
consumer_filter_from_filtex(FilterSpec, Q);
{false, true} ->
consumer_filter_from_jms_selector(JmsSelector, Q)
end.
%%TODO
%% * parse SQL mapping to Erlang expressions
%% * map JMS fields to AMQP fields
%% * validate that the quorum queue is configured to filter on these fields
%% * provide the Erlang expressions to the quorum queue
consumer_filter_from_jms_selector(JmsSelector, Q) ->
<<"JMSXGroupID = 'red'">> = JmsSelector,
Section = properties,
Field = group_id,
Value = <<"red">>,
QFilter = queue_filter(Q),
case lists:member({Section, Field}, QFilter) of
true ->
Filter = [{Section, [{Field, Value}]}],
{ok, Filter};
false ->
error
end.
%% TODO Allow more consumer filters.
%% Error out if the consumer defines a filter which is not allowed on the queue.
consumer_filter_from_filtex([], _Q) ->
{ok, []};
consumer_filter_from_filtex(Filter = [{properties = Section, [{group_id = Field, _}]}], Q) ->
QFilter = queue_filter(Q),
case lists:member({Section, Field}, QFilter) of
true ->
{ok, Filter};
false ->
error
end;
consumer_filter_from_filtex(_InvalidFilter, _Q) ->
error.
%%TODO Allow defining any property or application-property and also a way
%% to define "all properties" or "all application-properties"
queue_filter(Q) ->
@ -415,21 +469,6 @@ queue_filter(Q) ->
false
end.
%% TODO Allow more consumer filters.
%% Error out if the consumer defines a filter which is not allowed on the queue.
consumer_filter([], _Q) ->
{ok, []};
consumer_filter(Filter = [{properties = Section, [{group_id = Field, _}]}], Q) ->
QFilter = queue_filter(Q),
case lists:member({Section, Field}, QFilter) of
true ->
{ok, Filter};
false ->
error
end;
consumer_filter(_InvalidFilter, _Q) ->
error.
update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired,
Prefetch, Active, ActivityStatus, Args) ->
catch local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
@ -1043,12 +1082,11 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
_ ->
0
end,
FilterSpec = maps:get(filter, Spec, []),
case consumer_filter(FilterSpec, Q) of
case consumer_filter(Spec, Args, Q) of
error ->
{error, precondition_failed,
"invalid filter consuming from quorum ~ts: ~tp",
[rabbit_misc:rs(QName), FilterSpec]};
"invalid filter for quorum ~ts: consumer spec: ~tp consumer args: ~tp",
[rabbit_misc:rs(QName), Spec, Args]};
{ok, Filter} ->
ConsumerMeta = #{ack => AckRequired,
prefetch => Prefetch,