use a pool of workers which cache connections

This commit is contained in:
ash-lshift 2015-03-06 17:41:05 +00:00
parent e64c959d69
commit afb38b8747
8 changed files with 456 additions and 29 deletions

View File

@ -7,3 +7,5 @@ WITH_BROKER_TEST_CONFIG:=$(PACKAGE_DIR)/etc/rabbit-test
else
$(warning Not running LDAP tests; no LDAP server found on localhost)
endif
STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_auth_backend_ldap_pool_coord_test,rabbit_auth_backend_ldap_pool_test],[verbose])

View File

@ -246,8 +246,6 @@ with_ldap(_Creds, _Fun, undefined) ->
with_ldap({error, _} = E, _Fun, _State) ->
E;
%% TODO - ATM we create and destroy a new LDAP connection on every
%% call. This could almost certainly be more efficient.
with_ldap({ok, Creds}, Fun, Servers) ->
Opts0 = [{port, env(port)}],
Opts1 = case env(log) of
@ -266,32 +264,57 @@ with_ldap({ok, Creds}, Fun, Servers) ->
infinity -> Opts1;
MS -> [{timeout, MS} | Opts1]
end,
case eldap_open(Servers, Opts) of
{ok, LDAP} ->
try Creds of
anon ->
?L1("anonymous bind", []),
Fun(LDAP);
{UserDN, Password} ->
case eldap:simple_bind(LDAP, UserDN, Password) of
ok ->
?L1("bind succeeded: ~s", [UserDN]),
Fun(LDAP);
{error, invalidCredentials} ->
?L1("bind returned \"invalid credentials\": ~s",
[UserDN]),
{refused, UserDN, []};
{error, E} ->
?L1("bind error: ~s ~p", [UserDN, E]),
{error, E}
end
after
eldap:close(LDAP)
end;
Error ->
?L1("connect error: ~p", [Error]),
Error
end.
rabbit_auth_backend_ldap_pool_coord:do_work(fun (State) ->
%% cache up to two connections - one for anonymous use, one for binding
%% (re-binding is easy but I couldn't figure out how to un-bind)
Anonness = case Creds of
anon -> anon;
_ -> bound
end,
{Conn, State1} = get_or_create_conn(Servers, Opts, Anonness, State),
Ans = case Conn of
{ok, LDAP} ->
case Creds of
anon ->
?L1("anonymous bind", []),
Fun(LDAP);
{UserDN, Password} ->
case eldap:simple_bind(LDAP, UserDN, Password) of
ok ->
?L1("bind succeeded: ~s", [UserDN]),
Fun(LDAP);
{error, invalidCredentials} ->
?L1("bind returned \"invalid credentials\": ~s",
[UserDN]),
{refused, UserDN, []};
{error, E} ->
?L1("bind error: ~s ~p", [UserDN, E]),
{error, E}
end
end;
Error ->
?L1("connect error: ~p", [Error]),
Error
end,
{Ans, State1}
end,
%% wait indefinitely for the worker to finish since the connection has
%% its own timeout
infinity).
get_or_create_conn(Servers, Opts, Anonness, Conns) ->
%% the worker's initial state is undefined
Conns1 = case Conns of
undefined -> dict:new();
Dict -> Dict
end,
Key = {Servers, Opts, Anonness},
case dict:find(Key, Conns1) of
{ok, Conn} -> {Conn, Conns1};
error ->
Conn = eldap_open(Servers, Opts),
{Conn, dict:store(Key, Conn, Conns1)}
end.
eldap_open(Servers, Opts) ->
case eldap:open(Servers, ssl_conf() ++ Opts) of

View File

@ -50,4 +50,18 @@ configured(M, [_ |T]) -> configured(M, T).
%%----------------------------------------------------------------------------
init([]) -> {ok, {{one_for_one, 3, 10}, []}}.
init([]) ->
CoordSpec = {rabbit_auth_backend_ldap_pool_coord,
{rabbit_auth_backend_ldap_pool_coord, start_link, []},
permanent,
30,
worker,
[rabbit_auth_backend_ldap_pool_coord]},
WorkerSupSpec ={rabbit_auth_backend_ldap_pool_worker_sup,
{rabbit_auth_backend_ldap_pool_worker_sup, start_link, [10, {3, 10}]},
permanent,
infinity,
supervisor,
[rabbit_auth_backend_ldap_pool_worker_sup]},
{ok, {{one_for_all, 3, 10},
[CoordSpec, WorkerSupSpec]}}.

View File

@ -0,0 +1,188 @@
-module(rabbit_auth_backend_ldap_pool_coord).
%% 'public' exports
-export([start_link/0,
do_work/2,
get_work/1,
done_work/3,
stop/1,
hello_from_worker/1]).
-behaviour(gen_server).
%% exports to satisfy gen_server
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {idle_workers=queue:new(),
work_queue=queue:new(),
busy_workers=dict:new()}).
%% testing stuff ---------------------------------------------------------------
-export([get_info_for_testing/0]).
get_info_for_testing() ->
gen_server:call(?MODULE, get_info_for_testing).
%% 'public' interface ----------------------------------------------------------
%% called as part of managing the supervision tree:
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop(Timeout) ->
try
gen_server:call(?MODULE, stop, Timeout)
catch
exit: {normal, _} -> ok
end.
%% called by 'client' (process that wants work to be done):
do_work(Work, Timeout) ->
case gen_server:call(?MODULE, {do_work, Work}, Timeout) of
{ok, Answer} -> Answer;
Error -> exit(Error)
end.
%% called by worker:
get_work(Timeout) ->
gen_server:call(?MODULE, get_work, Timeout).
done_work(Client={_, _}, Answer, Timeout) ->
well_done = gen_server:call(?MODULE, {done_work, Client, Answer}, Timeout),
ok.
hello_from_worker(Timeout) ->
hello_from_coord = gen_server:call(?MODULE, hello_from_worker, Timeout).
%% gen_server callbacks --------------------------------------------------------
init(Args) ->
{ok, #state{}}.
handle_call(get_work, Worker, State) ->
State1 = enqueue_worker(Worker, State),
State2 = distribute_work(State1),
{noreply, State2};
handle_call({do_work, Work}, Client, State) ->
State1 = enqueue_work(Client, Work, State),
State2 = distribute_work(State1),
{noreply, State2};
handle_call({done_work, Client={_,_}, Answer}, {WorkerPid, _}, State) ->
State1 = done_work_impl(Client, Answer, WorkerPid, State),
{reply, well_done, State1};
handle_call(get_info_for_testing, _,
State=#state{idle_workers=Workers, work_queue=WorkQueue}) ->
Info = [{idle_worker_count, queue:len(Workers)},
{work_queue_count, queue:len(WorkQueue)}],
{reply, Info, State};
handle_call(stop, _, State) ->
{stop, normal, State};
handle_call(hello_from_worker, {Worker, _}, State) ->
erlang:monitor(process, Worker),
{reply, hello_from_coord, State}.
handle_cast(Request, State) ->
{noreply, State}.
handle_info(Info={'DOWN', Ref, process, Pid, Reason}, State) ->
State1 = notify_client_of_worker_failure(Pid, State),
{noreply, State1};
handle_info(Info, State) ->
{noreply, State}.
terminate(Reason,State) ->
ok.
code_change(OldVsn, State, Extra) ->
{ok, State}.
%% internal stuff --------------------------------------------------------------
enqueue_work(Client={Pid, _}, Work, State=#state{work_queue=WorkQueue}) ->
%% note that we're not monitor'ing the client; if it dies, then the worker
%% will die when it tries to reply.
State1 = State#state{work_queue=queue:in({Client, Work}, WorkQueue)},
check_invariants(State1),
State1.
enqueue_worker(Worker={Pid,_}, State=#state{idle_workers=Workers}) ->
State1 = State#state{idle_workers=queue:in(Worker,Workers)},
check_invariants(State1),
State1.
distribute_work(State=#state{idle_workers=IdleWorkers,
work_queue=WorkQueue,
busy_workers=BusyWorkers}) ->
State1 = case {queue:out(IdleWorkers), queue:out(WorkQueue)} of
{{{value, Worker}, Workers1}, {{value, Work}, WorkQueue1}} ->
try
gen_server:reply(Worker, Work)
catch
E -> io:format("exception: ~p~n", [E])
end,
{WorkerPid, _} = Worker,
BusyWorkers1 = dict:store(WorkerPid, Work, BusyWorkers),
distribute_work(State#state{idle_workers=Workers1,
work_queue=WorkQueue1,
busy_workers=BusyWorkers1});
_ -> State
end,
check_invariants(State1),
State1.
check_invariants(State=#state{idle_workers=IdleWorkers,
busy_workers=BusyWorkers}) ->
IdlePidSet = sets:from_list(lists:map(fun ({Pid, _}) -> Pid end,
queue:to_list(IdleWorkers))),
BusyPidSet = sets:from_list(dict:fetch_keys(BusyWorkers)),
BothPidList = sets:to_list(sets:intersection(IdlePidSet, BusyPidSet)),
if
length(BothPidList) > 0 ->
exit({invariant_violated, State});
true ->
ok
end,
State.
done_work_impl(Client, Answer, WorkerPid,
State=#state{busy_workers=BusyWorkers}) when is_pid(WorkerPid) ->
gen_server:reply(Client, {ok, Answer}),
State1 = State#state{busy_workers=dict:erase(WorkerPid, BusyWorkers)},
check_invariants(State1),
State1.
notify_client_of_worker_failure(Worker,
State=#state{idle_workers=IdleWorkers,
busy_workers=BusyWorkers}) ->
case dict:find(Worker, BusyWorkers) of
{ok, Work = {Client, _}} ->
try
R = gen_server:reply(Client, {error, worker_failed})
catch
E -> io:format(user,"exception: ~p~n", [E])
end,
State#state{busy_workers=dict:erase(Worker, BusyWorkers)};
error ->
IdleWorkers1 = queue:filter(fun ({Pid, _}) ->
case Pid of
Worker -> false;
_ -> true
end
end, IdleWorkers),
State#state{idle_workers=IdleWorkers1}
end.

View File

@ -0,0 +1,21 @@
-module(rabbit_auth_backend_ldap_pool_worker).
-export([start_link/0,
run/0]).
%% how long we're willing to wait for the coordinator before giving up
-define(TIMEOUT, 5000).
start_link() ->
Pid = spawn_link(?MODULE, run, []),
{ok, Pid}.
run() ->
rabbit_auth_backend_ldap_pool_coord:hello_from_worker(?TIMEOUT),
run1(undefined).
run1(State) ->
{From, Work} = rabbit_auth_backend_ldap_pool_coord:get_work(?TIMEOUT),
{Answer, State1} = apply(Work, [State]),
rabbit_auth_backend_ldap_pool_coord:done_work(From, Answer, ?TIMEOUT),
run1(State1).

View File

@ -0,0 +1,23 @@
-module(rabbit_auth_backend_ldap_pool_worker_sup).
-export([start_link/2]).
-behaviour(supervisor).
-export([init/1]).
start_link(WorkerCount, MaxRestart) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [WorkerCount, MaxRestart]).
init([WorkerCount, {MaxR, MaxT}]) ->
WorkerSpecs = [worker_spec(N) || N <- lists:seq(1, WorkerCount)],
{ok, {{one_for_one, MaxR, MaxT}, WorkerSpecs}}.
worker_spec(N) ->
Name = list_to_atom("rabbit_auth_backend_ldap_pool_worker_" ++ integer_to_list(N)),
{Name,
{rabbit_auth_backend_ldap_pool_worker, start_link, []},
permanent,
30,
worker,
[rabbit_auth_backend_ldap_pool_worker]}.

