Handle single active consumer registration

WIP. Uses a simple in-memory coordinator for now.
No failover yet.

References #3753
This commit is contained in:
Arnaud Cogoluègnes 2021-11-17 17:28:52 +01:00
parent ec39f83105
commit d5ae62b1a9
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
7 changed files with 654 additions and 98 deletions

View File

@ -215,6 +215,12 @@ used to make the difference between a request (0) and a response (1). Example fo
|Client
|0x0019
|Yes
|<<consumerupdate>> (experimental)
|Server
|0x0020
|Yes
|===
=== DeclarePublisher
@ -597,10 +603,11 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream
RoutingKey => string
SuperStream => string
RouteResponse => Key Version CorrelationId [Stream]
RouteResponse => Key Version CorrelationId ResponseCode [Stream]
Key => uint16 // 0x8018
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Stream => string
```
@ -615,13 +622,34 @@ PartitionsQuery => Key Version CorrelationId SuperStream
CorrelationId => uint32
SuperStream => string
PartitionsResponse => Key Version CorrelationId [Stream]
PartitionsResponse => Key Version CorrelationId ResponseCode [Stream]
Key => uint16 // 0x8019
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Stream => string
```
=== Consumer Update (experimental)
```
ConsumerUpdateQuery => Key Version CorrelationId SubscriptionId Active
Key => uint16 // 0x001a
Version => uint16
CorrelationId => uint32
SubscriptionId => uint8
Active => uint8 (boolean, 0 = false, 1 = true)
ConsumerUpdateResponse => Key Version CorrelationId ResponseCode OffsetSpecification
Key => uint16 // 0x801a
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
OffsetSpecification => OffsetType Offset
OffsetType => uint16 // 0 (none), 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
Offset => uint64 (for offset) | int64 (for timestamp)
```
== Authentication
Once a client is connected to the server, it initiates an authentication

View File

