| 
									
										
										
										
											2020-07-14 08:51:47 +08:00
										 |  |  | %% This Source Code Form is subject to the terms of the Mozilla Public
 | 
					
						
							|  |  |  | %% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
					
						
							|  |  |  | %% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
					
						
							| 
									
										
										
										
											2020-05-12 22:42:00 +08:00
										 |  |  | %%
 | 
					
						
							| 
									
										
										
										
											2021-01-22 14:00:14 +08:00
										 |  |  | %% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
					
						
							| 
									
										
										
										
											2020-05-12 22:42:00 +08:00
										 |  |  | %%
 | 
					
						
							|  |  |  | 
 | 
					
						
-module(rabbit_maintenance).

%% NOTE(review): rabbit.hrl is assumed to define the #node_maintenance_state{}
%% record used throughout this module — confirm.
-include("rabbit.hrl").

-export([
    is_enabled/0,
    drain/0,
    revive/0,
    mark_as_being_drained/0,
    unmark_as_being_drained/0,
    is_being_drained_local_read/1,
    is_being_drained_consistent_read/1,
    status_local_read/1,
    status_consistent_read/1,
    filter_out_drained_nodes_local_read/1,
    filter_out_drained_nodes_consistent_read/1,
    suspend_all_client_listeners/0,
    resume_all_client_listeners/0,
    close_all_client_connections/0,
    primary_replica_transfer_candidate_nodes/0,
    random_primary_replica_transfer_candidate_node/2,
    transfer_leadership_of_quorum_queues/1,
    transfer_leadership_of_classic_mirrored_queues/1,
    status_table_name/0,
    status_table_definition/0
]).

%% Mnesia table holding #node_maintenance_state{} rows keyed by node name.
-define(TABLE, rabbit_node_maintenance_states).
%% Feature flag that must be enabled for drain/0 and revive/0 to do anything.
-define(FEATURE_FLAG, maintenance_mode_status).
%% Status reported for a node that has no row (or whose row cannot be read).
-define(DEFAULT_STATUS,  regular).
%% Status written for a node while it is in maintenance (drain) mode.
-define(DRAINING_STATUS, draining).

-type maintenance_status() :: ?DEFAULT_STATUS | ?DRAINING_STATUS.
-type mnesia_table() :: atom().

-export_type([
    maintenance_status/0
]).
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-12 22:42:00 +08:00
										 |  |  | %%
 | 
					
						
							|  |  |  | %% API
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-02 18:40:24 +08:00
										 |  |  | -spec status_table_name() -> mnesia_table(). | 
					
						
							| 
									
										
										
										
											2020-07-14 08:41:25 +08:00
										 |  |  | status_table_name() -> | 
					
						
							|  |  |  |     ?TABLE. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | -spec status_table_definition() -> list(). | 
					
						
							|  |  |  | status_table_definition() -> | 
					
						
							|  |  |  |     maps:to_list(#{ | 
					
						
							|  |  |  |         record_name => node_maintenance_state, | 
					
						
							|  |  |  |         attributes  => record_info(fields, node_maintenance_state) | 
					
						
							|  |  |  |     }). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | -spec is_enabled() -> boolean(). | 
					
						
							|  |  |  | is_enabled() -> | 
					
						
							|  |  |  |     rabbit_feature_flags:is_enabled(?FEATURE_FLAG). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-25 22:59:08 +08:00
										 |  |  | -spec drain() -> ok. | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  | drain() -> | 
					
						
							| 
									
										
										
										
											2020-07-14 08:41:25 +08:00
										 |  |  |     case is_enabled() of | 
					
						
							|  |  |  |         true  -> do_drain(); | 
					
						
							|  |  |  |         false -> rabbit_log:warning("Feature flag `~s` is not enabled, draining is a no-op", [?FEATURE_FLAG]) | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