View File

@ -0,0 +1,88 @@
%% unit tests for rabbit_auth_backend_ldap_pool_coord
-module(rabbit_auth_backend_ldap_pool_coord_test).
-include_lib("eunit/include/eunit.hrl").
%% support gubbins -------------------------------------------------------------
-define(IT, rabbit_auth_backend_ldap_pool_coord).
-define(assertEventuallyEqual(E, A), assertEventuallyEqual(E, fun() -> A end)).
assertEventuallyEqual(E, A) ->
case apply(A, []) of
E -> ok;
Other ->
receive
after
50 -> assertEventuallyEqual(E, A)
end
end.
get_info_for_testing(Key) ->
Info = ?IT:get_info_for_testing(),
proplists:get_value(Key, Info).
recv() ->
receive
X -> X
after
1000 -> timeout
end.
%% the actual tests! -----------------------------------------------------------
worker_arrives_first() ->
Worker = spawn(fun () ->
?IT:hello_from_worker(infinity),
{From, Work} = ?IT:get_work(infinity),
?IT:done_work(From, {i_have_done, Work}, infinity)
end),
?assertEventuallyEqual(1, get_info_for_testing(idle_worker_count)),
R = ?IT:do_work(some_work, infinity),
?assertEqual(R, {i_have_done, some_work}).
work_arrives_first() ->
Self = self(),
Client = spawn(fun() ->
Response = ?IT:do_work(some_work, infinity),
Self ! {client_finished, Response}
end),
?assertEventuallyEqual(1, get_info_for_testing(work_queue_count)),
{From, Work} = ?IT:get_work(infinity),
?assertMatch({Client, _}, From),
?assertEqual(some_work, Work),
?IT:done_work(From, {i_have_done, Work}, infinity),
?assertEqual(0, get_info_for_testing(work_queue_count)),
{client_finished, {i_have_done, Work}} = recv().
busy_worker_fails() ->
Worker = spawn(fun () ->
?debugHere,
?IT:hello_from_worker(infinity),
?debugHere,
{From, Work} = ?IT:get_work(infinity),
?debugHere
end),
R = (catch ?IT:do_work(some_work, infinity)),
?assertMatch({'EXIT', _}, R).
idle_worker_fails() ->
Worker = spawn(fun () ->
?IT:hello_from_worker(infinity),
{From, Work} = ?IT:get_work(infinity)
end),
?assertEventuallyEqual(1, get_info_for_testing(idle_worker_count)),
erlang:exit(Worker, cest_la_vie),
?assertEventuallyEqual(0, get_info_for_testing(idle_worker_count)).
all_test_() ->
{inorder,
{foreach,
fun () -> ?IT:start_link() end,
fun (_) -> ?IT:stop(5000) end,
[fun work_arrives_first/0,
fun worker_arrives_first/0,
fun busy_worker_fails/0,
fun idle_worker_fails/0]}}.

