Replace dicts with maps
This commit is contained in:
parent
d779ff4c79
commit
85c1946b6a
|
|
@ -30,7 +30,7 @@
|
|||
-record(state, {connection,
|
||||
channel_sup_sup,
|
||||
map_num_pa = gb_trees:empty(), %% Number -> {Pid, AState}
|
||||
map_pid_num = dict:new(), %% Pid -> Number
|
||||
map_pid_num = #{}, %% Pid -> Number
|
||||
channel_max = ?MAX_CHANNEL_NUMBER,
|
||||
closing = false}).
|
||||
|
||||
|
|
@ -222,14 +222,14 @@ internal_pass_frame(Number, Frame, State) ->
|
|||
internal_register(Number, Pid, AState,
|
||||
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
|
||||
MapNPA1 = gb_trees:enter(Number, {Pid, AState}, MapNPA),
|
||||
MapPN1 = dict:store(Pid, Number, MapPN),
|
||||
MapPN1 = maps:put(Pid, Number, MapPN),
|
||||
State#state{map_num_pa = MapNPA1,
|
||||
map_pid_num = MapPN1}.
|
||||
|
||||
internal_unregister(Number, Pid,
|
||||
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
|
||||
MapNPA1 = gb_trees:delete(Number, MapNPA),
|
||||
MapPN1 = dict:erase(Pid, MapPN),
|
||||
MapPN1 = maps:remove(Pid, MapPN),
|
||||
State#state{map_num_pa = MapNPA1,
|
||||
map_pid_num = MapPN1}.
|
||||
|
||||
|
|
@ -245,7 +245,7 @@ internal_lookup_npa(Number, #state{map_num_pa = MapNPA}) ->
|
|||
end.
|
||||
|
||||
internal_lookup_pn(Pid, #state{map_pid_num = MapPN}) ->
|
||||
case dict:find(Pid, MapPN) of {ok, Number} -> Number;
|
||||
case maps:find(Pid, MapPN) of {ok, Number} -> Number;
|
||||
error -> undefined
|
||||
end.
|
||||
|
||||
|
|
@ -255,4 +255,4 @@ internal_update_npa(Number, Pid, AState, State = #state{map_num_pa = MapNPA}) ->
|
|||
signal_channels_connection_closing(ChannelCloseType, Reason,
|
||||
#state{map_pid_num = MapPN}) ->
|
||||
[amqp_channel:connection_closing(Pid, ChannelCloseType, Reason)
|
||||
|| Pid <- dict:fetch_keys(MapPN)].
|
||||
|| Pid <- maps:keys(MapPN)].
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@
|
|||
reply_queue,
|
||||
exchange,
|
||||
routing_key,
|
||||
continuations = dict:new(),
|
||||
continuations = #{},
|
||||
correlation_id = 0}).
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
|
|
@ -116,7 +116,7 @@ publish(Payload, From,
|
|||
amqp_channel:call(Channel, Publish, #amqp_msg{props = Props,
|
||||
payload = Payload}),
|
||||
State#state{correlation_id = CorrelationId + 1,
|
||||
continuations = dict:store(EncodedCorrelationId, From, Continuations)}.
|
||||
continuations = maps:put(EncodedCorrelationId, From, Continuations)}.
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
|
@ -175,10 +175,10 @@ handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
|
|||
#amqp_msg{props = #'P_basic'{correlation_id = Id},
|
||||
payload = Payload}},
|
||||
State = #state{continuations = Conts, channel = Channel}) ->
|
||||
From = dict:fetch(Id, Conts),
|
||||
From = maps:get(Id, Conts),
|
||||
gen_server:reply(From, Payload),
|
||||
amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
|
||||
{noreply, State#state{continuations = dict:erase(Id, Conts) }}.
|
||||
{noreply, State#state{continuations = maps:remove(Id, Conts) }}.
|
||||
|
||||
%% @private
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
|
|
|||
|
|
@ -49,9 +49,9 @@
|
|||
handle_deliver/3, handle_deliver/4,
|
||||
handle_info/2, handle_call/3, terminate/2]).
|
||||
|
||||
-record(state, {consumers = dict:new(), %% Tag -> ConsumerPid
|
||||
-record(state, {consumers = #{}, %% Tag -> ConsumerPid
|
||||
unassigned = undefined, %% Pid
|
||||
monitors = dict:new(), %% Pid -> {Count, MRef}
|
||||
monitors = #{}, %% Pid -> {Count, MRef}
|
||||
default_consumer = none}).
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
|
|
@ -99,7 +99,7 @@ handle_consume(#'basic.consume'{consumer_tag = Tag,
|
|||
case {Result, NoWait} of
|
||||
{ok, true} ->
|
||||
{ok, State#state
|
||||
{consumers = dict:store(Tag, Pid, Consumers),
|
||||
{consumers = maps:put(Tag, Pid, Consumers),
|
||||
monitors = add_to_monitor_dict(Pid, Monitors)}};
|
||||
{ok, false} ->
|
||||
{ok, State#state{unassigned = Pid}};
|
||||
|
|
@ -119,7 +119,7 @@ handle_consume_ok(BasicConsumeOk, _BasicConsume,
|
|||
when is_pid(Pid) ->
|
||||
State1 =
|
||||
State#state{
|
||||
consumers = dict:store(tag(BasicConsumeOk), Pid, Consumers),
|
||||
consumers = maps:put(tag(BasicConsumeOk), Pid, Consumers),
|
||||
monitors = add_to_monitor_dict(Pid, Monitors),
|
||||
unassigned = undefined},
|
||||
deliver(BasicConsumeOk, State1),
|
||||
|
|
@ -169,18 +169,18 @@ handle_info({'DOWN', _MRef, process, Pid, _Info},
|
|||
State = #state{monitors = Monitors,
|
||||
consumers = Consumers,
|
||||
default_consumer = DConsumer }) ->
|
||||
case dict:find(Pid, Monitors) of
|
||||
case maps:find(Pid, Monitors) of
|
||||
{ok, _CountMRef} ->
|
||||
{ok, State#state{monitors = dict:erase(Pid, Monitors),
|
||||
{ok, State#state{monitors = maps:remove(Pid, Monitors),
|
||||
consumers =
|
||||
dict:filter(
|
||||
maps:filter(
|
||||
fun (_, Pid1) when Pid1 =:= Pid -> false;
|
||||
(_, _) -> true
|
||||
end, Consumers)}};
|
||||
error ->
|
||||
case Pid of
|
||||
DConsumer -> {ok, State#state{
|
||||
monitors = dict:erase(Pid, Monitors),
|
||||
monitors = maps:remove(Pid, Monitors),
|
||||
default_consumer = none}};
|
||||
_ -> {ok, State} %% unnamed consumer went down
|
||||
%% before receiving consume_ok
|
||||
|
|
@ -230,9 +230,9 @@ deliver(Method, Message, DeliveryCtx, State) ->
|
|||
do_cancel(Cancel, State = #state{consumers = Consumers,
|
||||
monitors = Monitors}) ->
|
||||
Tag = tag(Cancel),
|
||||
case dict:find(Tag, Consumers) of
|
||||
case maps:find(Tag, Consumers) of
|
||||
{ok, Pid} -> State#state{
|
||||
consumers = dict:erase(Tag, Consumers),
|
||||
consumers = maps:remove(Tag, Consumers),
|
||||
monitors = remove_from_monitor_dict(Pid, Monitors)};
|
||||
error -> %% Untracked consumer. Do nothing.
|
||||
State
|
||||
|
|
@ -240,7 +240,7 @@ do_cancel(Cancel, State = #state{consumers = Consumers,
|
|||
|
||||
resolve_consumer(Tag, #state{consumers = Consumers,
|
||||
default_consumer = DefaultConsumer}) ->
|
||||
case dict:find(Tag, Consumers) of
|
||||
case maps:find(Tag, Consumers) of
|
||||
{ok, ConsumerPid} -> {consumer, ConsumerPid};
|
||||
error -> case DefaultConsumer of
|
||||
none -> error;
|
||||
|
|
@ -255,16 +255,16 @@ tag(#'basic.cancel_ok'{consumer_tag = Tag}) -> Tag;
|
|||
tag(#'basic.deliver'{consumer_tag = Tag}) -> Tag.
|
||||
|
||||
add_to_monitor_dict(Pid, Monitors) ->
|
||||
case dict:find(Pid, Monitors) of
|
||||
error -> dict:store(Pid,
|
||||
{1, erlang:monitor(process, Pid)},
|
||||
Monitors);
|
||||
{ok, {Count, MRef}} -> dict:store(Pid, {Count + 1, MRef}, Monitors)
|
||||
case maps:find(Pid, Monitors) of
|
||||
error -> maps:put(Pid,
|
||||
{1, erlang:monitor(process, Pid)},
|
||||
Monitors);
|
||||
{ok, {Count, MRef}} -> maps:put(Pid, {Count + 1, MRef}, Monitors)
|
||||
end.
|
||||
|
||||
remove_from_monitor_dict(Pid, Monitors) ->
|
||||
case dict:fetch(Pid, Monitors) of
|
||||
case maps:get(Pid, Monitors) of
|
||||
{1, MRef} -> erlang:demonitor(MRef),
|
||||
dict:erase(Pid, Monitors);
|
||||
{Count, MRef} -> dict:store(Pid, {Count - 1, MRef}, Monitors)
|
||||
maps:remove(Pid, Monitors);
|
||||
{Count, MRef} -> maps:put(Pid, {Count - 1, MRef}, Monitors)
|
||||
end.
|
||||
|
|
|
|||
Loading…
Reference in New Issue