Introduce batching (again - same diff as 5f7d8d07f94f)

This commit is contained in:
Matthew Sackman 2011-03-21 17:57:54 +00:00
parent 0cd4e2f3dd
commit 4ba94e18c6
1 changed files with 91 additions and 41 deletions

View File

@ -376,15 +376,16 @@
confirmed_broadcast/2, group_members/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
code_change/3, prioritise_cast/2, prioritise_info/2]).
-export([behaviour_info/1]).
-export([table_definitions/0]).
-export([table_definitions/0, flush/1]).
-define(GROUP_TABLE, gm_group).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
-define(SETS, ordsets).
-define(DICT, orddict).
@ -398,7 +399,9 @@
pub_count,
members_state,
callback_args,
confirms
confirms,
broadcast_buffer,
broadcast_timer
}).
-record(gm_group, { name, version, members }).
@ -508,21 +511,26 @@ confirmed_broadcast(Server, Msg) ->
group_members(Server) ->
gen_server2:call(Server, group_members, infinity).
flush(Server) ->
gen_server2:cast(Server, flush).
init([GroupName, Module, Args]) ->
random:seed(now()),
gen_server2:cast(self(), join),
Self = self(),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
group_name = GroupName,
module = Module,
view = undefined,
pub_count = 0,
members_state = undefined,
callback_args = Args,
confirms = queue:new() }, hibernate,
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
group_name = GroupName,
module = Module,
view = undefined,
pub_count = 0,
members_state = undefined,
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
broadcast_timer = undefined }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@ -620,7 +628,11 @@ handle_cast(join, State = #state { self = Self,
{Module:joined(Args, all_known_members(View)), State1});
handle_cast(leave, State) ->
{stop, normal, State}.
{stop, normal, State};
handle_cast(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined })).
handle_info({'DOWN', MRef, process, _Pid, _Reason},
@ -662,14 +674,17 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
end.
terminate(Reason, #state { module = Module,
callback_args = Args }) ->
terminate(Reason, State = #state { module = Module,
callback_args = Args }) ->
flush_broadcast_buffer(State),
Module:terminate(Args, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
prioritise_cast(flush, _State) -> 1;
prioritise_cast(_ , _State) -> 0.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
prioritise_info(_ , _State) -> 0.
@ -782,33 +797,62 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
{noreply, State, hibernate}.
{noreply, ensure_broadcast_timer(State), hibernate}.
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
{reply, Reply, ensure_broadcast_timer(State), hibernate}.
internal_broadcast(Msg, From, State = #state { self = Self,
pub_count = PubCount,
members_state = MembersState,
module = Module,
confirms = Confirms,
callback_args = Args }) ->
PubMsg = {PubCount, Msg},
Activity = activity_cons(Self, [PubMsg], [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 =
with_member(
fun (Member = #member { pending_ack = PA }) ->
Member #member { pending_ack = queue:in(PubMsg, PA) }
end, Self, MembersState),
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
State;
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = TRef }) ->
timer:cancel(TRef),
State #state { broadcast_timer = undefined };
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
{ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]),
State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->
State.
internal_broadcast(Msg, From, State = #state { self = Self,
pub_count = PubCount,
module = Module,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
Result = Module:handle_msg(Args, Self, Msg),
Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
_ -> queue:in({PubCount, From}, Confirms)
end,
handle_callback_result({Module:handle_msg(Args, Self, Msg),
State #state { pub_count = PubCount + 1,
members_state = MembersState1,
confirms = Confirms1 }}).
State1 = State #state { pub_count = PubCount + 1,
confirms = Confirms1,
broadcast_buffer = Buffer1 },
case From =/= none of
true ->
handle_callback_result({Result, flush_broadcast_buffer(State1)});
false ->
handle_callback_result(
{Result, State1 #state { broadcast_buffer = Buffer1 }})
end.
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
flush_broadcast_buffer(State = #state { self = Self,
members_state = MembersState,
broadcast_buffer = Buffer }) ->
Pubs = lists:reverse(Buffer),
Activity = activity_cons(Self, Pubs, [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 = with_member(
fun (Member = #member { pending_ack = PA }) ->
PA1 = queue:join(PA, queue:from_list(Pubs)),
Member #member { pending_ack = PA1 }
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [] }.
%% ---------------------------------------------------------------------------
@ -1093,16 +1137,22 @@ maybe_monitor(Self, Self) ->
maybe_monitor(Other, _Self) ->
erlang:monitor(process, Other).
check_neighbours(State = #state { self = Self,
left = Left,
right = Right,
view = View }) ->
check_neighbours(State = #state { self = Self,
left = Left,
right = Right,
view = View,
broadcast_buffer = Buffer }) ->
#view_member { left = VLeft, right = VRight }
= fetch_view_member(Self, View),
Ver = view_version(View),
Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
Right1 = ensure_neighbour(Ver, Self, Right, VRight),
State1 = State #state { left = Left1, right = Right1 },
Buffer1 = case Right1 of
{Self, undefined} -> [];
_ -> Buffer
end,
State1 = State #state { left = Left1, right = Right1,
broadcast_buffer = Buffer1 },
ok = maybe_send_catchup(Right, State1),
State1.