Support streams in STOMP plugin
This commit introduces the support of an x-stream-offset header in the SUBSCRIBE frame to start consuming from a specific place in a stream. The possible values are first, last, next, offset:<offset-value> (e.g. offset:40000), timestamp:<timestamp-in-seconds> (e.g. timestamp:1619428685). This commit also propagates the x-stream-offset header in the MESSAGE frame to know the offset of a the delivered message in the stream.
This commit is contained in:
parent
bedc46a21a
commit
065b92114d
|
|
@ -26,6 +26,7 @@
|
|||
-define(HEADER_PASSCODE, "passcode").
|
||||
-define(HEADER_PERSISTENT, "persistent").
|
||||
-define(HEADER_PREFETCH_COUNT, "prefetch-count").
|
||||
-define(HEADER_X_STREAM_OFFSET, "x-stream-offset").
|
||||
-define(HEADER_PRIORITY, "priority").
|
||||
-define(HEADER_RECEIPT, "receipt").
|
||||
-define(HEADER_REDELIVERED, "redelivered").
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@
|
|||
boolean_header/2, boolean_header/3,
|
||||
integer_header/2, integer_header/3,
|
||||
binary_header/2, binary_header/3]).
|
||||
-export([stream_offset_header/2]).
|
||||
-export([serialize/1, serialize/2]).
|
||||
|
||||
initial_state() -> none.
|
||||
|
|
@ -222,6 +223,26 @@ binary_header(F, K) ->
|
|||
|
||||
binary_header(F, K, D) -> default_value(binary_header(F, K), D).
|
||||
|
||||
stream_offset_header(F, D) ->
|
||||
OffsetPrefix = <<"offset:">>,
|
||||
OffsetPrefixLength = byte_size(OffsetPrefix),
|
||||
TimestampPrefix = <<"timestamp:">>,
|
||||
TimestampPrefixLength = byte_size(TimestampPrefix),
|
||||
case binary_header(F, ?HEADER_X_STREAM_OFFSET, D) of
|
||||
<<"first">> ->
|
||||
{longstr, <<"first">>};
|
||||
<<"last">> ->
|
||||
{longstr, <<"last">>};
|
||||
<<"next">> ->
|
||||
{longstr, <<"next">>};
|
||||
<<OffsetPrefix:OffsetPrefixLength/binary, OffsetValue/binary>> ->
|
||||
{long, binary_to_integer(OffsetValue)};
|
||||
<<TimestampPrefix:TimestampPrefixLength/binary, TimestampValue/binary>> ->
|
||||
{timestamp, binary_to_integer(TimestampValue)};
|
||||
_ ->
|
||||
D
|
||||
end.
|
||||
|
||||
serialize(Frame) ->
|
||||
serialize(Frame, true).
|
||||
|
||||
|
|
|
|||
|
|
@ -685,6 +685,13 @@ do_subscribe(Destination, DestHdr, Frame,
|
|||
{stop, normal, close_connection(State)};
|
||||
error ->
|
||||
ExchangeAndKey = parse_routing(Destination, DfltTopicEx),
|
||||
StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame, undefined),
|
||||
Arguments = case StreamOffset of
|
||||
undefined ->
|
||||
[];
|
||||
{Type, Value} ->
|
||||
[{<<"x-stream-offset">>, Type, Value}]
|
||||
end,
|
||||
try
|
||||
amqp_channel:subscribe(Channel,
|
||||
#'basic.consume'{
|
||||
|
|
@ -693,7 +700,7 @@ do_subscribe(Destination, DestHdr, Frame,
|
|||
no_local = false,
|
||||
no_ack = (AckMode == auto),
|
||||
exclusive = false,
|
||||
arguments = []},
|
||||
arguments = Arguments},
|
||||
self()),
|
||||
ok = rabbit_routing_util:ensure_binding(
|
||||
Queue, ExchangeAndKey, Channel)
|
||||
|
|
|
|||
|
|
@ -115,6 +115,8 @@ adhoc_convert_headers(Headers, Existing) ->
|
|||
[{binary_to_list(K), binary_to_list(V)} | Acc];
|
||||
({K, signedint, V}, Acc) ->
|
||||
[{binary_to_list(K), integer_to_list(V)} | Acc];
|
||||
({K, long, V}, Acc) ->
|
||||
[{binary_to_list(K), integer_to_list(V)} | Acc];
|
||||
(_, Acc) ->
|
||||
Acc
|
||||
end, Existing, Headers).
|
||||
|
|
|
|||
|
|
@ -38,7 +38,8 @@ all() ->
|
|||
header_value_with_cr,
|
||||
header_value_with_colon,
|
||||
headers_escaping_roundtrip,
|
||||
headers_escaping_roundtrip_without_trailing_lf
|
||||
headers_escaping_roundtrip_without_trailing_lf,
|
||||
stream_offset_header
|
||||
].
|
||||
|
||||
parse_simple_frame(_) ->
|
||||
|
|
@ -162,6 +163,24 @@ header_value_with_colon(_) ->
|
|||
headers = [{"header", "val:ue"}],
|
||||
body_iolist = []}).
|
||||
|
||||
stream_offset_header(_) ->
|
||||
TestCases = [
|
||||
{{"x-stream-offset", "first"}, {longstr, <<"first">>}},
|
||||
{{"x-stream-offset", "last"}, {longstr, <<"last">>}},
|
||||
{{"x-stream-offset", "next"}, {longstr, <<"next">>}},
|
||||
{{"x-stream-offset", "offset:5000"}, {long, 5000}},
|
||||
{{"x-stream-offset", "timestamp:1000"}, {timestamp, 1000}},
|
||||
{{"x-stream-offset", "foo"}, undefined},
|
||||
{{"some-header", "some value"}, undefined}
|
||||
],
|
||||
|
||||
lists:foreach(fun({Header, Expected}) ->
|
||||
?assertEqual(
|
||||
Expected,
|
||||
rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]}, undefined)
|
||||
)
|
||||
end, TestCases).
|
||||
|
||||
test_frame_serialization(Expected, TrailingLF) ->
|
||||
{ok, Frame, _} = parse(Expected),
|
||||
{ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"),
|
||||
|
|
|
|||
Loading…
Reference in New Issue