Use Erlang monitor to detect stream deletion/failure

This commit is contained in:
Arnaud Cogoluègnes 2020-06-22 18:33:54 +02:00
parent 5bd352737c
commit 7b3d8c4ad0
2 changed files with 56 additions and 97 deletions

View File

@ -22,7 +22,6 @@
%% API
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([start_link/1, create/4, delete/3, lookup_leader/2, lookup_local_member/2, topology/2]).
-export([subscribe/2, unsubscribe/2]).
-record(state, {
configuration
@ -57,14 +56,6 @@ lookup_local_member(VirtualHost, Stream) ->
topology(VirtualHost, Stream) ->
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
-spec subscribe(binary(), binary()) -> ok | {error, stream_not_found} | {error, internal_error}.
subscribe(VirtualHost, Stream) ->
gen_server:call(?MODULE, {subscribe, VirtualHost, Stream, self()}).
-spec unsubscribe(binary(), binary()) -> ok | {error, stream_not_found}.
unsubscribe(VirtualHost, Stream) ->
gen_server:call(?MODULE, {unsubscribe, VirtualHost, Stream, self()}).
stream_queue_arguments(Arguments) ->
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments).
@ -172,44 +163,6 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
_ ->
{error, stream_not_found}
end,
{reply, Res, State};
handle_call({subscribe, VirtualHost, Stream, SubscriberPid}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
#{name := StreamInternalName} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:subscribe(StreamInternalName, SubscriberPid) of
{ok, ok, _} ->
ok;
{ok, {error, not_found}, _} ->
{error, stream_not_found};
_ ->
{error, internal_error}
end;
_ ->
{error, stream_not_found}
end;
_ ->
{error, stream_not_found}
end,
{reply, Res, State};
handle_call({unsubscribe, VirtualHost, Stream, SubscriberPid}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
#{name := StreamInternalName} = amqqueue:get_type_state(Q),
rabbit_stream_coordinator:unsubscribe(StreamInternalName, SubscriberPid),
ok;
_ ->
{error, stream_not_found}
end;
_ ->
{error, stream_not_found}
end,
{reply, Res, State}.
handle_cast(_, State) ->

View File

@ -49,7 +49,7 @@
frame_max :: integer(),
heartbeater :: any(),
client_properties = #{} :: #{binary() => binary()},
subscribed_streams = sets:new() :: sets:set(binary())
monitors = #{} :: #{reference() => binary()}
}).
-record(configuration, {
@ -163,7 +163,7 @@ close(Transport, S) ->
listen_loop_post_auth(Transport, #stream_connection{socket = S,
stream_subscriptions = StreamSubscriptions, credits = Credits,
heartbeater = Heartbeater} = Connection,
heartbeater = Heartbeater, monitors = Monitors} = Connection,
#stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
{OK, Closed, Error} = Transport:messages(),
@ -199,17 +199,23 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
end,
listen_loop_post_auth(Transport, Connection1, State2, Configuration)
end;
{queue_deleted, #resource{name = Stream}} ->
C = clear_stream_subscription(Stream, Connection),
{Connection1, State1} = case clean_state_after_stream_deletion(Stream, C, State) of
{cleaned, NewConnection, NewState} ->
StreamSize = byte_size(Stream),
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]),
{NewConnection, NewState};
{not_cleaned, SameConnection, SameState} ->
{SameConnection, SameState}
{'DOWN', MonitorRef, process, _OsirisPid, _Reason} ->
{Connection1, State1} = case Monitors of
#{MonitorRef := Stream} ->
Monitors1 = maps:remove(MonitorRef, Monitors),
C = Connection#stream_connection{monitors = Monitors1},
case clean_state_after_stream_deletion_or_failure(Stream, C, State) of
{cleaned, NewConnection, NewState} ->
StreamSize = byte_size(Stream),
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]),
{NewConnection, NewState};
{not_cleaned, SameConnection, SameState} ->
{SameConnection, SameState}
end;
_ ->
{Connection, State}
end,
listen_loop_post_auth(Transport, Connection1, State1, Configuration);
{'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationIdList}}} ->
@ -268,18 +274,18 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
listen_loop_post_auth(Transport, Connection1, State1, Configuration);
{heartbeat_send_error, Reason} ->
rabbit_log:info("Heartbeat send error ~p, closing connection~n", [Reason]),
S1 = unsubscribe_from_all_streams(Connection),
close(Transport, S1);
C1 = demonitor_all_streams(Connection),
close(Transport, C1);
heartbeat_timeout ->
rabbit_log:info("Heartbeat timeout, closing connection~n"),
S1 = unsubscribe_from_all_streams(Connection),
close(Transport, S1);
C1 = demonitor_all_streams(Connection),
close(Transport, C1);
{Closed, S} ->
unsubscribe_from_all_streams(Connection),
demonitor_all_streams(Connection),
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
unsubscribe_from_all_streams(Connection),
demonitor_all_streams(Connection),
rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
M ->
rabbit_log:warning("Unknown message ~p~n", [M]),
@ -587,7 +593,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
stream = Stream
},
Connection1 = subscribe_to_stream(Stream, Connection),
Connection1 = maybe_monitor_stream(LocalMemberPid, Stream, Connection),
response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
@ -633,7 +639,7 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre
%% to mitigate this, we remove the stream from the leaders cache
%% this way the stream leader will be looked up in the next publish command
%% and registered to.
C = unsubscribe_from_stream(Stream, Connection),
C = demonitor_stream(Stream, Connection),
{C, maps:remove(Stream, StreamSubscriptions),
maps:remove(Stream, StreamLeaders)
};
@ -701,8 +707,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
{ok, deleted} ->
response_ok(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId),
C = clear_stream_subscription(Stream, Connection),
{Connection1, State1} = case clean_state_after_stream_deletion(Stream, C, State) of
{Connection1, State1} = case clean_state_after_stream_deletion_or_failure(Stream, Connection, State) of
{cleaned, NewConnection, NewState} ->
StreamSize = byte_size(Stream),
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
@ -845,7 +850,7 @@ extract_stream_list(<<>>, Streams) ->
extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) ->
extract_stream_list(Rest, [Stream | Streams]).
clean_state_after_stream_deletion(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection,
clean_state_after_stream_deletion_or_failure(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection,
#stream_connection_state{consumers = Consumers} = State) ->
case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of
{true, _} ->
@ -868,42 +873,43 @@ lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual
case lookup_leader_from_manager(VirtualHost, Stream) of
cluster_not_found ->
cluster_not_found;
ClusterPid ->
Connection1 = subscribe_to_stream(Stream, Connection),
{ClusterPid, Connection1#stream_connection{stream_leaders = StreamLeaders#{Stream => ClusterPid}}}
LeaderPid ->
Connection1 = maybe_monitor_stream(LeaderPid, Stream, Connection),
{LeaderPid, Connection1#stream_connection{stream_leaders = StreamLeaders#{Stream => LeaderPid}}}
end;
ClusterPid ->
{ClusterPid, Connection}
LeaderPid ->
{LeaderPid, Connection}
end.
lookup_leader_from_manager(VirtualHost, Stream) ->
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
subscribe_to_stream(Stream, #stream_connection{virtual_host = VirtualHost, subscribed_streams = SubscribedStreams} = Connection) ->
case rabbit_stream_manager:subscribe(VirtualHost, Stream) of
ok ->
Connection#stream_connection{subscribed_streams = sets:add_element(Stream, SubscribedStreams)};
M ->
rabbit_log:warning("Could not subscribe to ~p stream: ~p~n", [Stream, M]),
Connection
maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) ->
case lists:member(Stream, maps:values(Monitors)) of
true ->
Connection;
false ->
MonitorRef = monitor(process, Pid),
Connection#stream_connection{monitors = maps:put(MonitorRef, Stream, Monitors)}
end.
clear_stream_subscription(Stream, #stream_connection{subscribed_streams = SubscribedStreams} = Connection) ->
Connection#stream_connection{subscribed_streams = sets:del_element(Stream, SubscribedStreams)}.
demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) ->
Monitors = maps:fold(fun(MonitorRef, Strm, Acc) ->
case Strm of
Stream ->
Acc;
_ ->
maps:put(MonitorRef, Strm, Acc)
unsubscribe_from_stream(Stream, #stream_connection{virtual_host = VirtualHost, subscribed_streams = SubscribedStreams} = Connection) ->
case rabbit_stream_manager:unsubscribe(VirtualHost, Stream) of
ok ->
ok;
M ->
rabbit_log:warning("Could not unsubscribe from ~p stream: ~p~n", [Stream, M]),
Connection
end,
Connection#stream_connection{subscribed_streams = sets:del_element(Stream, SubscribedStreams)}.
end
end, #{}, Monitors0),
Connection#stream_connection{monitors = Monitors}.
unsubscribe_from_all_streams(#stream_connection{subscribed_streams = SubscribedStreams} = Connection) ->
[unsubscribe_from_stream(Stream, Connection) || Stream <- sets:to_list(SubscribedStreams)],
Connection#stream_connection{subscribed_streams = sets:new()}.
demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) ->
lists:foreach(fun(MonitorRef) ->
demonitor(MonitorRef, [flush])
end, maps:values(Monitors)),
Connection#stream_connection{monitors = #{}}.
frame(Transport, #stream_connection{socket = S}, Frame) ->
FrameSize = byte_size(Frame),