Slaves detect stale master pids on startup

This commit is contained in:
Emile Joubert 2013-07-19 13:23:10 +01:00
parent 5319f96acd
commit 61e69ebee4
2 changed files with 28 additions and 4 deletions

View File

@ -81,6 +81,12 @@
%% Provide the Pid. Returns a proplist with various facts, including
%% the group name and the current group members.
%%
%% validate_members/2
%% Provide the Pid and a list of member Pids. Checks whether the given
%% member list agrees with the chosen member's view. Any differences
%% will be communicated via the members_changed callback. If there are
%% no differences then there will be no reply. Note that members will
%% not necessarily share the same view.
%%
%% forget_group/1
%% Provide the group name. Removes its mnesia record. Makes no attempt
%% to ensure the group is empty.
@ -377,7 +383,7 @@
-behaviour(gen_server2).
-export([create_tables/0, start_link/4, leave/1, broadcast/2,
confirmed_broadcast/2, info/1, forget_group/1]).
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
@ -438,6 +444,7 @@
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(validate_members/2 :: (pid(), [pid()]) -> 'ok').
-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
@ -524,6 +531,9 @@ confirmed_broadcast(Server, Msg) ->
%% Ask the given gm server for its info via a synchronous call. The
%% call is made with an infinite timeout.
info(Server) ->
    Timeout = infinity,
    gen_server2:call(Server, info, Timeout).
%% Hand the caller's idea of the membership to the given gm server as
%% a cast. Any disagreement is reported through the members_changed
%% callback rather than through a reply to this call.
validate_members(Server, Members) ->
    Request = {validate_members, Members},
    gen_server2:cast(Server, Request).
forget_group(GroupName) ->
{atomic, ok} = mnesia:sync_transaction(
fun () ->
@ -659,6 +669,19 @@ handle_cast(join, State = #state { self = Self,
handle_callback_result(
{Module:joined(Args, get_pids(all_known_members(View))), State1});
%% Compare the member list supplied by the caller with the pids of all
%% members known in our own view. Pids we know about but the caller did
%% not list are reported as births; pids the caller listed but we do
%% not know are reported as deaths. The members_changed callback is
%% only invoked when at least one of the two lists is non-empty.
handle_cast({validate_members, OldMembers},
            State = #state { view          = View,
                             module        = Module,
                             callback_args = Args }) ->
    CurrentMembers = get_pids(all_known_members(View)),
    case {CurrentMembers -- OldMembers, OldMembers -- CurrentMembers} of
        {[], []} ->
            %% Both views agree: nothing to tell the callback module.
            noreply(State);
        {Births, Deaths} ->
            handle_callback_result(
              {Module:members_changed(Args, Births, Deaths), State})
    end;
%% A leave request stops this server with reason 'normal'; the state
%% is passed through unchanged.
handle_cast(leave, State) ->
{stop, normal, State}.

View File

@ -100,7 +100,7 @@ init(Q = #amqqueue { name = QName }) ->
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
fun() -> init_it(Self, GM, Node, QName) end) of
{new, QPid} ->
{new, QPid, GMPids} ->
erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@ -127,6 +127,7 @@ init(Q = #amqqueue { name = QName }) ->
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
@ -144,7 +145,7 @@ init_it(Self, GM, Node, QName) ->
mnesia:read({rabbit_queue, QName}),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
{new, QPid};
{new, QPid, GMPids};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
@ -156,7 +157,7 @@ init_it(Self, GM, Node, QName) ->
gm_pids = [T || T = {_, S} <- GMPids,
S =/= SPid] },
add_slave(Q1, Self, GM),
{new, QPid}
{new, QPid, GMPids}
end
end.