diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 56622d9388..bc3dffc9e1 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -22,7 +22,6 @@ %% API -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, create/4, delete/3, lookup_leader/2, lookup_local_member/2, topology/2]). --export([subscribe/2, unsubscribe/2]). -record(state, { configuration @@ -57,14 +56,6 @@ lookup_local_member(VirtualHost, Stream) -> topology(VirtualHost, Stream) -> gen_server:call(?MODULE, {topology, VirtualHost, Stream}). --spec subscribe(binary(), binary()) -> ok | {error, stream_not_found} | {error, internal_error}. -subscribe(VirtualHost, Stream) -> - gen_server:call(?MODULE, {subscribe, VirtualHost, Stream, self()}). - --spec unsubscribe(binary(), binary()) -> ok | {error, stream_not_found}. -unsubscribe(VirtualHost, Stream) -> - gen_server:call(?MODULE, {unsubscribe, VirtualHost, Stream, self()}). - stream_queue_arguments(Arguments) -> stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments). @@ -172,44 +163,6 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> _ -> {error, stream_not_found} end, - {reply, Res, State}; -handle_call({subscribe, VirtualHost, Stream, SubscriberPid}, _From, State) -> - Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of - {ok, Q} -> - case is_stream_queue(Q) of - true -> - #{name := StreamInternalName} = amqqueue:get_type_state(Q), - case rabbit_stream_coordinator:subscribe(StreamInternalName, SubscriberPid) of - {ok, ok, _} -> - ok; - {ok, {error, not_found}, _} -> - {error, stream_not_found}; - _ -> - {error, internal_error} - end; - _ -> - {error, stream_not_found} - end; - _ -> - {error, stream_not_found} - end, - {reply, Res, State}; -handle_call({unsubscribe, VirtualHost, Stream, SubscriberPid}, _From, State) -> - Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of - {ok, Q} -> - case is_stream_queue(Q) of - true -> - #{name := StreamInternalName} = amqqueue:get_type_state(Q), - rabbit_stream_coordinator:unsubscribe(StreamInternalName, SubscriberPid), - ok; - _ -> - {error, stream_not_found} - end; - _ -> - {error, stream_not_found} - end, {reply, Res, State}. handle_cast(_, State) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 113a6edfd4..9e98fb1230 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -49,7 +49,7 @@ frame_max :: integer(), heartbeater :: any(), client_properties = #{} :: #{binary() => binary()}, - subscribed_streams = sets:new() :: sets:set(binary()) + monitors = #{} :: #{reference() => binary()} }). -record(configuration, { @@ -163,7 +163,7 @@ close(Transport, S) -> listen_loop_post_auth(Transport, #stream_connection{socket = S, stream_subscriptions = StreamSubscriptions, credits = Credits, - heartbeater = Heartbeater} = Connection, + heartbeater = Heartbeater, monitors = Monitors} = Connection, #stream_connection_state{consumers = Consumers, blocked = Blocked} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> {OK, Closed, Error} = Transport:messages(), @@ -199,17 +199,23 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, end, listen_loop_post_auth(Transport, Connection1, State2, Configuration) end; - {queue_deleted, #resource{name = Stream}} -> - C = clear_stream_subscription(Stream, Connection), - {Connection1, State1} = case clean_state_after_stream_deletion(Stream, C, State) of - {cleaned, NewConnection, NewState} -> - StreamSize = byte_size(Stream), - FrameSize = 2 + 2 + 2 + 2 + StreamSize, - Transport:send(S, [<>]), - {NewConnection, NewState}; - {not_cleaned, SameConnection, SameState} -> - {SameConnection, SameState} + {'DOWN', MonitorRef, process, _OsirisPid, _Reason} -> + {Connection1, State1} = case Monitors of + #{MonitorRef := Stream} -> + Monitors1 = maps:remove(MonitorRef, Monitors), + C = Connection#stream_connection{monitors = Monitors1}, + case clean_state_after_stream_deletion_or_failure(Stream, C, State) of + {cleaned, NewConnection, NewState} -> + StreamSize = byte_size(Stream), + FrameSize = 2 + 2 + 2 + 2 + StreamSize, + Transport:send(S, [<>]), + {NewConnection, NewState}; + {not_cleaned, SameConnection, SameState} -> + {SameConnection, SameState} + end; + _ -> + {Connection, State} end, listen_loop_post_auth(Transport, Connection1, State1, Configuration); {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationIdList}}} -> @@ -268,18 +274,18 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, listen_loop_post_auth(Transport, Connection1, State1, Configuration); {heartbeat_send_error, Reason} -> rabbit_log:info("Heartbeat send error ~p, closing connection~n", [Reason]), - S1 = unsubscribe_from_all_streams(Connection), - close(Transport, S1); + C1 = demonitor_all_streams(Connection), + close(Transport, C1); heartbeat_timeout -> rabbit_log:info("Heartbeat timeout, closing connection~n"), - S1 = unsubscribe_from_all_streams(Connection), - close(Transport, S1); + C1 = demonitor_all_streams(Connection), + close(Transport, C1); {Closed, S} -> - unsubscribe_from_all_streams(Connection), + demonitor_all_streams(Connection), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> - unsubscribe_from_all_streams(Connection), + demonitor_all_streams(Connection), rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); M -> rabbit_log:warning("Unknown message ~p~n", [M]), @@ -587,7 +593,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, stream = Stream }, - Connection1 = subscribe_to_stream(Stream, Connection), + Connection1 = maybe_monitor_stream(LocalMemberPid, Stream, Connection), response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId), @@ -633,7 +639,7 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre %% to mitigate this, we remove the stream from the leaders cache %% this way the stream leader will be looked up in the next publish command %% and registered to. - C = unsubscribe_from_stream(Stream, Connection), + C = demonitor_stream(Stream, Connection), {C, maps:remove(Stream, StreamSubscriptions), maps:remove(Stream, StreamLeaders) }; @@ -701,8 +707,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of {ok, deleted} -> response_ok(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId), - C = clear_stream_subscription(Stream, Connection), - {Connection1, State1} = case clean_state_after_stream_deletion(Stream, C, State) of + {Connection1, State1} = case clean_state_after_stream_deletion_or_failure(Stream, Connection, State) of {cleaned, NewConnection, NewState} -> StreamSize = byte_size(Stream), FrameSize = 2 + 2 + 2 + 2 + StreamSize, @@ -845,7 +850,7 @@ extract_stream_list(<<>>, Streams) -> extract_stream_list(<>, Streams) -> extract_stream_list(Rest, [Stream | Streams]). -clean_state_after_stream_deletion(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection, +clean_state_after_stream_deletion_or_failure(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection, #stream_connection_state{consumers = Consumers} = State) -> case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of {true, _} -> @@ -868,42 +873,43 @@ lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual case lookup_leader_from_manager(VirtualHost, Stream) of cluster_not_found -> cluster_not_found; - ClusterPid -> - Connection1 = subscribe_to_stream(Stream, Connection), - {ClusterPid, Connection1#stream_connection{stream_leaders = StreamLeaders#{Stream => ClusterPid}}} + LeaderPid -> + Connection1 = maybe_monitor_stream(LeaderPid, Stream, Connection), + {LeaderPid, Connection1#stream_connection{stream_leaders = StreamLeaders#{Stream => LeaderPid}}} end; - ClusterPid -> - {ClusterPid, Connection} + LeaderPid -> + {LeaderPid, Connection} end. lookup_leader_from_manager(VirtualHost, Stream) -> rabbit_stream_manager:lookup_leader(VirtualHost, Stream). -subscribe_to_stream(Stream, #stream_connection{virtual_host = VirtualHost, subscribed_streams = SubscribedStreams} = Connection) -> - case rabbit_stream_manager:subscribe(VirtualHost, Stream) of - ok -> - Connection#stream_connection{subscribed_streams = sets:add_element(Stream, SubscribedStreams)}; - M -> - rabbit_log:warning("Could not subscribe to ~p stream: ~p~n", [Stream, M]), - Connection +maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) -> + case lists:member(Stream, maps:values(Monitors)) of + true -> + Connection; + false -> + MonitorRef = monitor(process, Pid), + Connection#stream_connection{monitors = maps:put(MonitorRef, Stream, Monitors)} end. -clear_stream_subscription(Stream, #stream_connection{subscribed_streams = SubscribedStreams} = Connection) -> - Connection#stream_connection{subscribed_streams = sets:del_element(Stream, SubscribedStreams)}. +demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) -> + Monitors = maps:fold(fun(MonitorRef, Strm, Acc) -> + case Strm of + Stream -> + Acc; + _ -> + maps:put(MonitorRef, Strm, Acc) -unsubscribe_from_stream(Stream, #stream_connection{virtual_host = VirtualHost, subscribed_streams = SubscribedStreams} = Connection) -> - case rabbit_stream_manager:unsubscribe(VirtualHost, Stream) of - ok -> - ok; - M -> - rabbit_log:warning("Could not unsubscribe from ~p stream: ~p~n", [Stream, M]), - Connection - end, - Connection#stream_connection{subscribed_streams = sets:del_element(Stream, SubscribedStreams)}. + end + end, #{}, Monitors0), + Connection#stream_connection{monitors = Monitors}. -unsubscribe_from_all_streams(#stream_connection{subscribed_streams = SubscribedStreams} = Connection) -> - [unsubscribe_from_stream(Stream, Connection) || Stream <- sets:to_list(SubscribedStreams)], - Connection#stream_connection{subscribed_streams = sets:new()}. +demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) -> + lists:foreach(fun(MonitorRef) -> + demonitor(MonitorRef, [flush]) + end, maps:values(Monitors)), + Connection#stream_connection{monitors = #{}}. frame(Transport, #stream_connection{socket = S}, Frame) -> FrameSize = byte_size(Frame),