View File

@ -0,0 +1,68 @@
-module(rabbit_auth_backend_ldap_pool_test).
-include_lib("eunit/include/eunit.hrl").
-define(CLIENT_COUNT, 10).
-define(WORKER_COUNT, 10).
-define(CLIENT_OPS, 100).
-define(MAX_SLEEP, 3).
%% expect a lot of restarts
-define(MAX_RESTART, {100000, 1}).
%% a kind of stress test where we have some clients and workers, and the clients
%% submit a bunch of random work, some of which causes the workers to fail.
marches_inexorably_forward_in_a_chaotic_universe_test_() ->
{setup,
fun () ->
erlang:process_flag(trap_exit, true),
rabbit_auth_backend_ldap_pool_coord:start_link(),
rabbit_auth_backend_ldap_pool_worker_sup:start_link(?WORKER_COUNT, ?MAX_RESTART),
ok
end,
fun (_) ->
rabbit_auth_backend_ldap_pool_coord:stop(5000)
end,
fun () ->
Clients = make_clients(),
lists:foreach(fun await_client/1, Clients)
end}.
make_clients() ->
[spawn_link(fun () ->
random:seed(N, N, N),
march(?CLIENT_OPS),
receive
{ping, Pid} -> Pid ! {pong, self()}
end
end)
|| N <- lists:seq(1, ?CLIENT_COUNT)].
await_client(Client) ->
Client ! {ping, self()},
receive
{pong, Client} -> ok;
{'EXIT', E} -> ?debugVal(E);
Other -> ?debugVal(Other)
end.
%% generate a {Work, Check} where Work is to be sent to the worker and
%% Check is how to verify the response
make_work() ->
N = random:uniform(?MAX_SLEEP) - 1,
if
N > 0 ->
{fun (State) -> timer:sleep(N), {N, State} end,
fun (A) -> ?assertEqual(N, A) end};
true ->
{fun (_) -> exit(cest_la_vie) end,
fun (A) -> ?assertMatch({'EXIT', _}, A) end}
end.
%% send N random pieces of work to the workers
march(N) when N > 0 ->
{Work, Check} = make_work(),
Answer = (catch rabbit_auth_backend_ldap_pool_coord:do_work(Work, infinity)),
apply(Check, [Answer]),
march(N - 1);
march(0) -> ok.