Use coordinator to get notified on stream deletion

This commit is contained in:
Arnaud Cogoluègnes 2020-06-18 08:58:59 +02:00
parent 625eaec004
commit a1314a13c9
2 changed files with 121 additions and 45 deletions

View File

@ -21,7 +21,8 @@
%% API
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([start_link/1, create/4, register/0, delete/3, lookup_leader/2, lookup_local_member/2, topology/2, unregister/0]).
-export([start_link/1, create/4, delete/3, lookup_leader/2, lookup_local_member/2, topology/2]).
-export([subscribe/2, unsubscribe/2]).
-record(state, {
configuration, listeners, monitors
@ -43,14 +44,6 @@ create(VirtualHost, Reference, Arguments, Username) ->
delete(VirtualHost, Reference, Username) ->
gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}).
-spec register() -> ok.
register() ->
gen_server:call(?MODULE, {register, self()}).
-spec unregister() -> ok.
unregister() ->
gen_server:call(?MODULE, {unregister, self()}).
-spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found.
lookup_leader(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
@ -64,6 +57,14 @@ lookup_local_member(VirtualHost, Stream) ->
topology(VirtualHost, Stream) ->
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
-spec subscribe(binary(), binary()) -> ok | {error, stream_not_found} | {error, internal_error}.
subscribe(VirtualHost, Stream) ->
gen_server:call(?MODULE, {subscribe, VirtualHost, Stream, self()}).
-spec unsubscribe(binary(), binary()) -> ok | {error, stream_not_found}.
unsubscribe(VirtualHost, Stream) ->
gen_server:call(?MODULE, {unsubscribe, VirtualHost, Stream, self()}).
stream_queue_arguments(Arguments) ->
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments).
@ -189,6 +190,44 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
_ ->
{error, stream_not_found}
end,
{reply, Res, State};
handle_call({subscribe, VirtualHost, Stream, SubscriberPid}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
#{name := StreamInternalName} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:subscribe(StreamInternalName, SubscriberPid) of
{ok, ok, _} ->
ok;
{ok, {error, not_found}, _} ->
{error, stream_not_found};
_ ->
{error, internal_error}
end;
_ ->
{error, stream_not_found}
end;
_ ->
{error, stream_not_found}
end,
{reply, Res, State};
handle_call({unsubscribe, VirtualHost, Stream, SubscriberPid}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
#{name := StreamInternalName} = amqqueue:get_type_state(Q),
rabbit_stream_coordinator:unsubscribe(StreamInternalName, SubscriberPid),
ok;
_ ->
{error, stream_not_found}
end;
_ ->
{error, stream_not_found}
end,
{reply, Res, State}.
handle_cast(_, State) ->

View File

@ -39,7 +39,7 @@
name :: string(),
helper_sup :: pid(),
socket :: rabbit_net:socket(),
clusters :: #{binary() => pid()},
stream_leaders :: #{binary() => pid()},
stream_subscriptions :: #{binary() => [integer()]},
credits :: atomics:atomics_ref(),
authentication_state :: atom(),
@ -48,7 +48,8 @@
connection_step :: atom(), % tcp_connected, peer_properties_exchanged, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done
frame_max :: integer(),
heartbeater :: any(),
client_properties = #{} :: #{binary() => binary()}
client_properties = #{} :: #{binary() => binary()},
subscribed_streams = sets:new() :: sets:set(binary())
}).
-record(configuration, {
@ -79,14 +80,13 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
rabbit_stream_manager:register(),
Credits = atomics:new(1, [{signed, true}]),
init_credit(Credits, InitialCredits),
Connection = #stream_connection{
name = ConnStr,
helper_sup = KeepaliveSup,
socket = RealSocket,
clusters = #{},
stream_leaders = #{},
stream_subscriptions = #{},
credits = Credits,
authentication_state = none,
@ -161,7 +161,8 @@ close(Transport, S) ->
Transport:close(S).
listen_loop_post_auth(Transport, #stream_connection{socket = S,
stream_subscriptions = StreamSubscriptions, credits = Credits, heartbeater = Heartbeater} = Connection,
stream_subscriptions = StreamSubscriptions, credits = Credits,
heartbeater = Heartbeater} = Connection,
#stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
{OK, Closed, Error} = Transport:messages(),
@ -197,13 +198,14 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
end,
listen_loop_post_auth(Transport, Connection1, State2, Configuration)
end;
{stream_manager, cluster_deleted, ClusterReference} ->
{Connection1, State1} = case clean_state_after_stream_deletion(ClusterReference, Connection, State) of
{queue_deleted, #resource{name = Stream}} ->
C = unsubscribe_from_stream(Stream, Connection),
{Connection1, State1} = case clean_state_after_stream_deletion(Stream, C, State) of
{cleaned, NewConnection, NewState} ->
StreamSize = byte_size(ClusterReference),
StreamSize = byte_size(Stream),
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, ClusterReference/binary>>]),
?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]),
{NewConnection, NewState};
{not_cleaned, SameConnection, SameState} ->
{SameConnection, SameState}
@ -265,18 +267,18 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
listen_loop_post_auth(Transport, Connection1, State1, Configuration);
{heartbeat_send_error, Reason} ->
rabbit_log:info("Heartbeat send error ~p, closing connection~n", [Reason]),
rabbit_stream_manager:unregister(),
close(Transport, S);
S1 = unsubscribe_from_all_streams(S),
close(Transport, S1);
heartbeat_timeout ->
rabbit_log:info("Heartbeat timeout, closing connection~n"),
rabbit_stream_manager:unregister(),
close(Transport, S);
S1 = unsubscribe_from_all_streams(S),
close(Transport, S1);
{Closed, S} ->
rabbit_stream_manager:unregister(),
unsubscribe_from_all_streams(S),
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
rabbit_stream_manager:unregister(),
unsubscribe_from_all_streams(S),
rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
M ->
rabbit_log:warning("Unknown message ~p~n", [M]),
@ -487,7 +489,6 @@ handle_frame_pre_auth(Transport, #stream_connection{helper_sup = SupPid, socket
SupPid, Sock, ConnectionName,
Heartbeat, SendFun, Heartbeat, ReceiveFun),
{Connection#stream_connection{connection_step = tuned, frame_max = FrameMax, heartbeater = Heartbeater}, State, Rest};
handle_frame_pre_auth(Transport, #stream_connection{user = User, socket = S} = Connection, State,
<<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32,
@ -571,7 +572,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
credit = Credit,
stream = Stream
},
rabbit_log:info("registering consumer ~p in ~p~n", [ConsumerState, self()]),
Connection1 = subscribe_to_stream(Stream, Connection),
response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
@ -588,10 +590,11 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
_ ->
StreamSubscriptions#{Stream => [SubscriptionId]}
end,
{Connection#stream_connection{stream_subscriptions = StreamSubscriptions1}, State#stream_connection_state{consumers = Consumers1}, Rest}
{Connection1#stream_connection{stream_subscriptions = StreamSubscriptions1}, State#stream_connection_state{consumers = Consumers1}, Rest}
end
end;
handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions, clusters = Clusters} = Connection,
handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions,
stream_leaders = StreamLeaders} = Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32>>, Rest) ->
case subscription_exists(StreamSubscriptions, SubscriptionId) of
@ -603,21 +606,27 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre
Stream = Consumer#consumer.stream,
#{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream),
{StreamSubscriptions1, Clusters1} =
{Connection1, StreamSubscriptions1, StreamLeaders1} =
case length(SubscriptionsForThisStream1) of
0 ->
%% no more subscriptions for this stream
{maps:remove(Stream, StreamSubscriptions),
maps:remove(Stream, Clusters)
%% we unregister even though it could affect publishing if the stream is published to
%% from this connection and is deleted.
%% to mitigate this, we remove the stream from the leaders cache
%% this way the stream leader will be looked up in the next publish command
%% and registered to.
C = unsubscribe_from_stream(Stream, Connection),
{C, maps:remove(Stream, StreamSubscriptions),
maps:remove(Stream, StreamLeaders)
};
_ ->
{StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, Clusters}
{Connection, StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, StreamLeaders}
end,
Consumers1 = maps:remove(SubscriptionId, Consumers),
response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
{Connection#stream_connection{
{Connection1#stream_connection{
stream_subscriptions = StreamSubscriptions1,
clusters = Clusters1
stream_leaders = StreamLeaders1
}, State#stream_connection_state{consumers = Consumers1}, Rest}
end;
handle_frame_post_auth(Transport, Connection, #stream_connection_state{consumers = Consumers} = State,
@ -663,8 +672,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
{ok, deleted} ->
response_ok(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId),
%% FIXME add connection and state parameters (and change return type)
{Connection1, State1} = case clean_state_after_stream_deletion(Stream, Connection, State) of
C = unsubscribe_from_stream(Stream, Connection),
{Connection1, State1} = case clean_state_after_stream_deletion(Stream, C, State) of
{cleaned, NewConnection, NewState} ->
StreamSize = byte_size(Stream),
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
@ -803,35 +812,63 @@ extract_stream_list(<<>>, Streams) ->
extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) ->
extract_stream_list(Rest, [Stream | Streams]).
clean_state_after_stream_deletion(Stream, #stream_connection{clusters = Clusters, stream_subscriptions = StreamSubscriptions} = Connection,
clean_state_after_stream_deletion(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection,
#stream_connection_state{consumers = Consumers} = State) ->
case maps:is_key(Stream, StreamSubscriptions) of
true ->
case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of
{true, _} ->
#{Stream := SubscriptionIds} = StreamSubscriptions,
{cleaned, Connection#stream_connection{
clusters = maps:remove(Stream, Clusters),
stream_leaders = maps:remove(Stream, StreamLeaders),
stream_subscriptions = maps:remove(Stream, StreamSubscriptions)
}, State#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}};
false ->
{false, true} ->
{cleaned, Connection#stream_connection{
stream_leaders = maps:remove(Stream, StreamLeaders)
}, State};
{false, false} ->
{not_cleaned, Connection, State}
end.
lookup_leader(Stream, #stream_connection{clusters = Clusters, virtual_host = VirtualHost} = State) ->
case maps:get(Stream, Clusters, undefined) of
lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} = Connection) ->
case maps:get(Stream, StreamLeaders, undefined) of
undefined ->
case lookup_leader_from_manager(VirtualHost, Stream) of
cluster_not_found ->
cluster_not_found;
ClusterPid ->
{ClusterPid, State#stream_connection{clusters = Clusters#{Stream => ClusterPid}}}
Connection1 = subscribe_to_stream(Stream, Connection),
{ClusterPid, Connection1#stream_connection{stream_leaders = StreamLeaders#{Stream => ClusterPid}}}
end;
ClusterPid ->
{ClusterPid, State}
{ClusterPid, Connection}
end.
lookup_leader_from_manager(VirtualHost, Stream) ->
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
subscribe_to_stream(Stream, #stream_connection{virtual_host = VirtualHost, subscribed_streams = SubscribedStreams} = Connection) ->
case rabbit_stream_manager:subscribe(VirtualHost, Stream) of
ok ->
Connection#stream_connection{subscribed_streams = sets:add_element(Stream, SubscribedStreams)};
M ->
rabbit_log:warning("Could not subscribe to ~p stream: ~p~n", [Stream, M]),
Connection
end.
unsubscribe_from_stream(Stream, #stream_connection{virtual_host = VirtualHost, subscribed_streams = SubscribedStreams} = Connection) ->
case rabbit_stream_manager:unsubscribe(VirtualHost, Stream) of
ok ->
ok;
M ->
rabbit_log:warning("Could not unsubscribe from ~p stream: ~p~n", [Stream, M]),
Connection
end,
Connection#stream_connection{subscribed_streams = sets:del_element(Stream, SubscribedStreams)}.
unsubscribe_from_all_streams(#stream_connection{subscribed_streams = SubscribedStreams} = Connection) ->
[unsubscribe_from_stream(Stream, Connection) || Stream <- sets:to_list(SubscribedStreams)],
Connection#stream_connection{subscribed_streams = sets:new()}.
frame(Transport, #stream_connection{socket = S}, Frame) ->
FrameSize = byte_size(Frame),
Transport:send(S, [<<FrameSize:32>>, Frame]).