Support stream filtering in STOMP

Forward the x-stream-filter-size-bytes header when
a subscription creates a stream queue.

Extract the x-stream-filter header for subscriptions,
tokenize it (using the comma as the separator) in case
several filter values are provided.
This commit is contained in:
Arnaud Cogoluègnes 2023-12-05 10:39:08 +01:00
parent f54f880ca9
commit 02d1d86996
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
6 changed files with 83 additions and 23 deletions

View File

@ -27,6 +27,7 @@
-define(HEADER_PERSISTENT, "persistent"). -define(HEADER_PERSISTENT, "persistent").
-define(HEADER_PREFETCH_COUNT, "prefetch-count"). -define(HEADER_PREFETCH_COUNT, "prefetch-count").
-define(HEADER_X_STREAM_OFFSET, "x-stream-offset"). -define(HEADER_X_STREAM_OFFSET, "x-stream-offset").
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
-define(HEADER_PRIORITY, "priority"). -define(HEADER_PRIORITY, "priority").
-define(HEADER_RECEIPT, "receipt"). -define(HEADER_RECEIPT, "receipt").
-define(HEADER_REDELIVERED, "redelivered"). -define(HEADER_REDELIVERED, "redelivered").
@ -50,6 +51,7 @@
-define(HEADER_X_MESSAGE_TTL, "x-message-ttl"). -define(HEADER_X_MESSAGE_TTL, "x-message-ttl").
-define(HEADER_X_QUEUE_NAME, "x-queue-name"). -define(HEADER_X_QUEUE_NAME, "x-queue-name").
-define(HEADER_X_QUEUE_TYPE, "x-queue-type"). -define(HEADER_X_QUEUE_TYPE, "x-queue-type").
-define(HEADER_X_STREAM_FILTER_SIZE_BYTES, "x-stream-filter-size-bytes").
-define(MESSAGE_ID_SEPARATOR, "@@"). -define(MESSAGE_ID_SEPARATOR, "@@").
@ -67,7 +69,8 @@
?HEADER_X_MAX_LENGTH_BYTES, ?HEADER_X_MAX_LENGTH_BYTES,
?HEADER_X_MAX_PRIORITY, ?HEADER_X_MAX_PRIORITY,
?HEADER_X_MESSAGE_TTL, ?HEADER_X_MESSAGE_TTL,
?HEADER_X_QUEUE_TYPE ?HEADER_X_QUEUE_TYPE,
?HEADER_X_STREAM_FILTER_SIZE_BYTES
]). ]).
-define(HEADER_PARAMS, [ -define(HEADER_PARAMS, [

View File

@ -15,7 +15,7 @@
boolean_header/2, boolean_header/3, boolean_header/2, boolean_header/3,
integer_header/2, integer_header/3, integer_header/2, integer_header/3,
binary_header/2, binary_header/3]). binary_header/2, binary_header/3]).
-export([stream_offset_header/2]). -export([stream_offset_header/1, stream_filter_header/1]).
-export([serialize/1, serialize/2]). -export([serialize/1, serialize/2]).
initial_state() -> none. initial_state() -> none.
@ -211,20 +211,33 @@ binary_header(F, K) ->
binary_header(F, K, D) -> default_value(binary_header(F, K), D). binary_header(F, K, D) -> default_value(binary_header(F, K), D).
stream_offset_header(F, D) -> stream_offset_header(F) ->
case binary_header(F, ?HEADER_X_STREAM_OFFSET, D) of case binary_header(F, ?HEADER_X_STREAM_OFFSET) of
<<"first">> -> {ok, <<"first">>} ->
{longstr, <<"first">>}; {longstr, <<"first">>};
<<"last">> -> {ok, <<"last">>} ->
{longstr, <<"last">>}; {longstr, <<"last">>};
<<"next">> -> {ok, <<"next">>} ->
{longstr, <<"next">>}; {longstr, <<"next">>};
<<"offset=", OffsetValue/binary>> -> {ok, <<"offset=", OffsetValue/binary>>} ->
{long, binary_to_integer(OffsetValue)}; {long, binary_to_integer(OffsetValue)};
<<"timestamp=", TimestampValue/binary>> -> {ok, <<"timestamp=", TimestampValue/binary>>} ->
{timestamp, binary_to_integer(TimestampValue)}; {timestamp, binary_to_integer(TimestampValue)};
_ -> _ ->
D not_found
end.
stream_filter_header(F) ->
case binary_header(F, ?HEADER_X_STREAM_FILTER) of
{ok, Str} ->
{array, lists:reverse(
lists:foldl(fun(V, Acc) ->
[{longstr, V}] ++ Acc
end,
[],
binary:split(Str, <<",">>, [global])))};
not_found ->
not_found
end. end.
serialize(Frame) -> serialize(Frame) ->

View File

@ -676,13 +676,7 @@ do_subscribe(Destination, DestHdr, Frame,
{stop, normal, close_connection(State)}; {stop, normal, close_connection(State)};
error -> error ->
ExchangeAndKey = parse_routing(Destination, DfltTopicEx), ExchangeAndKey = parse_routing(Destination, DfltTopicEx),
StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame, undefined), Arguments = subscribe_arguments(Frame),
Arguments = case StreamOffset of
undefined ->
[];
{Type, Value} ->
[{<<"x-stream-offset">>, Type, Value}]
end,
try try
amqp_channel:subscribe(Channel, amqp_channel:subscribe(Channel,
#'basic.consume'{ #'basic.consume'{
@ -722,6 +716,32 @@ do_subscribe(Destination, DestHdr, Frame,
Err Err
end. end.
subscribe_arguments(Frame) ->
subscribe_arguments([?HEADER_X_STREAM_OFFSET, ?HEADER_X_STREAM_FILTER], Frame, []).
subscribe_arguments([], _Frame , Acc) ->
Acc;
subscribe_arguments([K | T], Frame, Acc0) ->
Acc1 = subscribe_argument(K, Frame, Acc0),
subscribe_arguments(T, Frame, Acc1).
subscribe_argument(?HEADER_X_STREAM_OFFSET, Frame, Acc) ->
StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame),
case StreamOffset of
not_found ->
Acc;
{OffsetType, OffsetValue} ->
[{<<"x-stream-offset">>, OffsetType, OffsetValue}] ++ Acc
end;
subscribe_argument(?HEADER_X_STREAM_FILTER, Frame, Acc) ->
StreamFilter = rabbit_stomp_frame:stream_filter_header(Frame),
case StreamFilter of
not_found ->
Acc;
{FilterType, FilterValue} ->
[{<<"x-stream-filter">>, FilterType, FilterValue}] ++ Acc
end.
check_subscription_access(Destination = {topic, _Topic}, check_subscription_access(Destination = {topic, _Topic},
#proc_state{auth_login = _User, #proc_state{auth_login = _User,
connection = Connection, connection = Connection,

View File

@ -297,7 +297,11 @@ build_argument(?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, Val) ->
list_to_integer(string:strip(Val))}; list_to_integer(string:strip(Val))};
build_argument(?HEADER_X_QUEUE_TYPE, Val) -> build_argument(?HEADER_X_QUEUE_TYPE, Val) ->
{list_to_binary(?HEADER_X_QUEUE_TYPE), longstr, {list_to_binary(?HEADER_X_QUEUE_TYPE), longstr,
list_to_binary(string:strip(Val))}. list_to_binary(string:strip(Val))};
build_argument(?HEADER_X_STREAM_FILTER_SIZE_BYTES, Val) ->
{list_to_binary(?HEADER_X_STREAM_FILTER_SIZE_BYTES), long,
list_to_integer(string:strip(Val))}.
build_params(EndPoint, Headers) -> build_params(EndPoint, Headers) ->
Params = lists:foldl(fun({K, V}, Acc) -> Params = lists:foldl(fun({K, V}, Acc) ->

View File

@ -39,7 +39,8 @@ all() ->
header_value_with_colon, header_value_with_colon,
headers_escaping_roundtrip, headers_escaping_roundtrip,
headers_escaping_roundtrip_without_trailing_lf, headers_escaping_roundtrip_without_trailing_lf,
stream_offset_header stream_offset_header,
stream_filter_header
]. ].
parse_simple_frame(_) -> parse_simple_frame(_) ->
@ -170,14 +171,32 @@ stream_offset_header(_) ->
{{"x-stream-offset", "next"}, {longstr, <<"next">>}}, {{"x-stream-offset", "next"}, {longstr, <<"next">>}},
{{"x-stream-offset", "offset=5000"}, {long, 5000}}, {{"x-stream-offset", "offset=5000"}, {long, 5000}},
{{"x-stream-offset", "timestamp=1000"}, {timestamp, 1000}}, {{"x-stream-offset", "timestamp=1000"}, {timestamp, 1000}},
{{"x-stream-offset", "foo"}, undefined}, {{"x-stream-offset", "foo"}, not_found},
{{"some-header", "some value"}, undefined} {{"some-header", "some value"}, not_found}
], ],
lists:foreach(fun({Header, Expected}) -> lists:foreach(fun({Header, Expected}) ->
?assertEqual( ?assertEqual(
Expected, Expected,
rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]}, undefined) rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]})
)
end, TestCases).
stream_filter_header(_) ->
TestCases = [
{{"x-stream-filter", "banana"}, {array, [{longstr, <<"banana">>}]}},
{{"x-stream-filter", "banana,apple"}, {array, [{longstr, <<"banana">>},
{longstr, <<"apple">>}]}},
{{"x-stream-filter", "banana,apple,orange"}, {array, [{longstr, <<"banana">>},
{longstr, <<"apple">>},
{longstr, <<"orange">>}]}},
{{"some-header", "some value"}, not_found}
],
lists:foreach(fun({Header, Expected}) ->
?assertEqual(
Expected,
rabbit_stomp_frame:stream_filter_header(#stomp_frame{headers = [Header]})
) )
end, TestCases). end, TestCases).

View File

@ -27,6 +27,7 @@ class TestUserGeneratedQueueName(base.BaseTest):
'x-queue-type': 'stream', 'x-queue-type': 'stream',
'x-max-age' : '10h', 'x-max-age' : '10h',
'x-stream-max-segment-size-bytes' : 1048576, 'x-stream-max-segment-size-bytes' : 1048576,
'x-stream-filter-size-bytes' : 32,
'durable': True, 'durable': True,
'auto-delete': False, 'auto-delete': False,
'id': 1234, 'id': 1234,