%% Performs the drain sequence, in this order:
%%  1. marks this node as draining in the maintenance status table;
%%  2. suspends all client listeners so no new client connections are accepted;
%%  3. closes all existing local client connections and notifies plugins;
%%  4. triggers leader elections for quorum queues whose leader is local;
%%  5. stops local quorum queue follower replicas;
%%  6. emits a maintenance_draining event for plugins.
%% Listeners are suspended before connections are closed, so closed clients
%% cannot reconnect to this node.
-spec do_drain() -> ok.
do_drain() ->
    rabbit_log:alert("This node is being put into maintenance (drain) mode"),
    %% NOTE(review): the boolean result is ignored, so the log line below is
    %% emitted even if the Mnesia transaction aborted.
    mark_as_being_drained(),
    rabbit_log:info("Marked this node as undergoing maintenance"),
    %% NOTE(review): an error result from suspending listeners is ignored.
    suspend_all_client_listeners(),
    rabbit_log:alert("Suspended all listeners and will no longer accept client connections"),
    {ok, NConnections} = close_all_client_connections(),
    %% allow plugins to react e.g. by closing their protocol connections
    rabbit_event:notify(maintenance_connections_closed, #{
        reason => <<"node is being put into maintenance">>
    }),
    rabbit_log:alert("Closed ~b local client connections", [NConnections]),

    %% Running peer nodes that are not themselves being drained.
    TransferCandidates = primary_replica_transfer_candidate_nodes(),
    %% Note: only QQ leadership is transferred, because doing so is reasonably quick
    %% even for a large number of queues in the cluster, unlike with CMQs
    %% (classic mirrored queues).
    transfer_leadership_of_quorum_queues(TransferCandidates),
    stop_local_quorum_queue_followers(),

    %% allow plugins to react
    rabbit_event:notify(maintenance_draining, #{
        reason => <<"node is being put into maintenance">>
    }),
    rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"),

    ok.
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-25 22:59:08 +08:00
										 |  |  | -spec revive() -> ok. | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  | revive() -> | 
					
						
							| 
									
										
										
										
											2020-07-14 08:41:25 +08:00
										 |  |  |     case is_enabled() of | 
					
						
							|  |  |  |         true  -> do_revive(); | 
					
						
							|  |  |  |         false -> rabbit_log:warning("Feature flag `~s` is not enabled, reviving is a no-op", [?FEATURE_FLAG]) | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | -spec do_revive() -> ok. | 
					
						
							|  |  |  | do_revive() -> | 
					
						
							| 
									
										
										
										
											2020-06-24 09:52:01 +08:00
										 |  |  |     rabbit_log:alert("This node is being revived from maintenance (drain) mode"), | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  |     revive_local_quorum_queue_replicas(), | 
					
						
							|  |  |  |     rabbit_log:alert("Resumed all listeners and will accept client connections again"), | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  |     resume_all_client_listeners(), | 
					
						
							| 
									
										
										
										
											2020-06-24 09:52:01 +08:00
										 |  |  |     rabbit_log:alert("Resumed all listeners and will accept client connections again"), | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  |     unmark_as_being_drained(), | 
					
						
							| 
									
										
										
										
											2020-06-24 09:52:01 +08:00
										 |  |  |     rabbit_log:info("Marked this node as back from maintenance and ready to serve clients"), | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-07-09 00:10:55 +08:00
										 |  |  |     %% allow plugins to react
 | 
					
						
							|  |  |  |     rabbit_event:notify(maintenance_revived, #{}), | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-07-14 08:41:25 +08:00
										 |  |  |     ok. | 
					
						
							|  |  |  |   | 
					
						
							| 
									
										
										
										
											2020-06-08 18:37:54 +08:00
										 |  |  | -spec mark_as_being_drained() -> boolean(). | 
					
						
							|  |  |  | mark_as_being_drained() -> | 
					
						
							| 
									
										
										
										
											2020-06-30 06:12:22 +08:00
										 |  |  |     rabbit_log:debug("Marking the node as undergoing maintenance"), | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |     set_maintenance_status_status(?DRAINING_STATUS). | 
					
						
							| 
									
										
										
										
											2020-06-08 18:37:54 +08:00
										 |  |  |   | 
					
						
							|  |  |  | -spec unmark_as_being_drained() -> boolean(). | 
					
						
							|  |  |  | unmark_as_being_drained() -> | 
					
						
							| 
									
										
										
										
											2020-06-30 06:12:22 +08:00
										 |  |  |     rabbit_log:debug("Unmarking the node as undergoing maintenance"), | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |     set_maintenance_status_status(?DEFAULT_STATUS). | 
					
						
							| 
									
										
										
										
											2020-05-12 22:42:00 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  | set_maintenance_status_status(Status) -> | 
					
						
							| 
									
										
										
										
											2020-06-08 18:37:54 +08:00
										 |  |  |     Res = mnesia:transaction(fun () -> | 
					
						
							|  |  |  |         case mnesia:wread({?TABLE, node()}) of | 
					
						
							|  |  |  |            [] -> | 
					
						
							|  |  |  |                 Row = #node_maintenance_state{ | 
					
						
							|  |  |  |                         node   = node(), | 
					
						
							|  |  |  |                         status = Status | 
					
						
							|  |  |  |                      }, | 
					
						
							|  |  |  |                 mnesia:write(?TABLE, Row, write); | 
					
						
							|  |  |  |             [Row0] -> | 
					
						
							|  |  |  |                 Row = Row0#node_maintenance_state{ | 
					
						
							|  |  |  |                         node   = node(), | 
					
						
							|  |  |  |                         status = Status | 
					
						
							|  |  |  |                       }, | 
					
						
							|  |  |  |                 mnesia:write(?TABLE, Row, write) | 
					
						
							|  |  |  |         end | 
					
						
							|  |  |  |     end), | 
					
						
							|  |  |  |     case Res of | 
					
						
							|  |  |  |         {atomic, ok} -> true; | 
					
						
							|  |  |  |         _            -> false | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  |   | 
					
						
							|  |  |  |   | 
					
						
							|  |  |  | -spec is_being_drained_local_read(node()) -> boolean(). | 
					
						
							|  |  |  | is_being_drained_local_read(Node) -> | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |     Status = status_local_read(Node), | 
					
						
							|  |  |  |     Status =:= ?DRAINING_STATUS. | 
					
						
							| 
									
										
										
										
											2020-05-30 00:21:25 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-08 18:37:54 +08:00
										 |  |  | -spec is_being_drained_consistent_read(node()) -> boolean(). | 
					
						
							|  |  |  | is_being_drained_consistent_read(Node) -> | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |     Status = status_consistent_read(Node), | 
					
						
							|  |  |  |     Status =:= ?DRAINING_STATUS. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | -spec status_local_read(node()) -> maintenance_status(). | 
					
						
							|  |  |  | status_local_read(Node) -> | 
					
						
							| 
									
										
										
										
											2020-07-16 01:47:35 +08:00
										 |  |  |     case catch mnesia:dirty_read(?TABLE, Node) of | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |         []  -> ?DEFAULT_STATUS; | 
					
						
							|  |  |  |         [#node_maintenance_state{node = Node, status = Status}] -> | 
					
						
							|  |  |  |             Status; | 
					
						
							|  |  |  |         _   -> ?DEFAULT_STATUS | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  |   | 
					
						
							|  |  |  | -spec status_consistent_read(node()) -> maintenance_status(). | 
					
						
							|  |  |  | status_consistent_read(Node) -> | 
					
						
							| 
									
										
										
										
											2020-06-08 18:37:54 +08:00
										 |  |  |     case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |         {atomic, []} -> ?DEFAULT_STATUS; | 
					
						
							| 
									
										
										
										
											2020-06-08 18:37:54 +08:00
										 |  |  |         {atomic, [#node_maintenance_state{node = Node, status = Status}]} -> | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |             Status; | 
					
						
							|  |  |  |         {atomic, _}  -> ?DEFAULT_STATUS; | 
					
						
							|  |  |  |         {aborted, _Reason} -> ?DEFAULT_STATUS | 
					
						
							| 
									
										
										
										
											2020-06-08 18:37:54 +08:00
										 |  |  |     end. | 
					
						
							| 
									
										
										
										
											2020-07-09 13:26:52 +08:00
										 |  |  |   | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  |  -spec filter_out_drained_nodes_local_read([node()]) -> [node()]. | 
					
						
							|  |  |  | filter_out_drained_nodes_local_read(Nodes) -> | 
					
						
							| 
									
										
										
										
											2020-06-16 04:12:51 +08:00
										 |  |  |     lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes). | 
					
						
							|  |  |  |   | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  | -spec filter_out_drained_nodes_consistent_read([node()]) -> [node()]. | 
					
						
							|  |  |  | filter_out_drained_nodes_consistent_read(Nodes) -> | 
					
						
							|  |  |  |     lists:filter(fun(N) -> not is_being_drained_consistent_read(N) end, Nodes). | 
					
						
							|  |  |  |   | 
					
						
							| 
									
										
										
										
											2020-05-19 16:24:13 +08:00
										 |  |  | -spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()). | 
					
						
							| 
									
										
										
										
											2020-05-19 08:50:17 +08:00
										 |  |  |  %% Pauses all listeners on the current node except for
 | 
					
						
							|  |  |  |  %% Erlang distribution (clustering and CLI tools).
 | 
					
						
							| 
									
										
										
										
											2021-02-01 20:05:07 +08:00
										 |  |  |  %% A resumed listener will not accept any new client connections
 | 
					
						
							| 
									
										
										
										
											2020-05-19 08:50:17 +08:00
										 |  |  |  %% but previously established connections won't be interrupted.
 | 
					
						
							|  |  |  | suspend_all_client_listeners() -> | 
					
						
							|  |  |  |     Listeners = rabbit_networking:node_client_listeners(node()), | 
					
						
							| 
									
										
										
										
											2020-05-19 16:24:13 +08:00
										 |  |  |     rabbit_log:info("Asked to suspend ~b client connection listeners. " | 
					
						
							|  |  |  |                     "No new client connections will be accepted until these listeners are resumed!", [length(Listeners)]), | 
					
						
							|  |  |  |     Results = lists:foldl(local_listener_fold_fun(fun ranch:suspend_listener/1), [], Listeners), | 
					
						
							|  |  |  |     lists:foldl(fun ok_or_first_error/2, ok, Results). | 
					
						
							| 
									
										
										
										
											2020-05-12 22:42:00 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-19 16:24:13 +08:00
										 |  |  |  -spec resume_all_client_listeners() -> rabbit_types:ok_or_error(any()). | 
					
						
							| 
									
										
										
										
											2020-05-19 08:50:17 +08:00
										 |  |  |  %% Resumes all listeners on the current node except for
 | 
					
						
							|  |  |  |  %% Erlang distribution (clustering and CLI tools).
 | 
					
						
							|  |  |  |  %% A resumed listener will accept new client connections.
 | 
					
						
							|  |  |  | resume_all_client_listeners() -> | 
					
						
							|  |  |  |     Listeners = rabbit_networking:node_client_listeners(node()), | 
					
						
							| 
									
										
										
										
											2020-05-19 16:24:13 +08:00
										 |  |  |     rabbit_log:info("Asked to resume ~b client connection listeners. " | 
					
						
							|  |  |  |                     "New client connections will be accepted from now on", [length(Listeners)]), | 
					
						
							|  |  |  |     Results = lists:foldl(local_listener_fold_fun(fun ranch:resume_listener/1), [], Listeners), | 
					
						
							|  |  |  |     lists:foldl(fun ok_or_first_error/2, ok, Results). | 
					
						
							| 
									
										
										
										
											2020-05-12 22:42:00 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  |  -spec close_all_client_connections() -> {'ok', non_neg_integer()}. | 
					
						
							| 
									
										
										
										
											2020-05-12 22:42:00 +08:00
										 |  |  | close_all_client_connections() -> | 
					
						
							| 
									
										
										
										
											2020-06-16 20:03:25 +08:00
										 |  |  |     Pids = rabbit_networking:local_connections(), | 
					
						
							|  |  |  |     rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"), | 
					
						
							|  |  |  |     {ok, length(Pids)}. | 
					
						
							| 
									
										
										
										
											2020-05-19 16:24:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  | -spec transfer_leadership_of_quorum_queues([node()]) -> ok. | 
					
						
							|  |  |  | transfer_leadership_of_quorum_queues([]) -> | 
					
						
							|  |  |  |     rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate " | 
					
						
							|  |  |  |                        "(online, not under maintenance) nodes to transfer to!"); | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  | transfer_leadership_of_quorum_queues(_TransferCandidates) -> | 
					
						
							|  |  |  |     %% we only transfer leadership for QQs that have local leaders
 | 
					
						
							|  |  |  |     Queues = rabbit_amqqueue:list_local_leaders(), | 
					
						
							|  |  |  |     rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node", | 
					
						
							|  |  |  |                     [length(Queues)]), | 
					
						
							|  |  |  |     [begin | 
					
						
							|  |  |  |         Name = amqqueue:get_name(Q), | 
					
						
							|  |  |  |         rabbit_log:debug("Will trigger a leader election for local quorum queue ~s", | 
					
						
							|  |  |  |                          [rabbit_misc:rs(Name)]), | 
					
						
							|  |  |  |         %% we trigger an election and exclude this node from the list of candidates
 | 
					
						
							|  |  |  |         %% by simply shutting its local QQ replica (Ra server)
 | 
					
						
							|  |  |  |         RaLeader = amqqueue:get_pid(Q), | 
					
						
							|  |  |  |         rabbit_log:debug("Will stop Ra server ~p", [RaLeader]), | 
					
						
							|  |  |  |         case ra:stop_server(RaLeader) of | 
					
						
							|  |  |  |             ok     -> | 
					
						
							|  |  |  |                 rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]); | 
					
						
							|  |  |  |             {error, nodedown} -> | 
					
						
							|  |  |  |                 rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down") | 
					
						
							|  |  |  |         end | 
					
						
							|  |  |  |      end || Q <- Queues], | 
					
						
							|  |  |  |     rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated"). | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | -spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok. | 
					
						
							| 
									
										
										
										
											2021-01-27 23:52:17 +08:00
										 |  |  | %% This function is no longer used by maintanence mode. We retain it in case
 | 
					
						
							|  |  |  | %% classic mirrored queue leadership transfer would be reconsidered.
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | %% With a lot of CMQs in a cluster, the transfer procedure can take prohibitively long
 | 
					
						
							|  |  |  | %% for a pre-upgrade task.
 | 
					
						
							|  |  |  | transfer_leadership_of_classic_mirrored_queues([]) -> | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |     rabbit_log:warning("Skipping leadership transfer of classic mirrored queues: no candidate " | 
					
						
							|  |  |  |                        "(online, not under maintenance) nodes to transfer to!"); | 
					
						
							|  |  |  | transfer_leadership_of_classic_mirrored_queues(TransferCandidates) -> | 
					
						
							|  |  |  |     Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(), | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  |     ReadableCandidates = readable_candidate_list(TransferCandidates), | 
					
						
							|  |  |  |     rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s", | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |                     [length(Queues), ReadableCandidates]), | 
					
						
							|  |  |  |     [begin | 
					
						
							|  |  |  |          Name = amqqueue:get_name(Q), | 
					
						
							| 
									
										
										
										
											2021-01-26 19:47:15 +08:00
										 |  |  |          ExistingReplicaNodes = [node(Pid) || Pid <- amqqueue:get_sync_slave_pids(Q)], | 
					
						
							|  |  |  |          rabbit_log:debug("Local ~s has replicas on nodes ~s", | 
					
						
							|  |  |  |                           [rabbit_misc:rs(Name), readable_candidate_list(ExistingReplicaNodes)]), | 
					
						
							|  |  |  |          case random_primary_replica_transfer_candidate_node(TransferCandidates, ExistingReplicaNodes) of | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |              {ok, Pick} -> | 
					
						
							| 
									
										
										
										
											2021-01-26 19:47:15 +08:00
										 |  |  |                  rabbit_log:debug("Will transfer leadership of local ~s to node ~s", | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |                           [rabbit_misc:rs(Name), Pick]), | 
					
						
							| 
									
										
										
										
											2021-01-26 20:35:16 +08:00
										 |  |  |                  case rabbit_mirror_queue_misc:migrate_leadership_to_existing_replica(Q, Pick) of | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |                      {migrated, _} -> | 
					
						
							|  |  |  |                          rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s", | 
					
						
							|  |  |  |                                           [rabbit_misc:rs(Name), Pick]); | 
					
						
							|  |  |  |                      Other -> | 
					
						
							|  |  |  |                          rabbit_log:warning("Could not transfer leadership of queue ~s to node ~s: ~p", | 
					
						
							|  |  |  |                                             [rabbit_misc:rs(Name), Pick, Other]) | 
					
						
							|  |  |  |                  end; | 
					
						
							|  |  |  |              undefined -> | 
					
						
							|  |  |  |                  rabbit_log:warning("Could not transfer leadership of queue ~s: no suitable candidates?", | 
					
						
							|  |  |  |                                     [Name]) | 
					
						
							|  |  |  |          end | 
					
						
							|  |  |  |      end || Q <- Queues], | 
					
						
							|  |  |  |     rabbit_log:info("Leadership transfer for local classic mirrored queues is complete"). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-10-20 00:47:05 +08:00
										 |  |  | -spec stop_local_quorum_queue_followers() -> ok. | 
					
						
							|  |  |  | stop_local_quorum_queue_followers() -> | 
					
						
							|  |  |  |     Queues = rabbit_amqqueue:list_local_followers(), | 
					
						
							|  |  |  |     rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node", | 
					
						
							|  |  |  |                     [length(Queues)]), | 
					
						
							|  |  |  |     [begin | 
					
						
							|  |  |  |         Name = amqqueue:get_name(Q), | 
					
						
							|  |  |  |         rabbit_log:debug("Will stop a local follower replica of quorum queue ~s", | 
					
						
							|  |  |  |                          [rabbit_misc:rs(Name)]), | 
					
						
							|  |  |  |         %% shut down Ra nodes so that they are not considered for leader election
 | 
					
						
							|  |  |  |         {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q), | 
					
						
							|  |  |  |         RaNode = {RegisteredName, node()}, | 
					
						
							|  |  |  |         rabbit_log:debug("Will stop Ra server ~p", [RaNode]), | 
					
						
							|  |  |  |         case ra:stop_server(RaNode) of | 
					
						
							|  |  |  |             ok     -> | 
					
						
							|  |  |  |                 rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]); | 
					
						
							|  |  |  |             {error, nodedown} -> | 
					
						
							|  |  |  |                 rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down") | 
					
						
							|  |  |  |         end | 
					
						
							|  |  |  |      end || Q <- Queues], | 
					
						
							|  |  |  |     rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-26 19:47:15 +08:00
										 |  |  | -spec primary_replica_transfer_candidate_nodes() -> [node()]. | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  | primary_replica_transfer_candidate_nodes() -> | 
					
						
							|  |  |  |     filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-26 19:47:15 +08:00
										 |  |  | -spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined. | 
					
						
							|  |  |  | random_primary_replica_transfer_candidate_node([], _Preferred) -> | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |     undefined; | 
					
						
							| 
									
										
										
										
											2021-01-26 19:47:15 +08:00
										 |  |  | random_primary_replica_transfer_candidate_node(Candidates, PreferredNodes) -> | 
					
						
							|  |  |  |     Overlap = sets:to_list(sets:intersection(sets:from_list(Candidates), sets:from_list(PreferredNodes))), | 
					
						
							|  |  |  |     Candidate = case Overlap of | 
					
						
							|  |  |  |                     [] -> | 
					
						
							|  |  |  |                         %% Since ownership transfer is meant to be run only when we are sure
 | 
					
						
							|  |  |  |                         %% there are in-sync replicas to transfer to, this is an edge case.
 | 
					
						
							|  |  |  |                         %% We skip the transfer.
 | 
					
						
							|  |  |  |                         undefined; | 
					
						
							|  |  |  |                     Nodes -> | 
					
						
							|  |  |  |                         random_nth(Nodes) | 
					
						
							|  |  |  |                 end, | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |     {ok, Candidate}. | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-26 19:47:15 +08:00
										 |  |  | random_nth(Nodes) -> | 
					
						
							|  |  |  |     Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)), | 
					
						
							|  |  |  |     lists:nth(Nth + 1, Nodes). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  | revive_local_quorum_queue_replicas() -> | 
					
						
							|  |  |  |     Queues = rabbit_amqqueue:list_local_followers(), | 
					
						
							|  |  |  |     [begin | 
					
						
							|  |  |  |         Name = amqqueue:get_name(Q), | 
					
						
							|  |  |  |         rabbit_log:debug("Will trigger a leader election for local quorum queue ~s", | 
					
						
							|  |  |  |                          [rabbit_misc:rs(Name)]), | 
					
						
							|  |  |  |         %% start local QQ replica (Ra server) of this queue
 | 
					
						
							| 
									
										
										
										
											2020-06-25 22:59:08 +08:00
										 |  |  |         {Prefix, _Node} = amqqueue:get_pid(Q), | 
					
						
							|  |  |  |         RaServer = {Prefix, node()}, | 
					
						
							|  |  |  |         rabbit_log:debug("Will start Ra server ~p", [RaServer]), | 
					
						
							|  |  |  |         case ra:restart_server(RaServer) of | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  |             ok     -> | 
					
						
							| 
									
										
										
										
											2020-06-25 22:59:08 +08:00
										 |  |  |                 rabbit_log:debug("Successfully restarted Ra server ~p", [RaServer]); | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  |             {error, {already_started, _Pid}} -> | 
					
						
							| 
									
										
										
										
											2020-06-25 22:59:08 +08:00
										 |  |  |                 rabbit_log:debug("Ra server ~p is already running", [RaServer]); | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  |             {error, nodedown} -> | 
					
						
							|  |  |  |                 rabbit_log:error("Failed to restart Ra server ~p: target node was reported as down") | 
					
						
							|  |  |  |         end | 
					
						
							|  |  |  |      end || Q <- Queues], | 
					
						
							|  |  |  |     rabbit_log:info("Restart of local quorum queue replicas is complete"). | 
					
						
							| 
									
										
										
										
											2020-06-17 23:12:15 +08:00
										 |  |  |   | 
					
						
							| 
									
										
										
										
											2020-05-19 16:24:13 +08:00
										 |  |  | %%
 | 
					
						
							|  |  |  | %% Implementation
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | local_listener_fold_fun(Fun) -> | 
					
						
							|  |  |  |     fun(#listener{node = Node, ip_address = Addr, port = Port}, Acc) when Node =:= node() -> | 
					
						
							|  |  |  |             RanchRef = rabbit_networking:ranch_ref(Addr, Port), | 
					
						
							|  |  |  |             [Fun(RanchRef) | Acc]; | 
					
						
							|  |  |  |         (_, Acc) -> | 
					
						
							|  |  |  |             Acc | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  |   | 
					
						
							|  |  |  | ok_or_first_error(ok, Acc) -> | 
					
						
							|  |  |  |     Acc; | 
					
						
							|  |  |  | ok_or_first_error({error, _} = Err, _Acc) -> | 
					
						
							|  |  |  |     Err. | 
					
						
							| 
									
										
										
										
											2020-06-25 02:57:04 +08:00
										 |  |  |   | 
					
						
							|  |  |  | readable_candidate_list(Nodes) -> | 
					
						
							|  |  |  |     string:join(lists:map(fun rabbit_data_coercion:to_list/1, Nodes), ", "). |