From 4ff2c4a03dbfd6646ffb33d80933e3ca9e750394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 15 Dec 2021 11:16:53 +0100 Subject: [PATCH] Refactor stream SAC coordinator Separate logic between single SAC and SAC in a partition of a super stream (makes code clearer), wait for former active consumer notification to select the new active consumer (avoids a lock of some sort on the group, so consumers can come and go). References #3753 --- deps/rabbit/src/rabbit_stream_coordinator.erl | 7 +- .../src/rabbit_stream_sac_coordinator.erl | 333 +++++++++++++----- .../src/rabbit_stream_reader.erl | 45 ++- 3 files changed, 276 insertions(+), 109 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 818e0b2868..a0a8643161 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -45,7 +45,7 @@ state/0]). %% Single Active Consumer API --export([register_consumer/6, unregister_consumer/5]). +-export([register_consumer/6, unregister_consumer/5, activate_consumer/3]). -rabbit_boot_step({?MODULE, [{description, "Restart stream coordinator"}, @@ -299,6 +299,11 @@ unregister_consumer(VirtualHost, {unregister_consumer, VirtualHost, Stream, ConsumerName, ConnectionPid, SubscriptionId}}), Res. +-spec activate_consumer(binary(), binary(), binary()) -> ok. +activate_consumer(VirtualHost, Stream, ConsumerName) -> + {ok, Res, _} = process_command({sac, {activate_consumer, VirtualHost, Stream, ConsumerName}}), + Res. + process_command(Cmd) -> Servers = ensure_coordinator_started(), process_command(Servers, Cmd). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 8966008610..b74645eb8a 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -17,12 +17,27 @@ -module(rabbit_stream_sac_coordinator). -type vhost() :: binary(). +-type partition_index() :: integer(). -type stream() :: binary(). -type consumer_name() :: binary(). +-type connection_pid() :: pid(). -type subscription_id() :: byte(). -opaque command() :: - {register_consumer, vhost()} | {unregister_consumer, vhost()}. + {register_consumer, + vhost(), + stream(), + partition_index(), + consumer_name(), + connection_pid(), + subscription_id()} | + {unregister_consumer, + vhost(), + stream(), + consumer_name(), + connection_pid(), + subscription_id() | + {activate_consumer, vhost(), stream(), consumer_name()}}. -record(consumer, {pid :: pid(), subscription_id :: subscription_id(), @@ -74,38 +89,14 @@ apply({register_consumer, PartitionIndex, ConsumerName, StreamGroups0), - Group0 = - lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups1), - rabbit_log:debug("Group: ~p", [Group0]), - FormerActive = - case lookup_active_consumer(Group0) of - {value, FA} -> - FA; - false -> - undefined - end, - Consumer0 = - #consumer{pid = ConnectionPid, - subscription_id = SubscriptionId, - active = false}, - Group1 = add_to_group(Consumer0, Group0), - rabbit_log:debug("Consumer added to group: ~p", [Group1]), - Group2 = compute_active_consumer(Group1), - rabbit_log:debug("Consumers in group after active consumer computation: ~p", - [Group2]), - StreamGroups2 = - update_groups(VirtualHost, - Stream, - ConsumerName, - Group2, - StreamGroups1), - - {value, Consumer1} = - lookup_consumer(ConnectionPid, SubscriptionId, Group2), - Effects = notify_consumers(FormerActive, Consumer1, Group2), - #consumer{active = Active} = Consumer1, - {State#?MODULE{groups = StreamGroups2}, {ok, Active}, Effects}; + do_register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + ConnectionPid, + SubscriptionId, + State#?MODULE{groups = StreamGroups1}); apply({unregister_consumer, VirtualHost, Stream, @@ -124,30 +115,10 @@ apply({unregister_consumer, {value, Consumer} -> rabbit_log:debug("Unregistering consumer ~p from group", [Consumer]), - {value, ActiveInPreviousGroupInstance} = - lookup_active_consumer(Group0), G1 = remove_from_group(Consumer, Group0), rabbit_log:debug("Consumer removed from group: ~p", [G1]), - G2 = compute_active_consumer(G1), - rabbit_log:debug("Consumers in group after active consumer computation: ~p", - [G2]), - NewActive = - case lookup_active_consumer(G2) of - {value, AC} -> - AC; - false -> - undefined - end, - AIPGI = - case ActiveInPreviousGroupInstance of - Consumer -> - undefined; - _ -> - ActiveInPreviousGroupInstance - end, - Effs = notify_consumers(AIPGI, NewActive, G2), - {G2, Effs}; + handle_consumer_removal(G1, Consumer); false -> rabbit_log:debug("Could not find consumer ~p ~p in group ~p ~p ~p", [ConnectionPid, @@ -164,7 +135,209 @@ apply({unregister_consumer, StreamGroups0), {State0#?MODULE{groups = SGS}, Effects} end, - {State1, ok, Effects1}. + {State1, ok, Effects1}; +apply({activate_consumer, VirtualHost, Stream, ConsumerName}, + #?MODULE{groups = StreamGroups0} = State0) -> + {G, Eff} = + case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + undefined -> + rabbit_log:warning("trying to activate consumer in group ~p, but " + "the group does not longer exist", + [{VirtualHost, Stream, ConsumerName}]); + Group -> + #consumer{pid = Pid, subscription_id = SubId} = + evaluate_active_consumer(Group), + Group1 = + update_consumer_state_in_group(Group, Pid, SubId, true), + {Group1, + [mod_call_effect(Pid, + {sac, + {{subscription_id, SubId}, {active, true}, + {extra, []}}})]} + end, + StreamGroups1 = + update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0), + {State0#?MODULE{groups = StreamGroups1}, ok, Eff}. + +do_register_consumer(VirtualHost, + Stream, + -1, + ConsumerName, + ConnectionPid, + SubscriptionId, + #?MODULE{groups = StreamGroups0} = State) -> + Group0 = + lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), + + rabbit_log:debug("Group: ~p", [Group0]), + Consumer = + case lookup_active_consumer(Group0) of + {value, _} -> + #consumer{pid = ConnectionPid, + subscription_id = SubscriptionId, + active = false}; + false -> + #consumer{pid = ConnectionPid, + subscription_id = SubscriptionId, + active = true} + end, + Group1 = add_to_group(Consumer, Group0), + rabbit_log:debug("Consumer added to group: ~p", [Group1]), + StreamGroups1 = + update_groups(VirtualHost, + Stream, + ConsumerName, + Group1, + StreamGroups0), + + #consumer{active = Active} = Consumer, + Effects = + case Consumer of + #consumer{active = true} -> + [mod_call_effect(ConnectionPid, + {sac, + {{subscription_id, SubscriptionId}, + {active, Active}, {extra, []}}})]; + _ -> + [] + end, + + {State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}; +do_register_consumer(VirtualHost, + Stream, + _, + ConsumerName, + ConnectionPid, + SubscriptionId, + #?MODULE{groups = StreamGroups0} = State) -> + Group0 = + lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), + + rabbit_log:debug("Group: ~p", [Group0]), + {Group1, Effects} = + case Group0 of + #group{consumers = []} -> + %% first consumer in the group, it's the active one + Consumer0 = + #consumer{pid = ConnectionPid, + subscription_id = SubscriptionId, + active = true}, + G1 = add_to_group(Consumer0, Group0), + {G1, + [mod_call_effect(ConnectionPid, + {sac, + {{subscription_id, SubscriptionId}, + {active, true}, {extra, []}}})]}; + _G -> + %% whatever the current state is, the newcomer will be passive + Consumer0 = + #consumer{pid = ConnectionPid, + subscription_id = SubscriptionId, + active = false}, + G1 = add_to_group(Consumer0, Group0), + + case lookup_active_consumer(G1) of + {value, + #consumer{pid = ActPid, subscription_id = ActSubId} = + CurrentActive} -> + case evaluate_active_consumer(G1) of + CurrentActive -> + %% the current active stays the same + {G1, []}; + _ -> + %% there's a change, telling the active it's not longer active + {update_consumer_state_in_group(G1, + ActPid, + ActSubId, + false), + [mod_call_effect(ActPid, + {sac, + {{subscription_id, ActSubId}, + {active, false}, + {extra, + [{stepping_down, + true}]}}})]} + end; + undefined -> + %% no active consumer in the (non-empty) group, we are waiting for the reply of a former active + {G1, []} + end + end, + StreamGroups1 = + update_groups(VirtualHost, + Stream, + ConsumerName, + Group1, + StreamGroups0), + {value, #consumer{active = Active}} = + lookup_consumer(ConnectionPid, SubscriptionId, Group1), + {State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}. + +handle_consumer_removal(#group{consumers = []} = G, _) -> + {G, []}; +handle_consumer_removal(#group{partition_index = -1} = Group0, + Consumer) -> + case Consumer of + #consumer{active = true} -> + Group1 = compute_active_consumer(Group0), + rabbit_log:debug("This is the active consumer, group after active " + "consumer calculation: ~p", + [Group1]), + case lookup_active_consumer(Group1) of + {value, #consumer{pid = Pid, subscription_id = SubId} = C} -> + rabbit_log:debug("Creating side effect to notify new active consumer ~p", + [C]), + {Group1, + [mod_call_effect(Pid, + {sac, + {{subscription_id, SubId}, + {active, true}, {extra, []}}})]}; + _ -> + rabbit_log:debug("No active consumer found in the group, nothing " + "to do"), + {Group1, []} + end; + #consumer{active = false} -> + rabbit_log:debug("Not the active consumer, nothing to do."), + {Group0, []} + end; +handle_consumer_removal(Group0, Consumer) -> + case lookup_active_consumer(Group0) of + {value, + #consumer{pid = ActPid, subscription_id = ActSubId} = + CurrentActive} -> + case evaluate_active_consumer(Group0) of + CurrentActive -> + %% the current active stays the same + {Group0, []}; + _ -> + %% there's a change, telling the active it's not longer active + {update_consumer_state_in_group(Group0, + ActPid, + ActSubId, + false), + [mod_call_effect(ActPid, + {sac, + {{subscription_id, ActSubId}, + {active, false}, + {extra, [{stepping_down, true}]}}})]} + end; + false -> + case Consumer#consumer.active of + true -> + %% the active one is going away, picking a new one + #consumer{pid = P, subscription_id = SID} = + evaluate_active_consumer(Group0), + {update_consumer_state_in_group(Group0, P, SID, true), + [mod_call_effect(P, + {sac, + {{subscription_id, SID}, {active, true}, + {extra, []}}})]}; + false -> + %% no active consumer in the (non-empty) group, we are waiting for the reply of a former active + {Group0, []} + end + end. maybe_create_group(VirtualHost, Stream, @@ -181,7 +354,8 @@ maybe_create_group(VirtualHost, end. lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) -> - maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups). + maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups, + undefined). add_to_group(Consumer, #group{consumers = Consumers} = Group) -> Group#group{consumers = Consumers ++ [Consumer]}. @@ -219,6 +393,11 @@ compute_active_consumer(#group{partition_index = PartitionIndex, {length(Consumers0) - 1, []}, Consumers0), Group#group{consumers = Consumers1}. +evaluate_active_consumer(#group{partition_index = PartitionIndex, + consumers = Consumers}) -> + ActiveConsumerIndex = PartitionIndex rem length(Consumers), + lists:nth(ActiveConsumerIndex + 1, Consumers). + lookup_consumer(ConnectionPid, SubscriptionId, #group{consumers = Consumers}) -> lists:search(fun(#consumer{pid = ConnPid, subscription_id = SubId}) -> @@ -232,35 +411,6 @@ lookup_active_consumer(#group{consumers = Consumers}) -> notify_consumers(_, _, #group{consumers = []}) -> []; -notify_consumers(_, - #consumer{pid = ConnectionPid, - subscription_id = SubscriptionId} = - NewConsumer, - #group{partition_index = -1, consumers = [NewConsumer]}) -> - [mod_call_effect(ConnectionPid, - {sac, - {{subscription_id, SubscriptionId}, {active, true}, - {side_effects, []}}})]; -notify_consumers(_, - #consumer{pid = ConnectionPid, - subscription_id = SubscriptionId} = - NewConsumer, - #group{partition_index = -1, consumers = [NewConsumer | _]}) -> - [mod_call_effect(ConnectionPid, - {sac, - {{subscription_id, SubscriptionId}, {active, true}, - {side_effects, []}}})]; -notify_consumers(_, - #consumer{pid = ConnectionPid, - subscription_id = SubscriptionId}, - #group{partition_index = -1, consumers = _}) -> - %% notifying a newcomer that it's inactive - %% FIXME is consumer update always necessary for inactive newcomers? - %% can't they assume they are inactive by default? - [mod_call_effect(ConnectionPid, - {sac, - {{subscription_id, SubscriptionId}, {active, false}, - {side_effects, []}}})]; notify_consumers(undefined, #consumer{pid = ConnectionPid, subscription_id = SubscriptionId}, @@ -315,6 +465,21 @@ update_groups(VirtualHost, StreamGroups) -> maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups). +update_consumer_state_in_group(#group{consumers = Consumers0} = G, + Pid, + SubId, + NewState) -> + CS1 = lists:foldr(fun(C0, Acc) -> + case C0 of + #consumer{pid = Pid, subscription_id = SubId} -> + C1 = C0#consumer{active = NewState}, + [C1 | Acc]; + C -> [C | Acc] + end + end, + [], Consumers0), + G#group{consumers = CS1}. + mod_call_effect(Pid, Msg) -> {mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, Msg]}. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 994ec7c30d..13052ab8ab 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -777,9 +777,7 @@ open(info, {OK, S, Data}, connection_state = State2}} end; open(info, - {sac, - {{subscription_id, SubId}, {active, Active}, - {side_effects, Effects}}}, + {sac, {{subscription_id, SubId}, {active, Active}, {extra, Extra}}}, #statem_data{transport = Transport, connection = Connection0, connection_state = ConnState0} = @@ -817,7 +815,7 @@ open(info, SubId, Active, true, - Effects), + Extra), {Conn1, ConnState0#stream_connection_state{consumers = Consumers0#{SubId => @@ -2434,8 +2432,7 @@ handle_frame_post_auth(Transport, ResponseOffsetSpec}}) -> %% FIXME check response code? It's supposed to be OK all the time. case maps:take(CorrelationId, Requests0) of - {{{subscription_id, SubscriptionId}, {side_effects, SideEffects}}, - Rs} -> + {{{subscription_id, SubscriptionId}, {extra, Extra}}, Rs} -> rabbit_log:debug("Received consumer update response for subscription ~p", [SubscriptionId]), Consumers1 = @@ -2513,16 +2510,28 @@ handle_frame_post_auth(Transport, Consumers#{SubscriptionId => Consumer2}; #{SubscriptionId := #consumer{configuration = - #consumer_configuration{active = - false}}} -> + #consumer_configuration{active = false, + stream = Stream, + properties = + Properties}}} -> rabbit_log:debug("Not an active consumer"), + + case Extra of + [{stepping_down, true}] -> + ConsumerName = consumer_name(Properties), + rabbit_stream_coordinator:activate_consumer(VirtualHost, + Stream, + ConsumerName); + _ -> + ok + end, + Consumers; _ -> rabbit_log:debug("No consumer found for subscription ~p", [SubscriptionId]), Consumers end, - apply_sac_side_effects(SideEffects), {Connection#stream_connection{outstanding_requests = Rs}, State#stream_connection_state{consumers = Consumers1}}; @@ -2689,7 +2698,7 @@ maybe_register_consumer(VirtualHost, SubscriptionId), Active. -maybe_notify_consumer(_, Connection, _, _, _, false = _Sac) -> +maybe_notify_consumer(_, Connection, _, _, false = _Sac, _) -> Connection; maybe_notify_consumer(Transport, #stream_connection{socket = S, @@ -2700,7 +2709,7 @@ maybe_notify_consumer(Transport, SubscriptionId, Active, true = _Sac, - SideEffects) -> + Extra) -> rabbit_log:debug("SAC subscription ~p, active = ~p", [SubscriptionId, Active]), Frame = @@ -2709,8 +2718,7 @@ maybe_notify_consumer(Transport, OutstandingRequests1 = maps:put(CorrIdSeq, - {{subscription_id, SubscriptionId}, - {side_effects, SideEffects}}, + {{subscription_id, SubscriptionId}, {extra, Extra}}, OutstandingRequests0), send(Transport, S, Frame), Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1, @@ -2749,17 +2757,6 @@ partition_index(VirtualHost, Stream, Properties) -> -1 end. -apply_sac_side_effects([]) -> - ok; -apply_sac_side_effects([Effect | T]) -> - apply_sac_side_effect(Effect), - apply_sac_side_effects(T). - -apply_sac_side_effect({message, Pid, Msg}) -> - Pid ! Msg; -apply_sac_side_effect(Effect) -> - rabbit_log:warning("Unknown SAC side effect: ~p", [Effect]). - notify_connection_closed(#statem_data{connection = #stream_connection{name = Name, publishers =