Use "filter."-shaped subscription properties

To set filter values in stream protocol.
This commit is contained in:
Arnaud Cogoluègnes 2023-05-24 17:07:39 +02:00
parent 9f06fb7db9
commit 62c83b0b9d
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
4 changed files with 77 additions and 18 deletions

View File

@ -357,6 +357,13 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification
NB: Timestamp is https://www.erlang.org/doc/apps/erts/time_correction.html#Erlang_System_Time[Erlang system time],
milliseconds from epoch
Supported properties:
* `single-active-consumer`: set to `true` to enable https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/[single active consumer] for this subscription.
* `super-stream`: set to the name of the super stream the subscribed is a partition of.
* `filter.` (e.g. `filter.0`, `filter.1`, etc): prefix to use to define filter values for the subscription.
* `match-unfiltered`: whether to return messages without any filter value or not.
=== Deliver
Version 1

View File

@ -2826,28 +2826,15 @@ init_reader(ConnectionTransport,
Properties,
OffsetSpec) ->
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
Options = filter_spec(Properties,
#{transport => ConnectionTransport,
chunk_selector => get_chunk_selector(Properties)}),
Options = maps:merge(#{transport => ConnectionTransport,
chunk_selector => get_chunk_selector(Properties)},
rabbit_stream_utils:filter_spec(Properties)),
{ok, Segment} =
osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options),
rabbit_log:debug("Next offset for subscription ~tp is ~tp",
[SubscriptionId, osiris_log:next_offset(Segment)]),
Segment.
filter_spec(#{<<"filters">> := FiltersBin} = Properties, Options) ->
Filters = binary:split(FiltersBin, <<",">>, [global]),
MatchUnfiltered = case Properties of
#{<<"match-unfiltered">> := <<"true">>} ->
true;
_ ->
false
end,
Options#{filter_spec =>
#{filters => Filters, match_unfiltered => MatchUnfiltered}};
filter_spec(_, Options) ->
Options.
single_active_consumer(#consumer{configuration =
#consumer_configuration{properties = Properties}}) ->
single_active_consumer(Properties);

View File

@ -29,6 +29,7 @@
sort_partitions/1,
strip_cr_lf/1,
consumer_activity_status/2,
filter_spec/1,
command_versions/0]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@ -124,6 +125,21 @@ write_messages(?VERSION_1 = V, ClusterLeader,
UncompressedSize,
Batch}),
write_messages(V, ClusterLeader, PublisherRef, PublisherId, Rest);
write_messages(?VERSION_2 = V, ClusterLeader,
undefined,
PublisherId,
<<PublishingId:64,
-1:16/signed,
0:1,
MessageSize:31,
Message:MessageSize/binary,
Rest/binary>>) ->
ok =
osiris:write(ClusterLeader,
undefined,
{PublisherId, PublishingId},
Message),
write_messages(V, ClusterLeader, undefined, PublisherId, Rest);
write_messages(?VERSION_2 = V, ClusterLeader,
undefined,
PublisherId,
@ -139,6 +155,17 @@ write_messages(?VERSION_2 = V, ClusterLeader,
{PublisherId, PublishingId},
{FilterValue, Message}),
write_messages(V, ClusterLeader, undefined, PublisherId, Rest);
write_messages(?VERSION_2 = V, ClusterLeader,
PublisherRef,
PublisherId,
<<PublishingId:64,
-1:16/signed,
0:1,
MessageSize:31,
Message:MessageSize/binary,
Rest/binary>>) ->
ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
write_messages(V, ClusterLeader, PublisherRef, PublisherId, Rest);
write_messages(?VERSION_2 = V, ClusterLeader,
PublisherRef,
PublisherId,
@ -150,7 +177,6 @@ write_messages(?VERSION_2 = V, ClusterLeader,
Rest/binary>>) ->
ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, {FilterValue, Message}),
write_messages(V, ClusterLeader, PublisherRef, PublisherId, Rest).
%% TODO handle filter value with sub-batching
parse_map(<<>>, _Count) ->
{#{}, <<>>};
@ -276,6 +302,26 @@ consumer_activity_status(Active, Properties) ->
waiting
end.
filter_spec(Properties) ->
Filters = maps:fold(fun(<<"filter.",_/binary>>, V, Acc) ->
[V] ++ Acc;
(_, _, Acc) ->
Acc
end, [], Properties),
case Filters of
[] ->
#{};
_ ->
MatchUnfiltered = case Properties of
#{<<"match-unfiltered">> := <<"true">>} ->
true;
_ ->
false
end,
#{filter_spec =>
#{filters => Filters, match_unfiltered => MatchUnfiltered}}
end.
command_versions() ->
[{declare_publisher, ?VERSION_1, ?VERSION_1},
{publish, ?VERSION_1, ?VERSION_2},

View File

@ -17,7 +17,7 @@ suite() ->
[{timetrap, {seconds, 30}}].
groups() ->
[{tests, [], [sort_partitions]}].
[{tests, [], [sort_partitions, filter_spec]}].
init_per_suite(Config) ->
Config.
@ -65,6 +65,25 @@ sort_partitions(_Config) ->
0)])]),
ok.
filter_spec(_Config) ->
[begin
FilterSpec = rabbit_stream_utils:filter_spec(Properties),
?assert(maps:is_key(filter_spec, FilterSpec)),
#{filter_spec := #{filters := Filters, match_unfiltered := MatchUnfiltered}} = FilterSpec,
?assertEqual(lists:sort(ExpectedFilters), lists:sort(Filters)),
?assertEqual(ExpectedMatchUnfiltered, MatchUnfiltered)
end || {Properties, ExpectedFilters, ExpectedMatchUnfiltered} <-
[{#{<<"filter.1">> => <<"apple">>,
<<"filter.2">> => <<"banana">>,
<<"sac">> => true}, [<<"apple">>, <<"banana">>], false},
{#{<<"filter.1">> => <<"apple">>}, [<<"apple">>], false},
{#{<<"filter.1">> => <<"apple">>,
<<"match-unfiltered">> => <<"true">>}, [<<"apple">>], true}
]],
#{} = rabbit_stream_utils:filter_spec(#{}),
#{} = rabbit_stream_utils:filter_spec(#{<<"sac">> => true}),
ok.
binding(Destination, Order) ->
#binding{destination = #resource{name = Destination},
args = [{<<"x-stream-partition-order">>, signedint, Order}]}.