diff --git a/deps/rabbitmq_auth_backend_cache/src/rabbit_auth_cache_ets_segmented_stateless.erl b/deps/rabbitmq_auth_backend_cache/src/rabbit_auth_cache_ets_segmented_stateless.erl new file mode 100644 index 0000000000..082b1b3bf0 --- /dev/null +++ b/deps/rabbitmq_auth_backend_cache/src/rabbit_auth_cache_ets_segmented_stateless.erl @@ -0,0 +1,140 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_auth_cache_ets_segmented_stateless). +-behaviour(gen_server2). + +-export([start_link/1, + get/1, put/3, delete/1]). +-export([gc/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SEGMENT_TABLE, rabbit_auth_cache_ets_segmented_stateless_segment_table). + +-record(state, {gc_timer}). + +start_link(SegmentSize) -> + gen_server2:start_link({local, ?MODULE}, ?MODULE, [SegmentSize], []). + +get(Key) -> + case get_from_segments(Key) of + [] -> {error, not_found}; + [V|_] -> {ok, V} + end. + +put(Key, Value, TTL) -> + Expiration = expiration(TTL), + [{_, SegmentSize}] = ets:lookup(?SEGMENT_TABLE, segment_size), + Segment = segment(Expiration, SegmentSize), + Table = case ets:lookup(?SEGMENT_TABLE, Segment) of + [{Segment, T}] -> T; + [] -> add_segment(Segment) + end, + ets:insert(Table, {Key, {Expiration, Value}}), + ok. + +delete(Key) -> + [ets:delete(Table, Key) + || Table <- get_all_segment_tables()]. + +gc() -> + case whereis(?MODULE) of + undefined -> ok; + Pid -> Pid ! gc + end. + +init([SegmentSize]) -> + ets:new(?SEGMENT_TABLE, [ordered_set, named_table, public]), + ets:insert(?SEGMENT_TABLE, {segment_size, SegmentSize}), + + InitSegment = segment(expiration(SegmentSize), SegmentSize), + do_add_segment(InitSegment), + + {ok, GCTimer} = timer:send_interval(SegmentSize * 2, gc), + {ok, #state{gc_timer = GCTimer}}. + +handle_call({add_segment, Segment}, _From, State) -> + %% Double check segment if it's already created + Table = do_add_segment(Segment), + {reply, Table, State}. + +handle_cast(_, State = #state{}) -> + {noreply, State}. + +handle_info(gc, State = #state{}) -> + Now = time_compat:erlang_system_time(milli_seconds), + MatchSpec = [{{'$1', '$2'}, [{'<', '$1', {const, Now}}], ['$2']}], + Expired = ets:select(?SEGMENT_TABLE, MatchSpec), + [ets:delete(Table) || Table <- Expired], + {noreply, State}; +handle_info(_Msg, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State = #state{gc_timer = Timer}) -> + timer:cancel(Timer), + State. + +expiration(TTL) -> + time_compat:erlang_system_time(milli_seconds) + TTL. + +segment(Expiration, SegmentSize) -> + Begin = ((Expiration div SegmentSize) * SegmentSize), + End = Begin + SegmentSize, + End. + +add_segment(Segment) -> + gen_server2:call(?MODULE, {add_segment, Segment}). + +do_add_segment(Segment) -> + case ets:lookup(?SEGMENT_TABLE, Segment) of + [{Segment, Table}] -> Table; + [] -> Table = ets:new(segment, [set, public]), + ets:insert(?SEGMENT_TABLE, {Segment, Table}), + Table + end. + +expired(Exp) -> + time_compat:erlang_system_time(milli_seconds) > Exp. + +get_segment_tables(_Now) -> + get_all_segment_tables(). + % get_matching_segment_tables(Now). + +get_all_segment_tables() -> + [V || {K, V} <- ets:tab2list(?SEGMENT_TABLE), K =/= segment_size]. + +get_from_segments(Key) -> + Now = time_compat:erlang_system_time(milli_seconds), + Tables = get_segment_tables(Now), + lists:flatmap( + fun(undefined) -> []; + (T) -> + case ets:lookup(T, Key) of + [{Key, {Exp, Val}}] -> + case expired(Exp) of + true -> []; + false -> [Val] + end; + [] -> [] + end + end, + Tables). + diff --git a/deps/rabbitmq_auth_backend_cache/test/src/rabbit_auth_cache_SUITE.erl b/deps/rabbitmq_auth_backend_cache/test/src/rabbit_auth_cache_SUITE.erl index 2b23db71d8..f7dc8f1485 100644 --- a/deps/rabbitmq_auth_backend_cache/test/src/rabbit_auth_cache_SUITE.erl +++ b/deps/rabbitmq_auth_backend_cache/test/src/rabbit_auth_cache_SUITE.erl @@ -8,7 +8,8 @@ all() -> [ {group, rabbit_auth_cache_dict}, {group, rabbit_auth_cache_ets}, - {group, rabbit_auth_cache_ets_segmented} + {group, rabbit_auth_cache_ets_segmented}, + {group, rabbit_auth_cache_ets_segmented_stateless} ]. groups() -> @@ -16,7 +17,8 @@ groups() -> [ {rabbit_auth_cache_dict, [sequence], CommonTests}, {rabbit_auth_cache_ets, [sequence], CommonTests}, - {rabbit_auth_cache_ets_segmented, [sequence], CommonTests} + {rabbit_auth_cache_ets_segmented, [sequence], CommonTests}, + {rabbit_auth_cache_ets_segmented_stateless, [sequence], CommonTests} ]. init_per_suite(Config) -> @@ -27,9 +29,11 @@ init_per_suite(Config) -> init_per_group(Group, Config) when Group =:= rabbit_auth_cache_dict; Group =:= rabbit_auth_cache_ets -> set_auth_cache_module(Group, [], Config); -init_per_group(rabbit_auth_cache_ets_segmented, Config) -> +init_per_group(Group, Config) + when Group =:= rabbit_auth_cache_ets_segmented; + Group =:= rabbit_auth_cache_ets_segmented_stateless -> TTL = ?config(current_ttl, Config), - set_auth_cache_module(rabbit_auth_cache_ets_segmented, [TTL * 2], Config); + set_auth_cache_module(Group, [TTL * 2], Config); init_per_group(_, Config) -> Config. set_auth_cache_module(Module, Args, Config) ->