Replace orddicts with maps
This commit is contained in:
parent
c0dcb2b3ee
commit
3e6d1ad207
29
src/gm.erl
29
src/gm.erl
|
@ -399,7 +399,6 @@
|
||||||
-define(FORCE_GC_TIMER, 250).
|
-define(FORCE_GC_TIMER, 250).
|
||||||
-define(VERSION_START, 0).
|
-define(VERSION_START, 0).
|
||||||
-define(SETS, ordsets).
|
-define(SETS, ordsets).
|
||||||
-define(DICT, orddict).
|
|
||||||
|
|
||||||
-record(state,
|
-record(state,
|
||||||
{ self,
|
{ self,
|
||||||
|
@ -824,8 +823,8 @@ handle_msg({catchup, Left, MembersStateLeft},
|
||||||
members_state = MembersState })
|
members_state = MembersState })
|
||||||
when MembersState =/= undefined ->
|
when MembersState =/= undefined ->
|
||||||
MembersStateLeft1 = build_members_state(MembersStateLeft),
|
MembersStateLeft1 = build_members_state(MembersStateLeft),
|
||||||
AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
|
AllMembers = lists:usort(maps:keys(MembersState) ++
|
||||||
?DICT:fetch_keys(MembersStateLeft1)),
|
maps:keys(MembersStateLeft1)),
|
||||||
{MembersState1, Activity} =
|
{MembersState1, Activity} =
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun (Id, MembersStateActivity) ->
|
fun (Id, MembersStateActivity) ->
|
||||||
|
@ -995,21 +994,21 @@ is_member_alias(Member, Self, View) ->
|
||||||
dead_member_id({dead, Member}) -> Member.
|
dead_member_id({dead, Member}) -> Member.
|
||||||
|
|
||||||
store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
|
store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
|
||||||
{Ver, ?DICT:store(Id, VMember, View)}.
|
{Ver, maps:put(Id, VMember, View)}.
|
||||||
|
|
||||||
with_view_member(Fun, View, Id) ->
|
with_view_member(Fun, View, Id) ->
|
||||||
store_view_member(Fun(fetch_view_member(Id, View)), View).
|
store_view_member(Fun(fetch_view_member(Id, View)), View).
|
||||||
|
|
||||||
fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
|
fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View).
|
||||||
|
|
||||||
find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
|
find_view_member(Id, {_Ver, View}) -> maps:find(Id, View).
|
||||||
|
|
||||||
blank_view(Ver) -> {Ver, ?DICT:new()}.
|
blank_view(Ver) -> {Ver, maps:new()}.
|
||||||
|
|
||||||
alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
|
alive_view_members({_Ver, View}) -> maps:keys(View).
|
||||||
|
|
||||||
all_known_members({_Ver, View}) ->
|
all_known_members({_Ver, View}) ->
|
||||||
?DICT:fold(
|
maps:fold(
|
||||||
fun (Member, #view_member { aliases = Aliases }, Acc) ->
|
fun (Member, #view_member { aliases = Aliases }, Acc) ->
|
||||||
?SETS:to_list(Aliases) ++ [Member | Acc]
|
?SETS:to_list(Aliases) ++ [Member | Acc]
|
||||||
end, [], View).
|
end, [], View).
|
||||||
|
@ -1374,24 +1373,24 @@ with_member_acc(Fun, Id, {MembersState, Acc}) ->
|
||||||
{store_member(Id, MemberState, MembersState), Acc1}.
|
{store_member(Id, MemberState, MembersState), Acc1}.
|
||||||
|
|
||||||
find_member_or_blank(Id, MembersState) ->
|
find_member_or_blank(Id, MembersState) ->
|
||||||
case ?DICT:find(Id, MembersState) of
|
case maps:find(Id, MembersState) of
|
||||||
{ok, Result} -> Result;
|
{ok, Result} -> Result;
|
||||||
error -> blank_member()
|
error -> blank_member()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
|
erase_member(Id, MembersState) -> maps:remove(Id, MembersState).
|
||||||
|
|
||||||
blank_member() ->
|
blank_member() ->
|
||||||
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
|
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
|
||||||
|
|
||||||
blank_member_state() -> ?DICT:new().
|
blank_member_state() -> maps:new().
|
||||||
|
|
||||||
store_member(Id, MemberState, MembersState) ->
|
store_member(Id, MemberState, MembersState) ->
|
||||||
?DICT:store(Id, MemberState, MembersState).
|
maps:put(Id, MemberState, MembersState).
|
||||||
|
|
||||||
prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
|
prepare_members_state(MembersState) -> maps:to_list(MembersState).
|
||||||
|
|
||||||
build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
|
build_members_state(MembersStateList) -> maps:from_list(MembersStateList).
|
||||||
|
|
||||||
make_member(GroupName) ->
|
make_member(GroupName) ->
|
||||||
{case dirty_read_group(GroupName) of
|
{case dirty_read_group(GroupName) of
|
||||||
|
|
|
@ -184,7 +184,7 @@
|
||||||
%% 'Notify' is a boolean that indicates whether a queue should be
|
%% 'Notify' is a boolean that indicates whether a queue should be
|
||||||
%% notified of a change in the limit or volume that may allow it to
|
%% notified of a change in the limit or volume that may allow it to
|
||||||
%% deliver more messages via the limiter's channel.
|
%% deliver more messages via the limiter's channel.
|
||||||
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
|
queues = maps:new(), % QPid -> {MonitorRef, Notify}
|
||||||
volume = 0}).
|
volume = 0}).
|
||||||
|
|
||||||
%% mode is of type credit_mode()
|
%% mode is of type credit_mode()
|
||||||
|
@ -402,28 +402,28 @@ prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
|
||||||
Limit =/= 0 andalso Volume >= Limit.
|
Limit =/= 0 andalso Volume >= Limit.
|
||||||
|
|
||||||
remember_queue(QPid, State = #lim{queues = Queues}) ->
|
remember_queue(QPid, State = #lim{queues = Queues}) ->
|
||||||
case orddict:is_key(QPid, Queues) of
|
case maps:is_key(QPid, Queues) of
|
||||||
false -> MRef = erlang:monitor(process, QPid),
|
false -> MRef = erlang:monitor(process, QPid),
|
||||||
State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)};
|
State#lim{queues = maps:put(QPid, {MRef, false}, Queues)};
|
||||||
true -> State
|
true -> State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
forget_queue(QPid, State = #lim{queues = Queues}) ->
|
forget_queue(QPid, State = #lim{queues = Queues}) ->
|
||||||
case orddict:find(QPid, Queues) of
|
case maps:find(QPid, Queues) of
|
||||||
{ok, {MRef, _}} -> true = erlang:demonitor(MRef),
|
{ok, {MRef, _}} -> true = erlang:demonitor(MRef),
|
||||||
State#lim{queues = orddict:erase(QPid, Queues)};
|
State#lim{queues = maps:remove(QPid, Queues)};
|
||||||
error -> State
|
error -> State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
limit_queue(QPid, State = #lim{queues = Queues}) ->
|
limit_queue(QPid, State = #lim{queues = Queues}) ->
|
||||||
UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
|
UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
|
||||||
State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}.
|
State#lim{queues = maps:update_with(QPid, UpdateFun, Queues)}.
|
||||||
|
|
||||||
notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
|
notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
|
||||||
{QList, NewQueues} =
|
{QList, NewQueues} =
|
||||||
orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
|
maps:fold(fun (_QPid, {_, false}, Acc) -> Acc;
|
||||||
(QPid, {MRef, true}, {L, D}) ->
|
(QPid, {MRef, true}, {L, D}) ->
|
||||||
{[QPid | L], orddict:store(QPid, {MRef, false}, D)}
|
{[QPid | L], maps:put(QPid, {MRef, false}, D)}
|
||||||
end, {[], Queues}, Queues),
|
end, {[], Queues}, Queues),
|
||||||
case length(QList) of
|
case length(QList) of
|
||||||
0 -> ok;
|
0 -> ok;
|
||||||
|
|
|
@ -780,7 +780,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
|
||||||
sync_timer_ref = undefined,
|
sync_timer_ref = undefined,
|
||||||
sum_valid_data = 0,
|
sum_valid_data = 0,
|
||||||
sum_file_size = 0,
|
sum_file_size = 0,
|
||||||
pending_gc_completion = orddict:new(),
|
pending_gc_completion = maps:new(),
|
||||||
gc_pid = GCPid,
|
gc_pid = GCPid,
|
||||||
file_handles_ets = FileHandlesEts,
|
file_handles_ets = FileHandlesEts,
|
||||||
file_summary_ets = FileSummaryEts,
|
file_summary_ets = FileSummaryEts,
|
||||||
|
@ -1269,7 +1269,7 @@ contains_message(MsgId, From,
|
||||||
gen_server2:reply(From, false),
|
gen_server2:reply(From, false),
|
||||||
State;
|
State;
|
||||||
#msg_location { file = File } ->
|
#msg_location { file = File } ->
|
||||||
case orddict:is_key(File, Pending) of
|
case maps:is_key(File, Pending) of
|
||||||
true -> add_to_pending_gc_completion(
|
true -> add_to_pending_gc_completion(
|
||||||
{contains, MsgId, From}, File, State);
|
{contains, MsgId, From}, File, State);
|
||||||
false -> gen_server2:reply(From, true),
|
false -> gen_server2:reply(From, true),
|
||||||
|
@ -1280,16 +1280,16 @@ contains_message(MsgId, From,
|
||||||
add_to_pending_gc_completion(
|
add_to_pending_gc_completion(
|
||||||
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
|
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
|
||||||
State #msstate { pending_gc_completion =
|
State #msstate { pending_gc_completion =
|
||||||
rabbit_misc:orddict_cons(File, Op, Pending) }.
|
rabbit_misc:maps_cons(File, Op, Pending) }.
|
||||||
|
|
||||||
run_pending(Files, State) ->
|
run_pending(Files, State) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun (File, State1 = #msstate { pending_gc_completion = Pending }) ->
|
fun (File, State1 = #msstate { pending_gc_completion = Pending }) ->
|
||||||
Pending1 = orddict:erase(File, Pending),
|
Pending1 = maps:remove(File, Pending),
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun run_pending_action/2,
|
fun run_pending_action/2,
|
||||||
State1 #msstate { pending_gc_completion = Pending1 },
|
State1 #msstate { pending_gc_completion = Pending1 },
|
||||||
lists:reverse(orddict:fetch(File, Pending)))
|
lists:reverse(maps:get(File, Pending)))
|
||||||
end, State, Files).
|
end, State, Files).
|
||||||
|
|
||||||
run_pending_action({read, MsgId, From}, State) ->
|
run_pending_action({read, MsgId, From}, State) ->
|
||||||
|
@ -1320,9 +1320,9 @@ adjust_valid_total_size(File, Delta, State = #msstate {
|
||||||
[{#file_summary.valid_total_size, Delta}]),
|
[{#file_summary.valid_total_size, Delta}]),
|
||||||
State #msstate { sum_valid_data = SumValid + Delta }.
|
State #msstate { sum_valid_data = SumValid + Delta }.
|
||||||
|
|
||||||
orddict_store(Key, Val, Dict) ->
|
maps_store(Key, Val, Dict) ->
|
||||||
false = orddict:is_key(Key, Dict),
|
false = maps:is_key(Key, Dict),
|
||||||
orddict:store(Key, Val, Dict).
|
maps:put(Key, Val, Dict).
|
||||||
|
|
||||||
update_pending_confirms(Fun, CRef,
|
update_pending_confirms(Fun, CRef,
|
||||||
State = #msstate { clients = Clients,
|
State = #msstate { clients = Clients,
|
||||||
|
@ -1860,7 +1860,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
|
||||||
%% complete traversal of FileSummaryEts.
|
%% complete traversal of FileSummaryEts.
|
||||||
First = ets:first(FileSummaryEts),
|
First = ets:first(FileSummaryEts),
|
||||||
case First =:= '$end_of_table' orelse
|
case First =:= '$end_of_table' orelse
|
||||||
orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of
|
maps:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of
|
||||||
true ->
|
true ->
|
||||||
State;
|
State;
|
||||||
false ->
|
false ->
|
||||||
|
@ -1869,8 +1869,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
|
||||||
not_found ->
|
not_found ->
|
||||||
State;
|
State;
|
||||||
{Src, Dst} ->
|
{Src, Dst} ->
|
||||||
Pending1 = orddict_store(Dst, [],
|
Pending1 = maps_store(Dst, [],
|
||||||
orddict_store(Src, [], Pending)),
|
maps_store(Src, [], Pending)),
|
||||||
State1 = close_handle(Src, close_handle(Dst, State)),
|
State1 = close_handle(Src, close_handle(Dst, State)),
|
||||||
true = ets:update_element(FileSummaryEts, Src,
|
true = ets:update_element(FileSummaryEts, Src,
|
||||||
{#file_summary.locked, true}),
|
{#file_summary.locked, true}),
|
||||||
|
@ -1926,7 +1926,7 @@ delete_file_if_empty(File, State = #msstate {
|
||||||
0 -> true = ets:update_element(FileSummaryEts, File,
|
0 -> true = ets:update_element(FileSummaryEts, File,
|
||||||
{#file_summary.locked, true}),
|
{#file_summary.locked, true}),
|
||||||
ok = rabbit_msg_store_gc:delete(GCPid, File),
|
ok = rabbit_msg_store_gc:delete(GCPid, File),
|
||||||
Pending1 = orddict_store(File, [], Pending),
|
Pending1 = maps_store(File, [], Pending),
|
||||||
close_handle(File,
|
close_handle(File,
|
||||||
State #msstate { pending_gc_completion = Pending1 });
|
State #msstate { pending_gc_completion = Pending1 });
|
||||||
_ -> State
|
_ -> State
|
||||||
|
|
|
@ -350,7 +350,7 @@ init([]) ->
|
||||||
subscribers = pmon:new(),
|
subscribers = pmon:new(),
|
||||||
partitions = [],
|
partitions = [],
|
||||||
guid = rabbit_guid:gen(),
|
guid = rabbit_guid:gen(),
|
||||||
node_guids = orddict:new(),
|
node_guids = maps:new(),
|
||||||
autoheal = rabbit_autoheal:init()})}.
|
autoheal = rabbit_autoheal:init()})}.
|
||||||
|
|
||||||
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
|
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
|
||||||
|
@ -405,17 +405,17 @@ handle_cast({node_up, Node, NodeType, GUID},
|
||||||
State = #state{guid = MyGUID,
|
State = #state{guid = MyGUID,
|
||||||
node_guids = GUIDs}) ->
|
node_guids = GUIDs}) ->
|
||||||
cast(Node, {announce_guid, node(), MyGUID}),
|
cast(Node, {announce_guid, node(), MyGUID}),
|
||||||
GUIDs1 = orddict:store(Node, GUID, GUIDs),
|
GUIDs1 = maps:put(Node, GUID, GUIDs),
|
||||||
handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});
|
handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});
|
||||||
|
|
||||||
handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
|
handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
|
||||||
{noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}};
|
{noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}};
|
||||||
|
|
||||||
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
|
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
|
||||||
State = #state{guid = MyGUID,
|
State = #state{guid = MyGUID,
|
||||||
node_guids = GUIDs}) ->
|
node_guids = GUIDs}) ->
|
||||||
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
|
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
|
||||||
orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of
|
maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
|
||||||
true -> spawn_link( %%[1]
|
true -> spawn_link( %%[1]
|
||||||
fun () ->
|
fun () ->
|
||||||
case rpc:call(Node, rabbit, is_running, []) of
|
case rpc:call(Node, rabbit, is_running, []) of
|
||||||
|
@ -560,10 +560,10 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
|
||||||
cast(N, {check_partial_partition,
|
cast(N, {check_partial_partition,
|
||||||
Node, node(), DownGUID, CheckGUID, MyGUID})
|
Node, node(), DownGUID, CheckGUID, MyGUID})
|
||||||
end,
|
end,
|
||||||
case orddict:find(Node, GUIDs) of
|
case maps:find(Node, GUIDs) of
|
||||||
{ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
|
{ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
|
||||||
-- [node(), Node],
|
-- [node(), Node],
|
||||||
[case orddict:find(N, GUIDs) of
|
[case maps:find(N, GUIDs) of
|
||||||
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
|
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
|
||||||
error -> ok
|
error -> ok
|
||||||
end || N <- Alive];
|
end || N <- Alive];
|
||||||
|
|
|
@ -207,13 +207,13 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
|
||||||
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
|
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
|
||||||
|
|
||||||
batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
|
batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
|
||||||
PubDict = partition_publish_batch(Publishes, MaxP),
|
PubMap = partition_publish_batch(Publishes, MaxP),
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun ({Priority, Pubs}, St) ->
|
fun ({Priority, Pubs}, St) ->
|
||||||
pick1(fun (_P, BQSN) ->
|
pick1(fun (_P, BQSN) ->
|
||||||
BQ:batch_publish(Pubs, ChPid, Flow, BQSN)
|
BQ:batch_publish(Pubs, ChPid, Flow, BQSN)
|
||||||
end, Priority, St)
|
end, Priority, St)
|
||||||
end, State, orddict:to_list(PubDict));
|
end, State, maps:to_list(PubMap));
|
||||||
batch_publish(Publishes, ChPid, Flow,
|
batch_publish(Publishes, ChPid, Flow,
|
||||||
State = #passthrough{bq = BQ, bqs = BQS}) ->
|
State = #passthrough{bq = BQ, bqs = BQS}) ->
|
||||||
?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)).
|
?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)).
|
||||||
|
@ -229,7 +229,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
|
||||||
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
|
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
|
||||||
|
|
||||||
batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
|
batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
|
||||||
PubDict = partition_publish_delivered_batch(Publishes, MaxP),
|
PubMap = partition_publish_delivered_batch(Publishes, MaxP),
|
||||||
{PrioritiesAndAcks, State1} =
|
{PrioritiesAndAcks, State1} =
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
|
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
|
||||||
|
@ -241,7 +241,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [
|
||||||
{priority_on_acktags(P, AckTags), BQSN1}
|
{priority_on_acktags(P, AckTags), BQSN1}
|
||||||
end, Priority, St),
|
end, Priority, St),
|
||||||
{[PriosAndAcks1 | PriosAndAcks], St1}
|
{[PriosAndAcks1 | PriosAndAcks], St1}
|
||||||
end, {[], State}, orddict:to_list(PubDict)),
|
end, {[], State}, maps:to_list(PubMap)),
|
||||||
{lists:reverse(PrioritiesAndAcks), State1};
|
{lists:reverse(PrioritiesAndAcks), State1};
|
||||||
batch_publish_delivered(Publishes, ChPid, Flow,
|
batch_publish_delivered(Publishes, ChPid, Flow,
|
||||||
State = #passthrough{bq = BQ, bqs = BQS}) ->
|
State = #passthrough{bq = BQ, bqs = BQS}) ->
|
||||||
|
@ -327,7 +327,7 @@ ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) ->
|
||||||
AckTagsByPriority = partition_acktags(AckTags),
|
AckTagsByPriority = partition_acktags(AckTags),
|
||||||
fold2(
|
fold2(
|
||||||
fun (P, BQSN, AccN) ->
|
fun (P, BQSN, AccN) ->
|
||||||
case orddict:find(P, AckTagsByPriority) of
|
case maps:find(P, AckTagsByPriority) of
|
||||||
{ok, ATagsN} -> {AccN1, BQSN1} =
|
{ok, ATagsN} -> {AccN1, BQSN1} =
|
||||||
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN),
|
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN),
|
||||||
{priority_on_acktags(P, AccN1), BQSN1};
|
{priority_on_acktags(P, AccN1), BQSN1};
|
||||||
|
@ -439,7 +439,7 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) ->
|
||||||
MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP),
|
MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP),
|
||||||
lists:foldl(fun (Acks, MAs) ->
|
lists:foldl(fun (Acks, MAs) ->
|
||||||
{P, _AckTag} = hd(Acks),
|
{P, _AckTag} = hd(Acks),
|
||||||
Pubs = orddict:fetch(P, MsgsByPriority),
|
Pubs = maps:get(P, MsgsByPriority),
|
||||||
MAs0 = zip_msgs_and_acks(Pubs, Acks),
|
MAs0 = zip_msgs_and_acks(Pubs, Acks),
|
||||||
MAs ++ MAs0
|
MAs ++ MAs0
|
||||||
end, Accumulator, AckTags);
|
end, Accumulator, AckTags);
|
||||||
|
@ -527,7 +527,7 @@ fold_min2(Fun, State) ->
|
||||||
fold_by_acktags2(Fun, AckTags, State) ->
|
fold_by_acktags2(Fun, AckTags, State) ->
|
||||||
AckTagsByPriority = partition_acktags(AckTags),
|
AckTagsByPriority = partition_acktags(AckTags),
|
||||||
fold_append2(fun (P, BQSN) ->
|
fold_append2(fun (P, BQSN) ->
|
||||||
case orddict:find(P, AckTagsByPriority) of
|
case maps:find(P, AckTagsByPriority) of
|
||||||
{ok, AckTagsN} -> Fun(AckTagsN, BQSN);
|
{ok, AckTagsN} -> Fun(AckTagsN, BQSN);
|
||||||
error -> {[], BQSN}
|
error -> {[], BQSN}
|
||||||
end
|
end
|
||||||
|
@ -597,11 +597,11 @@ partition_publishes(Publishes, ExtractMsg, MaxP) ->
|
||||||
Partitioned =
|
Partitioned =
|
||||||
lists:foldl(fun (Pub, Dict) ->
|
lists:foldl(fun (Pub, Dict) ->
|
||||||
Msg = ExtractMsg(Pub),
|
Msg = ExtractMsg(Pub),
|
||||||
rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict)
|
rabbit_misc:maps_cons(priority(Msg, MaxP), Pub, Dict)
|
||||||
end, orddict:new(), Publishes),
|
end, maps:new(), Publishes),
|
||||||
orddict:map(fun (_P, RevPubs) ->
|
maps:map(fun (_P, RevPubs) ->
|
||||||
lists:reverse(RevPubs)
|
lists:reverse(RevPubs)
|
||||||
end, Partitioned).
|
end, Partitioned).
|
||||||
|
|
||||||
|
|
||||||
priority_bq(Priority, [{MaxP, _} | _] = BQSs) ->
|
priority_bq(Priority, [{MaxP, _} | _] = BQSs) ->
|
||||||
|
@ -625,14 +625,14 @@ add_maybe_infinity(infinity, _) -> infinity;
|
||||||
add_maybe_infinity(_, infinity) -> infinity;
|
add_maybe_infinity(_, infinity) -> infinity;
|
||||||
add_maybe_infinity(A, B) -> A + B.
|
add_maybe_infinity(A, B) -> A + B.
|
||||||
|
|
||||||
partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()).
|
partition_acktags(AckTags) -> partition_acktags(AckTags, maps:new()).
|
||||||
|
|
||||||
partition_acktags([], Partitioned) ->
|
partition_acktags([], Partitioned) ->
|
||||||
orddict:map(fun (_P, RevAckTags) ->
|
maps:map(fun (_P, RevAckTags) ->
|
||||||
lists:reverse(RevAckTags)
|
lists:reverse(RevAckTags)
|
||||||
end, Partitioned);
|
end, Partitioned);
|
||||||
partition_acktags([{P, AckTag} | Rest], Partitioned) ->
|
partition_acktags([{P, AckTag} | Rest], Partitioned) ->
|
||||||
partition_acktags(Rest, rabbit_misc:orddict_cons(P, AckTag, Partitioned)).
|
partition_acktags(Rest, rabbit_misc:maps_cons(P, AckTag, Partitioned)).
|
||||||
|
|
||||||
priority_on_acktags(P, AckTags) ->
|
priority_on_acktags(P, AckTags) ->
|
||||||
[case Tag of
|
[case Tag of
|
||||||
|
|
|
@ -254,9 +254,9 @@ subtract_acks(ChPid, AckTags, State) ->
|
||||||
not_found;
|
not_found;
|
||||||
C = #cr{acktags = ChAckTags, limiter = Lim} ->
|
C = #cr{acktags = ChAckTags, limiter = Lim} ->
|
||||||
{CTagCounts, AckTags2} = subtract_acks(
|
{CTagCounts, AckTags2} = subtract_acks(
|
||||||
AckTags, [], orddict:new(), ChAckTags),
|
AckTags, [], maps:new(), ChAckTags),
|
||||||
{Unblocked, Lim2} =
|
{Unblocked, Lim2} =
|
||||||
orddict:fold(
|
maps:fold(
|
||||||
fun (CTag, Count, {UnblockedN, LimN}) ->
|
fun (CTag, Count, {UnblockedN, LimN}) ->
|
||||||
{Unblocked1, LimN1} =
|
{Unblocked1, LimN1} =
|
||||||
rabbit_limiter:ack_from_queue(LimN, CTag, Count),
|
rabbit_limiter:ack_from_queue(LimN, CTag, Count),
|
||||||
|
@ -278,7 +278,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
|
||||||
case queue:out(AckQ) of
|
case queue:out(AckQ) of
|
||||||
{{value, {T, CTag}}, QTail} ->
|
{{value, {T, CTag}}, QTail} ->
|
||||||
subtract_acks(TL, Prefix,
|
subtract_acks(TL, Prefix,
|
||||||
orddict:update_counter(CTag, 1, CTagCounts), QTail);
|
maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail);
|
||||||
{{value, V}, QTail} ->
|
{{value, V}, QTail} ->
|
||||||
subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
|
subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
|
||||||
{empty, _} ->
|
{empty, _} ->
|
||||||
|
|
|
@ -1776,7 +1776,7 @@ remove_queue_entries(Q, DelsAndAcksFun,
|
||||||
State = #vqstate{msg_store_clients = MSCState}) ->
|
State = #vqstate{msg_store_clients = MSCState}) ->
|
||||||
{MsgIdsByStore, Delivers, Acks, State1} =
|
{MsgIdsByStore, Delivers, Acks, State1} =
|
||||||
?QUEUE:foldl(fun remove_queue_entries1/2,
|
?QUEUE:foldl(fun remove_queue_entries1/2,
|
||||||
{orddict:new(), [], [], State}, Q),
|
{maps:new(), [], [], State}, Q),
|
||||||
remove_msgs_by_id(MsgIdsByStore, MSCState),
|
remove_msgs_by_id(MsgIdsByStore, MSCState),
|
||||||
DelsAndAcksFun(Delivers, Acks, State1).
|
DelsAndAcksFun(Delivers, Acks, State1).
|
||||||
|
|
||||||
|
@ -1786,7 +1786,7 @@ remove_queue_entries1(
|
||||||
is_persistent = IsPersistent} = MsgStatus,
|
is_persistent = IsPersistent} = MsgStatus,
|
||||||
{MsgIdsByStore, Delivers, Acks, State}) ->
|
{MsgIdsByStore, Delivers, Acks, State}) ->
|
||||||
{case MsgInStore of
|
{case MsgInStore of
|
||||||
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
|
true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
|
||||||
false -> MsgIdsByStore
|
false -> MsgIdsByStore
|
||||||
end,
|
end,
|
||||||
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
|
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
|
||||||
|
@ -2143,27 +2143,27 @@ purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
|
||||||
qi_pending_ack = gb_trees:empty()},
|
qi_pending_ack = gb_trees:empty()},
|
||||||
{IndexOnDiskSeqIds, MsgIdsByStore, State1}.
|
{IndexOnDiskSeqIds, MsgIdsByStore, State1}.
|
||||||
|
|
||||||
%% MsgIdsByStore is an orddict with two keys:
|
%% MsgIdsByStore is an map with two keys:
|
||||||
%%
|
%%
|
||||||
%% true: holds a list of Persistent Message Ids.
|
%% true: holds a list of Persistent Message Ids.
|
||||||
%% false: holds a list of Transient Message Ids.
|
%% false: holds a list of Transient Message Ids.
|
||||||
%%
|
%%
|
||||||
%% When we call orddict:to_list/1 we get two sets of msg ids, where
|
%% When we call maps:to_list/1 we get two sets of msg ids, where
|
||||||
%% IsPersistent is either true for persistent messages or false for
|
%% IsPersistent is either true for persistent messages or false for
|
||||||
%% transient ones. The msg_store_remove/3 function takes this boolean
|
%% transient ones. The msg_store_remove/3 function takes this boolean
|
||||||
%% flag to determine from which store the messages should be removed
|
%% flag to determine from which store the messages should be removed
|
||||||
%% from.
|
%% from.
|
||||||
remove_msgs_by_id(MsgIdsByStore, MSCState) ->
|
remove_msgs_by_id(MsgIdsByStore, MSCState) ->
|
||||||
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|
||||||
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)].
|
|| {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)].
|
||||||
|
|
||||||
remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
|
remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
|
||||||
case orddict:find(false, MsgIdsByStore) of
|
case maps:find(false, MsgIdsByStore) of
|
||||||
error -> ok;
|
error -> ok;
|
||||||
{ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
|
{ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
accumulate_ack_init() -> {[], orddict:new(), []}.
|
accumulate_ack_init() -> {[], maps:new(), []}.
|
||||||
|
|
||||||
accumulate_ack(#msg_status { seq_id = SeqId,
|
accumulate_ack(#msg_status { seq_id = SeqId,
|
||||||
msg_id = MsgId,
|
msg_id = MsgId,
|
||||||
|
@ -2173,7 +2173,7 @@ accumulate_ack(#msg_status { seq_id = SeqId,
|
||||||
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
|
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
|
||||||
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
|
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
|
||||||
case MsgInStore of
|
case MsgInStore of
|
||||||
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
|
true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
|
||||||
false -> MsgIdsByStore
|
false -> MsgIdsByStore
|
||||||
end,
|
end,
|
||||||
[MsgId | AllMsgIds]}.
|
[MsgId | AllMsgIds]}.
|
||||||
|
|
|
@ -197,10 +197,10 @@ categorise_by_scope(Version) when is_list(Version) ->
|
||||||
rabbit_misc:all_module_attributes(rabbit_upgrade),
|
rabbit_misc:all_module_attributes(rabbit_upgrade),
|
||||||
{Name, Scope, _Requires} <- Attributes,
|
{Name, Scope, _Requires} <- Attributes,
|
||||||
lists:member(Name, Version)],
|
lists:member(Name, Version)],
|
||||||
orddict:to_list(
|
maps:to_list(
|
||||||
lists:foldl(fun ({Scope, Name}, CatVersion) ->
|
lists:foldl(fun ({Scope, Name}, CatVersion) ->
|
||||||
rabbit_misc:orddict_cons(Scope, Name, CatVersion)
|
rabbit_misc:maps_cons(Scope, Name, CatVersion)
|
||||||
end, orddict:new(), Categorised)).
|
end, maps:new(), Categorised)).
|
||||||
|
|
||||||
dir() -> rabbit_mnesia:dir().
|
dir() -> rabbit_mnesia:dir().
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue