diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl index 1fe8d382ac..0d2aaa17ed 100644 --- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl +++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl @@ -27,6 +27,7 @@ -define(HEADER_PERSISTENT, "persistent"). -define(HEADER_PREFETCH_COUNT, "prefetch-count"). -define(HEADER_X_STREAM_OFFSET, "x-stream-offset"). +-define(HEADER_X_STREAM_FILTER, "x-stream-filter"). -define(HEADER_PRIORITY, "priority"). -define(HEADER_RECEIPT, "receipt"). -define(HEADER_REDELIVERED, "redelivered"). @@ -50,6 +51,7 @@ -define(HEADER_X_MESSAGE_TTL, "x-message-ttl"). -define(HEADER_X_QUEUE_NAME, "x-queue-name"). -define(HEADER_X_QUEUE_TYPE, "x-queue-type"). +-define(HEADER_X_STREAM_FILTER_SIZE_BYTES, "x-stream-filter-size-bytes"). -define(MESSAGE_ID_SEPARATOR, "@@"). @@ -67,7 +69,8 @@ ?HEADER_X_MAX_LENGTH_BYTES, ?HEADER_X_MAX_PRIORITY, ?HEADER_X_MESSAGE_TTL, - ?HEADER_X_QUEUE_TYPE + ?HEADER_X_QUEUE_TYPE, + ?HEADER_X_STREAM_FILTER_SIZE_BYTES ]). -define(HEADER_PARAMS, [ diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl index 0aea95cb6c..2549ca8978 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl @@ -15,7 +15,7 @@ boolean_header/2, boolean_header/3, integer_header/2, integer_header/3, binary_header/2, binary_header/3]). --export([stream_offset_header/2]). +-export([stream_offset_header/1, stream_filter_header/1]). -export([serialize/1, serialize/2]). initial_state() -> none. @@ -211,20 +211,33 @@ binary_header(F, K) -> binary_header(F, K, D) -> default_value(binary_header(F, K), D). -stream_offset_header(F, D) -> - case binary_header(F, ?HEADER_X_STREAM_OFFSET, D) of - <<"first">> -> +stream_offset_header(F) -> + case binary_header(F, ?HEADER_X_STREAM_OFFSET) of + {ok, <<"first">>} -> {longstr, <<"first">>}; - <<"last">> -> + {ok, <<"last">>} -> {longstr, <<"last">>}; - <<"next">> -> + {ok, <<"next">>} -> {longstr, <<"next">>}; - <<"offset=", OffsetValue/binary>> -> + {ok, <<"offset=", OffsetValue/binary>>} -> {long, binary_to_integer(OffsetValue)}; - <<"timestamp=", TimestampValue/binary>> -> + {ok, <<"timestamp=", TimestampValue/binary>>} -> {timestamp, binary_to_integer(TimestampValue)}; _ -> - D + not_found + end. + +stream_filter_header(F) -> + case binary_header(F, ?HEADER_X_STREAM_FILTER) of + {ok, Str} -> + {array, lists:reverse( + lists:foldl(fun(V, Acc) -> + [{longstr, V}] ++ Acc + end, + [], + binary:split(Str, <<",">>, [global])))}; + not_found -> + not_found end. serialize(Frame) -> diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 7606036436..c925bfff55 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -676,13 +676,7 @@ do_subscribe(Destination, DestHdr, Frame, {stop, normal, close_connection(State)}; error -> ExchangeAndKey = parse_routing(Destination, DfltTopicEx), - StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame, undefined), - Arguments = case StreamOffset of - undefined -> - []; - {Type, Value} -> - [{<<"x-stream-offset">>, Type, Value}] - end, + Arguments = subscribe_arguments(Frame), try amqp_channel:subscribe(Channel, #'basic.consume'{ @@ -722,6 +716,32 @@ do_subscribe(Destination, DestHdr, Frame, Err end. +subscribe_arguments(Frame) -> + subscribe_arguments([?HEADER_X_STREAM_OFFSET, ?HEADER_X_STREAM_FILTER], Frame, []). + +subscribe_arguments([], _Frame , Acc) -> + Acc; +subscribe_arguments([K | T], Frame, Acc0) -> + Acc1 = subscribe_argument(K, Frame, Acc0), + subscribe_arguments(T, Frame, Acc1). + +subscribe_argument(?HEADER_X_STREAM_OFFSET, Frame, Acc) -> + StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame), + case StreamOffset of + not_found -> + Acc; + {OffsetType, OffsetValue} -> + [{<<"x-stream-offset">>, OffsetType, OffsetValue}] ++ Acc + end; +subscribe_argument(?HEADER_X_STREAM_FILTER, Frame, Acc) -> + StreamFilter = rabbit_stomp_frame:stream_filter_header(Frame), + case StreamFilter of + not_found -> + Acc; + {FilterType, FilterValue} -> + [{<<"x-stream-filter">>, FilterType, FilterValue}] ++ Acc + end. + check_subscription_access(Destination = {topic, _Topic}, #proc_state{auth_login = _User, connection = Connection, diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index 951573aefe..d3e07be04a 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -296,8 +296,12 @@ build_argument(?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, Val) -> {list_to_binary(?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES), long, list_to_integer(string:strip(Val))}; build_argument(?HEADER_X_QUEUE_TYPE, Val) -> - {list_to_binary(?HEADER_X_QUEUE_TYPE), longstr, - list_to_binary(string:strip(Val))}. + {list_to_binary(?HEADER_X_QUEUE_TYPE), longstr, + list_to_binary(string:strip(Val))}; +build_argument(?HEADER_X_STREAM_FILTER_SIZE_BYTES, Val) -> + {list_to_binary(?HEADER_X_STREAM_FILTER_SIZE_BYTES), long, + list_to_integer(string:strip(Val))}. + build_params(EndPoint, Headers) -> Params = lists:foldl(fun({K, V}, Acc) -> diff --git a/deps/rabbitmq_stomp/test/frame_SUITE.erl b/deps/rabbitmq_stomp/test/frame_SUITE.erl index 2395eaccd7..0d81255c00 100644 --- a/deps/rabbitmq_stomp/test/frame_SUITE.erl +++ b/deps/rabbitmq_stomp/test/frame_SUITE.erl @@ -39,7 +39,8 @@ all() -> header_value_with_colon, headers_escaping_roundtrip, headers_escaping_roundtrip_without_trailing_lf, - stream_offset_header + stream_offset_header, + stream_filter_header ]. parse_simple_frame(_) -> @@ -170,17 +171,35 @@ stream_offset_header(_) -> {{"x-stream-offset", "next"}, {longstr, <<"next">>}}, {{"x-stream-offset", "offset=5000"}, {long, 5000}}, {{"x-stream-offset", "timestamp=1000"}, {timestamp, 1000}}, - {{"x-stream-offset", "foo"}, undefined}, - {{"some-header", "some value"}, undefined} + {{"x-stream-offset", "foo"}, not_found}, + {{"some-header", "some value"}, not_found} ], lists:foreach(fun({Header, Expected}) -> ?assertEqual( Expected, - rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]}, undefined) + rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]}) ) end, TestCases). +stream_filter_header(_) -> + TestCases = [ + {{"x-stream-filter", "banana"}, {array, [{longstr, <<"banana">>}]}}, + {{"x-stream-filter", "banana,apple"}, {array, [{longstr, <<"banana">>}, + {longstr, <<"apple">>}]}}, + {{"x-stream-filter", "banana,apple,orange"}, {array, [{longstr, <<"banana">>}, + {longstr, <<"apple">>}, + {longstr, <<"orange">>}]}}, + {{"some-header", "some value"}, not_found} + ], + + lists:foreach(fun({Header, Expected}) -> + ?assertEqual( + Expected, + rabbit_stomp_frame:stream_filter_header(#stomp_frame{headers = [Header]}) + ) + end, TestCases). + test_frame_serialization(Expected, TrailingLF) -> {ok, Frame, _} = parse(Expected), {ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"), diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py index 1ce2579bf2..dd87548d5d 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py @@ -27,6 +27,7 @@ class TestUserGeneratedQueueName(base.BaseTest): 'x-queue-type': 'stream', 'x-max-age' : '10h', 'x-stream-max-segment-size-bytes' : 1048576, + 'x-stream-filter-size-bytes' : 32, 'durable': True, 'auto-delete': False, 'id': 1234,