diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 532c86a2bc..d87975823f 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -215,6 +215,12 @@ used to make the difference between a request (0) and a response (1). Example fo |Client |0x0019 |Yes + +|<> (experimental) +|Server +|0x0020 +|Yes + |=== === DeclarePublisher @@ -597,10 +603,11 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream RoutingKey => string SuperStream => string -RouteResponse => Key Version CorrelationId [Stream] +RouteResponse => Key Version CorrelationId ResponseCode [Stream] Key => uint16 // 0x8018 Version => uint16 CorrelationId => uint32 + ResponseCode => uint16 Stream => string ``` @@ -615,13 +622,34 @@ PartitionsQuery => Key Version CorrelationId SuperStream CorrelationId => uint32 SuperStream => string -PartitionsResponse => Key Version CorrelationId [Stream] +PartitionsResponse => Key Version CorrelationId ResponseCode [Stream] Key => uint16 // 0x8019 Version => uint16 CorrelationId => uint32 + ResponseCode => uint16 Stream => string ``` +=== Consumer Update (experimental) + +``` +ConsumerUpdateQuery => Key Version CorrelationId SubscriptionId Active + Key => uint16 // 0x001a + Version => uint16 + CorrelationId => uint32 + SubscriptionId => uint8 + Active => uint8 (boolean, 0 = false, 1 = true) + +ConsumerUpdateResponse => Key Version CorrelationId ResponseCode OffsetSpecification + Key => uint16 // 0x801a + Version => uint16 + CorrelationId => uint32 + ResponseCode => uint16 + OffsetSpecification => OffsetType Offset + OffsetType => uint16 // 0 (none), 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp) + Offset => uint64 (for offset) | int64 (for timestamp) +``` + == Authentication Once a client is connected to the server, it initiates an authentication diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 09bd692fa7..fb18916a0e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -39,7 +39,8 @@ stream :: stream(), offset :: osiris:offset(), counters :: atomics:atomics_ref(), - properties :: map()}). + properties :: map(), + active :: boolean()}). -record(consumer, {configuration :: #consumer_configuration{}, credit :: non_neg_integer(), @@ -85,7 +86,9 @@ send_file_oct :: atomics:atomics_ref(), % number of bytes sent with send_file (for metrics) transport :: tcp | ssl, - proxy_socket :: undefined | ranch_proxy:proxy_socket()}). + proxy_socket :: undefined | ranch_proxy:proxy_socket(), + correlation_id_sequence :: integer(), + outstanding_requests :: #{integer() => term()}}). -record(configuration, {initial_credits :: integer(), credits_required_for_unblocking :: integer(), @@ -237,7 +240,9 @@ init([KeepaliveSup, send_file_oct = SendFileOct, transport = ConnTransport, proxy_socket = - rabbit_net:maybe_get_proxy_socket(Sock)}, + rabbit_net:maybe_get_proxy_socket(Sock), + correlation_id_sequence = 0, + outstanding_requests = #{}}, State = #stream_connection_state{consumers = #{}, blocked = false, @@ -643,7 +648,12 @@ augment_infos_with_user_provided_connection_name(Infos, close(Transport, S, #stream_connection_state{consumers = Consumers}) -> - [osiris_log:close(Log) + [case Log of + undefined -> + ok; %% segment may not be defined on subscription (single active consumer) + L -> + osiris_log:close(L) + end || #consumer{log = Log} <- maps:values(Consumers)], Transport:shutdown(S, write), Transport:close(S). @@ -1789,26 +1799,36 @@ handle_frame_post_auth(Transport, Stream, OffsetSpec, Properties]), - CounterSpec = - {{?MODULE, - QueueResource, - SubscriptionId, - self()}, - []}, - Options = - #{transport => ConnTransport, - chunk_selector => - get_chunk_selector(Properties)}, - {ok, Log} = - osiris:init_reader(LocalMemberPid, - OffsetSpec, - CounterSpec, - Options), - rabbit_log:debug("Next offset for subscription ~p is ~p", - [SubscriptionId, - osiris_log:next_offset(Log)]), + Sac = single_active_consumer(Properties), + ConsumerName = consumer_name(Properties), + %% TODO check consumer name is defined when SAC + Log = case Sac of + true -> + undefined; + false -> + init_reader(ConnTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec) + end, + ConsumerCounters = atomics:new(2, [{signed, false}]), + + Active = + maybe_register_consumer(Stream, + ConsumerName, + SubscriptionId, + Sac), + + Connection1 = + maybe_notify_consumer(Transport, + Connection, + SubscriptionId, + Active, + Sac), ConsumerConfiguration = #consumer_configuration{member_pid = LocalMemberPid, @@ -1819,83 +1839,47 @@ handle_frame_post_auth(Transport, offset = OffsetSpec, counters = ConsumerCounters, - properties = - Properties}, + properties = Properties, + active = Active}, ConsumerState = #consumer{configuration = ConsumerConfiguration, log = Log, credit = Credit}, - Connection1 = + Connection2 = maybe_monitor_stream(LocalMemberPid, Stream, - Connection), + Connection1), response_ok(Transport, Connection, subscribe, CorrelationId), - rabbit_log:debug("Distributing existing messages to subscription ~p", - [SubscriptionId]), - - case send_chunks(Transport, ConsumerState, - SendFileOct) - of - {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by " - "peer", - []), - throw({stop, normal}); - {ok, - #consumer{log = Log1, credit = Credit1} = - ConsumerState1} -> - Consumers1 = - Consumers#{SubscriptionId => - ConsumerState1}, - - StreamSubscriptions1 = - case StreamSubscriptions of - #{Stream := SubscriptionIds} -> - StreamSubscriptions#{Stream => - [SubscriptionId] - ++ SubscriptionIds}; - _ -> - StreamSubscriptions#{Stream => - [SubscriptionId]} - end, - - #consumer{configuration = - #consumer_configuration{counters - = - ConsumerCounters1}} = - ConsumerState1, - - ConsumerOffset = - osiris_log:next_offset(Log1), - ConsumerOffsetLag = - consumer_i(offset_lag, ConsumerState1), - - rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " - "distributed after subscription", - [SubscriptionId, - ConsumerOffset, - messages_consumed(ConsumerCounters1)]), - - rabbit_stream_metrics:consumer_created(self(), - stream_r(Stream, - Connection1), - SubscriptionId, - Credit1, - messages_consumed(ConsumerCounters1), - ConsumerOffset, - ConsumerOffsetLag, - Properties), - {Connection1#stream_connection{stream_subscriptions - = - StreamSubscriptions1}, - State#stream_connection_state{consumers = - Consumers1}} - end + State1 = + maybe_dispatch_on_subscription(Transport, + State, + ConsumerState, + Connection2, + Consumers, + Stream, + SubscriptionId, + Properties, + SendFileOct, + Sac), + StreamSubscriptions1 = + case StreamSubscriptions of + #{Stream := SubscriptionIds} -> + StreamSubscriptions#{Stream => + [SubscriptionId] + ++ SubscriptionIds}; + _ -> + StreamSubscriptions#{Stream => + [SubscriptionId]} + end, + {Connection2#stream_connection{stream_subscriptions + = + StreamSubscriptions1}, + State1} end end; error -> @@ -2376,6 +2360,118 @@ handle_frame_post_auth(Transport, FrameSize = byte_size(Frame), Transport:send(S, <>), {Connection, State}; +handle_frame_post_auth(Transport, + #stream_connection{transport = ConnTransport, + outstanding_requests = Requests0, + send_file_oct = SendFileOct, + virtual_host = VirtualHost} = + Connection, + #stream_connection_state{consumers = Consumers} = State, + {response, CorrelationId, + {consumer_update, _ResponseCode, + ResponseOffsetSpec}}) -> + %% FIXME check response code? It's supposed to be OK all the time. + case maps:take(CorrelationId, Requests0) of + {{{subscription_id, SubscriptionId}}, Rs} -> + rabbit_log:debug("Received consumer update response for subscription ~p", + [SubscriptionId]), + Consumers1 = + case Consumers of + #{SubscriptionId := + #consumer{configuration = + #consumer_configuration{active = + true}} = + Consumer} -> + %% active, dispatch messages + #consumer{configuration = + #consumer_configuration{properties = + Properties, + member_pid = + LocalMemberPid, + offset = + SubscriptionOffsetSpec, + stream = + Stream}} = + Consumer, + OffsetSpec = + case ResponseOffsetSpec of + none -> + SubscriptionOffsetSpec; + ROS -> + ROS + end, + + rabbit_log:debug("Initializing reader for active consumer, offset " + "spec is ~p", + [OffsetSpec]), + QueueResource = + #resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + Segment = + init_reader(ConnTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec), + Consumer1 = Consumer#consumer{log = Segment}, + Consumer2 = + case send_chunks(Transport, Consumer1, SendFileOct) + of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", + []), + throw({stop, normal}); + {error, Reason} -> + rabbit_log_connection:info("Error while sending chunks: ~p", + [Reason]), + %% likely a connection problem + Consumer; + {{segment, Log1}, {credit, Credit1}} -> + Consumer#consumer{log = Log1, + credit = Credit1} + end, + #consumer{configuration = + #consumer_configuration{counters = + ConsumerCounters}, + log = Log2} = + Consumer2, + ConsumerOffset = osiris_log:next_offset(Log2), + + rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " + "distributed after subscription", + [SubscriptionId, ConsumerOffset, + messages_consumed(ConsumerCounters)]), + + Consumers#{SubscriptionId => Consumer2}; + #{SubscriptionId := + #consumer{configuration = + #consumer_configuration{active = + false}}} -> + rabbit_log:debug("Not an active consumer"), + Consumers; + _ -> + rabbit_log:debug("No consumer found for subscription ~p", + [SubscriptionId]), + Consumers + end, + + {Connection#stream_connection{outstanding_requests = Rs}, + State#stream_connection_state{consumers = Consumers1}}; + {V, _Rs} -> + rabbit_log:warning("Unexpected outstanding requests for correlation " + "ID ~p: ~p", + [CorrelationId, V]), + {Connection, State}; + error -> + rabbit_log:warning("Could not find outstanding consumer update request " + "with correlation ID ~p. No actions taken for " + "the subscription.", + [CorrelationId]), + {Connection, State} + end; handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, State, @@ -2409,6 +2505,140 @@ handle_frame_post_auth(Transport, ?UNKNOWN_FRAME, 1), {Connection#stream_connection{connection_step = close_sent}, State}. +init_reader(ConnectionTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec) -> + CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []}, + Options = + #{transport => ConnectionTransport, + chunk_selector => get_chunk_selector(Properties)}, + {ok, Segment} = + osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options), + rabbit_log:debug("Next offset for subscription ~p is ~p", + [SubscriptionId, osiris_log:next_offset(Segment)]), + Segment. + +single_active_consumer(#{<<"single-active-consumer">> := + <<"true">>}) -> + true; +single_active_consumer(_Properties) -> + false. + +consumer_name(#{<<"name">> := Name}) -> + Name; +consumer_name(_Properties) -> + undefined. + +maybe_dispatch_on_subscription(Transport, + State, + ConsumerState, + Connection, + Consumers, + Stream, + SubscriptionId, + SubscriptionProperties, + SendFileOct, + false = _Sac) -> + rabbit_log:debug("Distributing existing messages to subscription ~p", + [SubscriptionId]), + case send_chunks(Transport, ConsumerState, SendFileOct) of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", + []), + throw({stop, normal}); + {ok, #consumer{log = Log1, credit = Credit1} = ConsumerState1} -> + Consumers1 = Consumers#{SubscriptionId => ConsumerState1}, + + #consumer{configuration = + #consumer_configuration{counters = + ConsumerCounters1}} = + ConsumerState1, + + ConsumerOffset = osiris_log:next_offset(Log1), + ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1), + + rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " + "distributed after subscription", + [SubscriptionId, ConsumerOffset, + messages_consumed(ConsumerCounters1)]), + + rabbit_stream_metrics:consumer_created(self(), + stream_r(Stream, Connection), + SubscriptionId, + Credit1, + messages_consumed(ConsumerCounters1), + ConsumerOffset, + ConsumerOffsetLag, + SubscriptionProperties), + State#stream_connection_state{consumers = Consumers1} + end; +maybe_dispatch_on_subscription(_Transport, + State, + ConsumerState, + Connection, + Consumers, + Stream, + SubscriptionId, + SubscriptionProperties, + _SendFileOct, + true = _Sac) -> + rabbit_log:debug("No initial dispatch for subscription ~p for now, " + "waiting for consumer update response from client " + "(single active consumer)", + [SubscriptionId]), + #consumer{credit = Credit, + configuration = #consumer_configuration{offset = Offset}} = + ConsumerState, + + rabbit_stream_metrics:consumer_created(self(), + stream_r(Stream, Connection), + SubscriptionId, + Credit, + 0, %% messages consumed + Offset, + 0, %% offset lag + SubscriptionProperties), + Consumers1 = Consumers#{SubscriptionId => ConsumerState}, + State#stream_connection_state{consumers = Consumers1}. + +maybe_register_consumer(_, _, _, false = _Sac) -> + true; +maybe_register_consumer(Stream, ConsumerName, SubscriptionId, true) -> + {ok, Active} = + rabbit_stream_sac_coordinator:register_consumer(Stream, + ConsumerName, + self(), + SubscriptionId), + Active. + +maybe_notify_consumer(_, Connection, _, _, false = _Sac) -> + Connection; +maybe_notify_consumer(Transport, + #stream_connection{socket = S, + correlation_id_sequence = CorrIdSeq, + outstanding_requests = + OutstandingRequests0} = + Connection, + SubscriptionId, + Active, + true = _Sac) -> + rabbit_log:debug("SAC subscription ~p, active = ~p", + [SubscriptionId, Active]), + Frame = + rabbit_stream_core:frame({request, CorrIdSeq, + {consumer_update, SubscriptionId, Active}}), + + OutstandingRequests1 = + maps:put(CorrIdSeq, {{subscription_id, SubscriptionId}}, + OutstandingRequests0), + send(Transport, S, Frame), + Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1, + outstanding_requests = OutstandingRequests1}. + notify_connection_closed(#statem_data{connection = #stream_connection{name = Name, publishers = diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl b/deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl new file mode 100644 index 0000000000..f55c13a54d --- /dev/null +++ b/deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl @@ -0,0 +1,220 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_sac_coordinator). + +-behaviour(gen_server). + +%% API functions +-export([start_link/0]). +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([register_consumer/4]). + +-type stream() :: binary(). +-type consumer_name() :: binary(). +-type subscription_id() :: byte(). + +-record(consumer, + {pid :: pid(), subscription_id :: subscription_id()}). +-record(group, {consumers :: [#consumer{}]}). +-record(stream_groups, {groups :: #{consumer_name() => #group{}}}). +-record(state, {stream_groups :: #{stream() => #stream_groups{}}}). + +register_consumer(Stream, + ConsumerName, + ConnectionPid, + SubscriptionId) -> + call({register_consumer, + Stream, + ConsumerName, + ConnectionPid, + SubscriptionId}). + +call(Request) -> + gen_server:call({global, ?MODULE}, + Request).%%%=================================================================== + %%% API functions + %%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + case gen_server:start_link({global, ?MODULE}, ?MODULE, [], []) of + {error, {already_started, _Pid}} -> + ignore; + R -> + R + end. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{stream_groups = #{}}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call({register_consumer, + Stream, + ConsumerName, + ConnectionPid, + SubscriptionId}, + _From, #state{stream_groups = StreamGroups0} = State) -> + StreamGroups1 = + maybe_create_group(Stream, ConsumerName, StreamGroups0), + Group0 = lookup_group(Stream, ConsumerName, StreamGroups1), + Consumer = + #consumer{pid = ConnectionPid, subscription_id = SubscriptionId}, + Group = add_to_group(Consumer, Group0), + Active = is_active(Consumer, Group), + StreamGroups2 = + update_groups(Stream, ConsumerName, Group, StreamGroups1), + {reply, {ok, Active}, State#state{stream_groups = StreamGroups2}}; +handle_call(which_children, _From, State) -> + {reply, [], State}. + +maybe_create_group(Stream, ConsumerName, StreamGroups) -> + case StreamGroups of + #{Stream := #stream_groups{groups = #{ConsumerName := _Consumers}}} -> + %% the group already exists + StreamGroups; + #{Stream := #stream_groups{groups = GroupsForTheStream} = SG} -> + %% there are groups for this streams, but not one for this consumer name + GroupsForTheStream1 = + maps:put(ConsumerName, #group{consumers = []}, + GroupsForTheStream), + StreamGroups#{Stream => + SG#stream_groups{groups = GroupsForTheStream1}}; + SGS -> + SG = maps:get(Stream, SGS, #stream_groups{groups = #{}}), + #stream_groups{groups = Groups} = SG, + Groups1 = maps:put(ConsumerName, #group{consumers = []}, Groups), + SGS#{Stream => SG#stream_groups{groups = Groups1}} + end. + +lookup_group(Stream, ConsumerName, StreamGroups) -> + case StreamGroups of + #{Stream := #stream_groups{groups = #{ConsumerName := Group}}} -> + Group; + _ -> + error + end. + +add_to_group(Consumer, #group{consumers = Consumers} = Group) -> + Group#group{consumers = Consumers ++ [Consumer]}. + +is_active(Consumer, #group{consumers = [Consumer]}) -> + true; +is_active(Consumer, #group{consumers = [Consumer | _]}) -> + true; +is_active(_, _) -> + false. + +update_groups(Stream, ConsumerName, Group, StreamGroups) -> + #{Stream := #stream_groups{groups = Groups}} = StreamGroups, + Groups1 = maps:put(ConsumerName, Group, Groups), + StreamGroups#{Stream => #stream_groups{groups = Groups1}}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl index 92bec93a82..bccc60ae29 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl @@ -79,9 +79,14 @@ init([]) -> type => worker, start => {rabbit_stream_metrics_gc, start_link, []}}, + SacCoordinator = + #{id => rabbit_stream_sac_coordinator, + type => worker, + start => {rabbit_stream_sac_coordinator, start_link, []}}, + {ok, {{one_for_all, 10, 10}, - [StreamManager, MetricsGc] + [StreamManager, MetricsGc, SacCoordinator] ++ listener_specs(fun tcp_listener_spec/1, [SocketOpts, ServerConfiguration, NumTcpAcceptors], Listeners) diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl index e552cc7a02..e7e0137f2a 100644 --- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl @@ -23,6 +23,7 @@ -define(COMMAND_HEARTBEAT, 23). -define(COMMAND_ROUTE, 24). -define(COMMAND_PARTITIONS, 25). +-define(COMMAND_CONSUMER_UPDATE, 26). -define(REQUEST, 0). -define(RESPONSE, 1). @@ -50,6 +51,7 @@ -define(RESPONSE_CODE_NO_OFFSET, 19). +-define(OFFSET_TYPE_NONE, 0). -define(OFFSET_TYPE_FIRST, 1). -define(OFFSET_TYPE_LAST, 2). -define(OFFSET_TYPE_NEXT, 3). diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index d16b887df1..5ec5f73e1c 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -63,6 +63,7 @@ -type credit() :: non_neg_integer(). -type offset_ref() :: binary(). -type endpoint() :: {Host :: binary(), Port :: non_neg_integer()}. +-type active() :: boolean(). -type command() :: {publish, publisher_id(), @@ -98,7 +99,8 @@ {open, VirtualHost :: binary()} | {close, Code :: non_neg_integer(), Reason :: binary()} | {route, RoutingKey :: binary(), SuperStream :: binary()} | - {partitions, SuperStream :: binary()}} | + {partitions, SuperStream :: binary()} | + {consumer_update, subscription_id(), active()}} | {response, correlation_id(), {declare_publisher | delete_publisher | @@ -124,7 +126,8 @@ HeartBeat :: non_neg_integer()} | {credit, response_code(), subscription_id()} | {route, response_code(), stream_name()} | - {partitions, response_code(), [stream_name()]}} | + {partitions, response_code(), [stream_name()]} | + {consumer_update, response_code(), none | offset_spec()}} | {unknown, binary()}. -spec init(term()) -> state(). @@ -423,7 +426,24 @@ response_body({route = Tag, Code, Stream}) -> {command_id(Tag), <>}; response_body({partitions = Tag, Code, Streams}) -> StreamsBin = [<> || Stream <- Streams], - {command_id(Tag), [<>, StreamsBin]}. + {command_id(Tag), [<>, StreamsBin]}; +response_body({consumer_update = Tag, Code, OffsetSpec}) -> + OffsetSpecBin = + case OffsetSpec of + none -> + <>; + first -> + <>; + last -> + <>; + next -> + <>; + Offset when is_integer(Offset) -> + <>; + {timestamp, Ts} -> + <> + end, + {command_id(Tag), [<>]}. request_body({declare_publisher = Tag, PublisherId, @@ -510,7 +530,16 @@ request_body({close = Tag, Code, Reason}) -> request_body({route = Tag, RoutingKey, SuperStream}) -> {Tag, <>}; request_body({partitions = Tag, SuperStream}) -> - {Tag, <>}. + {Tag, <>}; +request_body({consumer_update = Tag, SubscriptionId, Active}) -> + ActiveBin = + case Active of + true -> + 1; + false -> + 0 + end, + {Tag, <>}. append_data(Prev, Data) when is_binary(Prev) -> [Prev, Data]; @@ -748,6 +777,20 @@ parse_request(<>) -> request(CorrelationId, {partitions, SuperStream}); +parse_request(<>) -> + Active = + case ActiveBin of + 0 -> + false; + 1 -> + true + end, + request(CorrelationId, {consumer_update, SubscriptionId, Active}); parse_request(Bin) -> {unknown, Bin}. @@ -823,7 +866,30 @@ parse_response_body(?COMMAND_ROUTE, parse_response_body(?COMMAND_PARTITIONS, <>) -> Partitions = list_of_strings(PartitionsBin), - {partitions, ResponseCode, Partitions}. + {partitions, ResponseCode, Partitions}; +parse_response_body(?COMMAND_CONSUMER_UPDATE, + <>) -> + OffsetSpec = offset_spec(OffsetType, OffsetValue), + {consumer_update, ResponseCode, OffsetSpec}. + +offset_spec(OffsetType, OffsetValueBin) -> + case OffsetType of + ?OFFSET_TYPE_NONE -> + none; + ?OFFSET_TYPE_FIRST -> + first; + ?OFFSET_TYPE_LAST -> + last; + ?OFFSET_TYPE_NEXT -> + next; + ?OFFSET_TYPE_OFFSET -> + <> = OffsetValueBin, + Offset; + ?OFFSET_TYPE_TIMESTAMP -> + <> = OffsetValueBin, + {timestamp, Timestamp} + end. request(Corr, Cmd) -> {request, Corr, Cmd}. @@ -941,7 +1007,9 @@ command_id(heartbeat) -> command_id(route) -> ?COMMAND_ROUTE; command_id(partitions) -> - ?COMMAND_PARTITIONS. + ?COMMAND_PARTITIONS; +command_id(consumer_update) -> + ?COMMAND_CONSUMER_UPDATE. parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher; @@ -992,7 +1060,9 @@ parse_command_id(?COMMAND_HEARTBEAT) -> parse_command_id(?COMMAND_ROUTE) -> route; parse_command_id(?COMMAND_PARTITIONS) -> - partitions. + partitions; +parse_command_id(?COMMAND_CONSUMER_UPDATE) -> + consumer_update. element_index(Element, List) -> element_index(Element, List, 0). diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl index 74b0cc1d60..40152279c8 100644 --- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl +++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl @@ -103,6 +103,7 @@ roundtrip(_Config) -> test_roundtrip({request, 99, {close, 99, <<"reason">>}}), test_roundtrip({request, 99, {route, <<"rkey.*">>, <<"exchange">>}}), test_roundtrip({request, 99, {partitions, <<"super stream">>}}), + test_roundtrip({request, 99, {consumer_update, 1, true}}), %% RESPONSES [test_roundtrip({response, 99, {Tag, 53}}) || Tag @@ -128,10 +129,10 @@ roundtrip(_Config) -> test_roundtrip({response, 0, {tune, 10000, 12345}}), % %% NB: does not write correlation id test_roundtrip({response, 0, {credit, 98, 200}}), - % %% TODO should route return a list of routed streams? test_roundtrip({response, 99, {route, 1, <<"stream_name">>}}), test_roundtrip({response, 99, {partitions, 1, [<<"stream1">>, <<"stream2">>]}}), + test_roundtrip({response, 99, {consumer_update, 1, none}}), ok. roundtrip_metadata(_Config) ->