@ -39,7 +39,8 @@
stream :: stream(),
offset :: osiris:offset(),
counters :: atomics:atomics_ref(),
properties :: map()}).
properties :: map(),
active :: boolean()}).
-record(consumer,
{configuration :: #consumer_configuration{},
credit :: non_neg_integer(),
@ -85,7 +86,9 @@
send_file_oct ::
atomics:atomics_ref(), % number of bytes sent with send_file (for metrics)
transport :: tcp | ssl,
proxy_socket :: undefined | ranch_proxy:proxy_socket()}).
proxy_socket :: undefined | ranch_proxy:proxy_socket(),
correlation_id_sequence :: integer(),
outstanding_requests :: #{integer() => term()}}).
-record(configuration,
{initial_credits :: integer(),
credits_required_for_unblocking :: integer(),
@ -237,7 +240,9 @@ init([KeepaliveSup,
send_file_oct = SendFileOct,
transport = ConnTransport,
proxy_socket =
rabbit_net:maybe_get_proxy_socket(Sock)},
rabbit_net:maybe_get_proxy_socket(Sock),
correlation_id_sequence = 0,
outstanding_requests = #{}},
State =
#stream_connection_state{consumers = #{},
blocked = false,
@ -643,7 +648,12 @@ augment_infos_with_user_provided_connection_name(Infos,
close(Transport, S,
#stream_connection_state{consumers = Consumers}) ->
[osiris_log:close(Log)
[case Log of
undefined ->
ok; %% segment may not be defined on subscription (single active consumer)
L ->
osiris_log:close(L)
end
|| #consumer{log = Log} <- maps:values(Consumers)],
Transport:shutdown(S, write),
Transport:close(S).
@ -1789,26 +1799,36 @@ handle_frame_post_auth(Transport,
Stream,
OffsetSpec,
Properties]),
CounterSpec =
{{?MODULE,
QueueResource,
SubscriptionId,
self()},
[]},
Options =
#{transport => ConnTransport,
chunk_selector =>
get_chunk_selector(Properties)},
{ok, Log} =
osiris:init_reader(LocalMemberPid,
OffsetSpec,
CounterSpec,
Options),
rabbit_log:debug("Next offset for subscription ~p is ~p",
[SubscriptionId,
osiris_log:next_offset(Log)]),
Sac = single_active_consumer(Properties),
ConsumerName = consumer_name(Properties),
%% TODO check consumer name is defined when SAC
Log = case Sac of
true ->
undefined;
false ->
init_reader(ConnTransport,
LocalMemberPid,
QueueResource,
SubscriptionId,
Properties,
OffsetSpec)
end,
ConsumerCounters =
atomics:new(2, [{signed, false}]),
Active =
maybe_register_consumer(Stream,
ConsumerName,
SubscriptionId,
Sac),
Connection1 =
maybe_notify_consumer(Transport,
Connection,
SubscriptionId,
Active,
Sac),
ConsumerConfiguration =
#consumer_configuration{member_pid =
LocalMemberPid,
@ -1819,83 +1839,47 @@ handle_frame_post_auth(Transport,
offset = OffsetSpec,
counters =
ConsumerCounters,
properties =
Properties},
properties = Properties,
active = Active},
ConsumerState =
#consumer{configuration = ConsumerConfiguration,
log = Log,
credit = Credit},
Connection1 =
Connection2 =
maybe_monitor_stream(LocalMemberPid, Stream,
Connection),
Connection1),
response_ok(Transport,
Connection,
subscribe,
CorrelationId),
rabbit_log:debug("Distributing existing messages to subscription ~p",
[SubscriptionId]),
case send_chunks(Transport, ConsumerState,
SendFileOct)
of
{error, closed} ->
rabbit_log_connection:info("Stream protocol connection has been closed by "
"peer",
[]),
throw({stop, normal});
{ok,
#consumer{log = Log1, credit = Credit1} =
ConsumerState1} ->
Consumers1 =
Consumers#{SubscriptionId =>
ConsumerState1},
StreamSubscriptions1 =
case StreamSubscriptions of
#{Stream := SubscriptionIds} ->
StreamSubscriptions#{Stream =>
[SubscriptionId]
++ SubscriptionIds};
_ ->
StreamSubscriptions#{Stream =>
[SubscriptionId]}
end,
#consumer{configuration =
#consumer_configuration{counters
=
ConsumerCounters1}} =
ConsumerState1,
ConsumerOffset =
osiris_log:next_offset(Log1),
ConsumerOffsetLag =
consumer_i(offset_lag, ConsumerState1),
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
"distributed after subscription",
[SubscriptionId,
ConsumerOffset,
messages_consumed(ConsumerCounters1)]),
rabbit_stream_metrics:consumer_created(self(),
stream_r(Stream,
Connection1),
SubscriptionId,
Credit1,
messages_consumed(ConsumerCounters1),
ConsumerOffset,
ConsumerOffsetLag,
Properties),
{Connection1#stream_connection{stream_subscriptions
=
StreamSubscriptions1},
State#stream_connection_state{consumers =
Consumers1}}
end
State1 =
maybe_dispatch_on_subscription(Transport,
State,
ConsumerState,
Connection2,
Consumers,
Stream,
SubscriptionId,
Properties,
SendFileOct,
Sac),
StreamSubscriptions1 =
case StreamSubscriptions of
#{Stream := SubscriptionIds} ->
StreamSubscriptions#{Stream =>
[SubscriptionId]
++ SubscriptionIds};
_ ->
StreamSubscriptions#{Stream =>
[SubscriptionId]}
end,
{Connection2#stream_connection{stream_subscriptions
=
StreamSubscriptions1},
State1}
end
end;
error ->
@ -2376,6 +2360,118 @@ handle_frame_post_auth(Transport,
FrameSize = byte_size(Frame),
Transport:send(S, <<FrameSize:32, Frame/binary>>),
{Connection, State};
handle_frame_post_auth(Transport,
#stream_connection{transport = ConnTransport,
outstanding_requests = Requests0,
send_file_oct = SendFileOct,
virtual_host = VirtualHost} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
{response, CorrelationId,
{consumer_update, _ResponseCode,
ResponseOffsetSpec}}) ->
%% FIXME check response code? It's supposed to be OK all the time.
case maps:take(CorrelationId, Requests0) of
{{{subscription_id, SubscriptionId}}, Rs} ->
rabbit_log:debug("Received consumer update response for subscription ~p",
[SubscriptionId]),
Consumers1 =
case Consumers of
#{SubscriptionId :=
#consumer{configuration =
#consumer_configuration{active =
true}} =
Consumer} ->
%% active, dispatch messages
#consumer{configuration =
#consumer_configuration{properties =
Properties,
member_pid =
LocalMemberPid,
offset =
SubscriptionOffsetSpec,
stream =
Stream}} =
Consumer,
OffsetSpec =
case ResponseOffsetSpec of
none ->
SubscriptionOffsetSpec;
ROS ->
ROS
end,
rabbit_log:debug("Initializing reader for active consumer, offset "
"spec is ~p",
[OffsetSpec]),
QueueResource =
#resource{name = Stream,
kind = queue,
virtual_host = VirtualHost},
Segment =
init_reader(ConnTransport,
LocalMemberPid,
QueueResource,
SubscriptionId,
Properties,
OffsetSpec),
Consumer1 = Consumer#consumer{log = Segment},
Consumer2 =
case send_chunks(Transport, Consumer1, SendFileOct)
of
{error, closed} ->
rabbit_log_connection:info("Stream protocol connection has been closed by "
"peer",
[]),
throw({stop, normal});
{error, Reason} ->
rabbit_log_connection:info("Error while sending chunks: ~p",
[Reason]),
%% likely a connection problem
Consumer;
{{segment, Log1}, {credit, Credit1}} ->
Consumer#consumer{log = Log1,
credit = Credit1}
end,
#consumer{configuration =
#consumer_configuration{counters =
ConsumerCounters},
log = Log2} =
Consumer2,
ConsumerOffset = osiris_log:next_offset(Log2),
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
"distributed after subscription",
[SubscriptionId, ConsumerOffset,
messages_consumed(ConsumerCounters)]),
Consumers#{SubscriptionId => Consumer2};
#{SubscriptionId :=
#consumer{configuration =
#consumer_configuration{active =
false}}} ->
rabbit_log:debug("Not an active consumer"),
Consumers;
_ ->
rabbit_log:debug("No consumer found for subscription ~p",
[SubscriptionId]),
Consumers
end,
{Connection#stream_connection{outstanding_requests = Rs},
State#stream_connection_state{consumers = Consumers1}};
{V, _Rs} ->
rabbit_log:warning("Unexpected outstanding requests for correlation "
"ID ~p: ~p",
[CorrelationId, V]),
{Connection, State};
error ->
rabbit_log:warning("Could not find outstanding consumer update request "
"with correlation ID ~p. No actions taken for "
"the subscription.",
[CorrelationId]),
{Connection, State}
end;
handle_frame_post_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
@ -2409,6 +2505,140 @@ handle_frame_post_auth(Transport,
?UNKNOWN_FRAME, 1),
{Connection#stream_connection{connection_step = close_sent}, State}.
init_reader(ConnectionTransport,
LocalMemberPid,
QueueResource,
SubscriptionId,
Properties,
OffsetSpec) ->
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
Options =
#{transport => ConnectionTransport,
chunk_selector => get_chunk_selector(Properties)},
{ok, Segment} =
osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options),
rabbit_log:debug("Next offset for subscription ~p is ~p",
[SubscriptionId, osiris_log:next_offset(Segment)]),
Segment.
single_active_consumer(#{<<"single-active-consumer">> :=
<<"true">>}) ->
true;
single_active_consumer(_Properties) ->
false.
consumer_name(#{<<"name">> := Name}) ->
Name;
consumer_name(_Properties) ->
undefined.
maybe_dispatch_on_subscription(Transport,
State,
ConsumerState,
Connection,
Consumers,
Stream,
SubscriptionId,
SubscriptionProperties,
SendFileOct,
false = _Sac) ->
rabbit_log:debug("Distributing existing messages to subscription ~p",
[SubscriptionId]),
case send_chunks(Transport, ConsumerState, SendFileOct) of
{error, closed} ->
rabbit_log_connection:info("Stream protocol connection has been closed by "
"peer",
[]),
throw({stop, normal});
{ok, #consumer{log = Log1, credit = Credit1} = ConsumerState1} ->
Consumers1 = Consumers#{SubscriptionId => ConsumerState1},
#consumer{configuration =
#consumer_configuration{counters =
ConsumerCounters1}} =
ConsumerState1,
ConsumerOffset = osiris_log:next_offset(Log1),
ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1),
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
"distributed after subscription",
[SubscriptionId, ConsumerOffset,
messages_consumed(ConsumerCounters1)]),
rabbit_stream_metrics:consumer_created(self(),
stream_r(Stream, Connection),
SubscriptionId,
Credit1,
messages_consumed(ConsumerCounters1),
ConsumerOffset,
ConsumerOffsetLag,
SubscriptionProperties),
State#stream_connection_state{consumers = Consumers1}
end;
maybe_dispatch_on_subscription(_Transport,
State,
ConsumerState,
Connection,
Consumers,
Stream,
SubscriptionId,
SubscriptionProperties,
_SendFileOct,
true = _Sac) ->
rabbit_log:debug("No initial dispatch for subscription ~p for now, "
"waiting for consumer update response from client "
"(single active consumer)",
[SubscriptionId]),
#consumer{credit = Credit,
configuration = #consumer_configuration{offset = Offset}} =
ConsumerState,
rabbit_stream_metrics:consumer_created(self(),
stream_r(Stream, Connection),
SubscriptionId,
Credit,
0, %% messages consumed
Offset,
0, %% offset lag
SubscriptionProperties),
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
State#stream_connection_state{consumers = Consumers1}.
maybe_register_consumer(_, _, _, false = _Sac) ->
true;
maybe_register_consumer(Stream, ConsumerName, SubscriptionId, true) ->
{ok, Active} =
rabbit_stream_sac_coordinator:register_consumer(Stream,
ConsumerName,
self(),
SubscriptionId),
Active.
maybe_notify_consumer(_, Connection, _, _, false = _Sac) ->
Connection;
maybe_notify_consumer(Transport,
#stream_connection{socket = S,
correlation_id_sequence = CorrIdSeq,
outstanding_requests =
OutstandingRequests0} =
Connection,
SubscriptionId,
Active,
true = _Sac) ->
rabbit_log:debug("SAC subscription ~p, active = ~p",
[SubscriptionId, Active]),
Frame =
rabbit_stream_core:frame({request, CorrIdSeq,
{consumer_update, SubscriptionId, Active}}),
OutstandingRequests1 =
maps:put(CorrIdSeq, {{subscription_id, SubscriptionId}},
OutstandingRequests0),
send(Transport, S, Frame),
Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1,
outstanding_requests = OutstandingRequests1}.
notify_connection_closed(#statem_data{connection =
#stream_connection{name = Name,
publishers =

View File

@ -0,0 +1,220 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_sac_coordinator).
-behaviour(gen_server).
%% API functions
-export([start_link/0]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([register_consumer/4]).
-type stream() :: binary().
-type consumer_name() :: binary().
-type subscription_id() :: byte().
-record(consumer,
{pid :: pid(), subscription_id :: subscription_id()}).
-record(group, {consumers :: [#consumer{}]}).
-record(stream_groups, {groups :: #{consumer_name() => #group{}}}).
-record(state, {stream_groups :: #{stream() => #stream_groups{}}}).
register_consumer(Stream,
ConsumerName,
ConnectionPid,
SubscriptionId) ->
call({register_consumer,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId}).
call(Request) ->
gen_server:call({global, ?MODULE},
Request).%%%===================================================================
%%% API functions
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
case gen_server:start_link({global, ?MODULE}, ?MODULE, [], []) of
{error, {already_started, _Pid}} ->
ignore;
R ->
R
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{stream_groups = #{}}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call({register_consumer,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId},
_From, #state{stream_groups = StreamGroups0} = State) ->
StreamGroups1 =
maybe_create_group(Stream, ConsumerName, StreamGroups0),
Group0 = lookup_group(Stream, ConsumerName, StreamGroups1),
Consumer =
#consumer{pid = ConnectionPid, subscription_id = SubscriptionId},
Group = add_to_group(Consumer, Group0),
Active = is_active(Consumer, Group),
StreamGroups2 =
update_groups(Stream, ConsumerName, Group, StreamGroups1),
{reply, {ok, Active}, State#state{stream_groups = StreamGroups2}};
handle_call(which_children, _From, State) ->
{reply, [], State}.
maybe_create_group(Stream, ConsumerName, StreamGroups) ->
case StreamGroups of
#{Stream := #stream_groups{groups = #{ConsumerName := _Consumers}}} ->
%% the group already exists
StreamGroups;
#{Stream := #stream_groups{groups = GroupsForTheStream} = SG} ->
%% there are groups for this streams, but not one for this consumer name
GroupsForTheStream1 =
maps:put(ConsumerName, #group{consumers = []},
GroupsForTheStream),
StreamGroups#{Stream =>
SG#stream_groups{groups = GroupsForTheStream1}};
SGS ->
SG = maps:get(Stream, SGS, #stream_groups{groups = #{}}),
#stream_groups{groups = Groups} = SG,
Groups1 = maps:put(ConsumerName, #group{consumers = []}, Groups),
SGS#{Stream => SG#stream_groups{groups = Groups1}}
end.
lookup_group(Stream, ConsumerName, StreamGroups) ->
case StreamGroups of
#{Stream := #stream_groups{groups = #{ConsumerName := Group}}} ->
Group;
_ ->
error
end.
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
Group#group{consumers = Consumers ++ [Consumer]}.
is_active(Consumer, #group{consumers = [Consumer]}) ->
true;
is_active(Consumer, #group{consumers = [Consumer | _]}) ->
true;
is_active(_, _) ->
false.
update_groups(Stream, ConsumerName, Group, StreamGroups) ->
#{Stream := #stream_groups{groups = Groups}} = StreamGroups,
Groups1 = maps:put(ConsumerName, Group, Groups),
StreamGroups#{Stream => #stream_groups{groups = Groups1}}.
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -79,9 +79,14 @@ init([]) ->
type => worker,
start => {rabbit_stream_metrics_gc, start_link, []}},
SacCoordinator =
#{id => rabbit_stream_sac_coordinator,
type => worker,
start => {rabbit_stream_sac_coordinator, start_link, []}},
{ok,
{{one_for_all, 10, 10},
[StreamManager, MetricsGc]
[StreamManager, MetricsGc, SacCoordinator]
++ listener_specs(fun tcp_listener_spec/1,
[SocketOpts, ServerConfiguration, NumTcpAcceptors],
Listeners)

View File

@ -23,6 +23,7 @@
-define(COMMAND_HEARTBEAT, 23).
-define(COMMAND_ROUTE, 24).
-define(COMMAND_PARTITIONS, 25).
-define(COMMAND_CONSUMER_UPDATE, 26).
-define(REQUEST, 0).
-define(RESPONSE, 1).
@ -50,6 +51,7 @@
-define(RESPONSE_CODE_NO_OFFSET, 19).
-define(OFFSET_TYPE_NONE, 0).
-define(OFFSET_TYPE_FIRST, 1).
-define(OFFSET_TYPE_LAST, 2).
-define(OFFSET_TYPE_NEXT, 3).

View File

@ -63,6 +63,7 @@
-type credit() :: non_neg_integer().
-type offset_ref() :: binary().
-type endpoint() :: {Host :: binary(), Port :: non_neg_integer()}.
-type active() :: boolean().
-type command() ::
{publish,
publisher_id(),
@ -98,7 +99,8 @@
{open, VirtualHost :: binary()} |
{close, Code :: non_neg_integer(), Reason :: binary()} |
{route, RoutingKey :: binary(), SuperStream :: binary()} |
{partitions, SuperStream :: binary()}} |
{partitions, SuperStream :: binary()} |
{consumer_update, subscription_id(), active()}} |
{response, correlation_id(),
{declare_publisher |
delete_publisher |
@ -124,7 +126,8 @@
HeartBeat :: non_neg_integer()} |
{credit, response_code(), subscription_id()} |
{route, response_code(), stream_name()} |
{partitions, response_code(), [stream_name()]}} |
{partitions, response_code(), [stream_name()]} |
{consumer_update, response_code(), none | offset_spec()}} |
{unknown, binary()}.
-spec init(term()) -> state().
@ -423,7 +426,24 @@ response_body({route = Tag, Code, Stream}) ->
{command_id(Tag), <<Code:16, ?STRING(Stream)>>};
response_body({partitions = Tag, Code, Streams}) ->
StreamsBin = [<<?STRING(Stream)>> || Stream <- Streams],
{command_id(Tag), [<<Code:16, (length(Streams)):32>>, StreamsBin]}.
{command_id(Tag), [<<Code:16, (length(Streams)):32>>, StreamsBin]};
response_body({consumer_update = Tag, Code, OffsetSpec}) ->
OffsetSpecBin =
case OffsetSpec of
none ->
<<?OFFSET_TYPE_NONE:16>>;
first ->
<<?OFFSET_TYPE_FIRST:16>>;
last ->
<<?OFFSET_TYPE_LAST:16>>;
next ->
<<?OFFSET_TYPE_NEXT:16>>;
Offset when is_integer(Offset) ->
<<?OFFSET_TYPE_OFFSET, Offset:64/unsigned>>;
{timestamp, Ts} ->
<<?OFFSET_TYPE_TIMESTAMP, Ts:64/signed>>
end,
{command_id(Tag), [<<Code:16, OffsetSpecBin/binary>>]}.
request_body({declare_publisher = Tag,
PublisherId,
@ -510,7 +530,16 @@ request_body({close = Tag, Code, Reason}) ->
request_body({route = Tag, RoutingKey, SuperStream}) ->
{Tag, <<?STRING(RoutingKey), ?STRING(SuperStream)>>};
request_body({partitions = Tag, SuperStream}) ->
{Tag, <<?STRING(SuperStream)>>}.
{Tag, <<?STRING(SuperStream)>>};
request_body({consumer_update = Tag, SubscriptionId, Active}) ->
ActiveBin =
case Active of
true ->
1;
false ->
0
end,
{Tag, <<SubscriptionId:8, ActiveBin:8>>}.
append_data(Prev, Data) when is_binary(Prev) ->
[Prev, Data];
@ -748,6 +777,20 @@ parse_request(<<?REQUEST:1,
CorrelationId:32,
?STRING(StreamSize, SuperStream)>>) ->
request(CorrelationId, {partitions, SuperStream});
parse_request(<<?REQUEST:1,
?COMMAND_CONSUMER_UPDATE:15,
?VERSION_1:16,
CorrelationId:32,
SubscriptionId:8,
ActiveBin:8>>) ->
Active =
case ActiveBin of
0 ->
false;
1 ->
true
end,
request(CorrelationId, {consumer_update, SubscriptionId, Active});
parse_request(Bin) ->
{unknown, Bin}.
@ -823,7 +866,30 @@ parse_response_body(?COMMAND_ROUTE,
parse_response_body(?COMMAND_PARTITIONS,
<<ResponseCode:16, _Count:32, PartitionsBin/binary>>) ->
Partitions = list_of_strings(PartitionsBin),
{partitions, ResponseCode, Partitions}.
{partitions, ResponseCode, Partitions};
parse_response_body(?COMMAND_CONSUMER_UPDATE,
<<ResponseCode:16, OffsetType:16/signed,
OffsetValue/binary>>) ->
OffsetSpec = offset_spec(OffsetType, OffsetValue),
{consumer_update, ResponseCode, OffsetSpec}.
offset_spec(OffsetType, OffsetValueBin) ->
case OffsetType of
?OFFSET_TYPE_NONE ->
none;
?OFFSET_TYPE_FIRST ->
first;
?OFFSET_TYPE_LAST ->
last;
?OFFSET_TYPE_NEXT ->
next;
?OFFSET_TYPE_OFFSET ->
<<Offset:64/unsigned>> = OffsetValueBin,
Offset;
?OFFSET_TYPE_TIMESTAMP ->
<<Timestamp:64/signed>> = OffsetValueBin,
{timestamp, Timestamp}
end.
request(Corr, Cmd) ->
{request, Corr, Cmd}.
@ -941,7 +1007,9 @@ command_id(heartbeat) ->
command_id(route) ->
?COMMAND_ROUTE;
command_id(partitions) ->
?COMMAND_PARTITIONS.
?COMMAND_PARTITIONS;
command_id(consumer_update) ->
?COMMAND_CONSUMER_UPDATE.
parse_command_id(?COMMAND_DECLARE_PUBLISHER) ->
declare_publisher;
@ -992,7 +1060,9 @@ parse_command_id(?COMMAND_HEARTBEAT) ->
parse_command_id(?COMMAND_ROUTE) ->
route;
parse_command_id(?COMMAND_PARTITIONS) ->
partitions.
partitions;
parse_command_id(?COMMAND_CONSUMER_UPDATE) ->
consumer_update.
element_index(Element, List) ->
element_index(Element, List, 0).

View File

@ -103,6 +103,7 @@ roundtrip(_Config) ->
test_roundtrip({request, 99, {close, 99, <<"reason">>}}),
test_roundtrip({request, 99, {route, <<"rkey.*">>, <<"exchange">>}}),
test_roundtrip({request, 99, {partitions, <<"super stream">>}}),
test_roundtrip({request, 99, {consumer_update, 1, true}}),
%% RESPONSES
[test_roundtrip({response, 99, {Tag, 53}})
|| Tag
@ -128,10 +129,10 @@ roundtrip(_Config) ->
test_roundtrip({response, 0, {tune, 10000, 12345}}),
% %% NB: does not write correlation id
test_roundtrip({response, 0, {credit, 98, 200}}),
% %% TODO should route return a list of routed streams?
test_roundtrip({response, 99, {route, 1, <<"stream_name">>}}),
test_roundtrip({response, 99,
{partitions, 1, [<<"stream1">>, <<"stream2">>]}}),
test_roundtrip({response, 99, {consumer_update, 1, none}}),
ok.
roundtrip_metadata(_Config) ->