This commit is contained in:
Michal Kuratczyk 2025-05-07 17:59:50 +00:00 committed by GitHub
commit 4484f27439
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 74 additions and 79 deletions

View File

@ -44,7 +44,7 @@
-include("amqqueue.hrl").
-behaviour(gen_server2).
-behaviour(gen_server).
-export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2]).
@ -56,9 +56,7 @@
-export([update_user_state/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
format_message_queue/2]).
handle_info/2, format_status/1]).
%% Internal
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
@ -221,6 +219,8 @@
put({Type, Key}, none)
end).
-define(HIBERNATE_AFTER, 6_000).
%%----------------------------------------------------------------------------
-export_type([channel_number/0]).
@ -258,9 +258,10 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
gen_server2:start_link(
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], Opts).
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
@ -286,17 +287,17 @@ do_flow(Pid, Method, Content) ->
-spec flush(pid()) -> 'ok'.
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
gen_server:call(Pid, flush, infinity).
-spec shutdown(pid()) -> 'ok'.
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
gen_server:cast(Pid, terminate).
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
send_command(Pid, Msg) ->
gen_server2:cast(Pid, {command, Msg}).
gen_server:cast(Pid, {command, Msg}).
-spec deliver_reply(binary(), mc:state()) -> 'ok'.
@ -317,7 +318,7 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
deliver_reply_local(Pid, Key, Message) ->
case pg_local:in_group(rabbit_channels, Pid) of
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
true -> gen_server:cast(Pid, {deliver_reply, Key, Message});
false -> ok
end.
@ -330,13 +331,25 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
Msg = {declare_fast_reply_to, Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(Pid, Msg, infinity) end);
fun() -> gen_server:call(Pid, Msg, infinity) end);
{error, _} ->
not_found
end;
declare_fast_reply_to(_) ->
not_found.
declare_fast_reply_to_v1(EncodedBin) ->
%% the the original encoding function
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
{ok, V1Pid, V1Key} ->
Msg = {declare_fast_reply_to, V1Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server:call(V1Pid, Msg, infinity) end);
{error, _} ->
not_found
end.
-spec list() -> [pid()].
list() ->
@ -357,7 +370,7 @@ info_keys() -> ?INFO_KEYS.
info(Pid) ->
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
try
case gen_server2:call(Pid, {info, Deadline}, Timeout) of
case gen_server:call(Pid, {info, Deadline}, Timeout) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end
@ -372,7 +385,7 @@ info(Pid) ->
info(Pid, Items) ->
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
try
case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of
case gen_server:call(Pid, {{info, Items}, Deadline}, Timeout) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end
@ -412,7 +425,7 @@ refresh_config_local() ->
_ = rabbit_misc:upmap(
fun (C) ->
try
gen_server2:call(C, refresh_config, infinity)
gen_server:call(C, refresh_config, infinity)
catch _:Reason ->
rabbit_log:error("Failed to refresh channel config "
"for channel ~tp. Reason ~tp",
@ -426,7 +439,7 @@ refresh_interceptors() ->
_ = rabbit_misc:upmap(
fun (C) ->
try
gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
gen_server:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
catch _:Reason ->
rabbit_log:error("Failed to refresh channel interceptors "
"for channel ~tp. Reason ~tp",
@ -447,11 +460,11 @@ ready_for_close(Pid) ->
% This event is necessary for the stats timer to be initialized with
% the correct values once the management agent has started
force_event_refresh(Ref) ->
[gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
[gen_server:cast(C, {force_event_refresh, Ref}) || C <- list()],
ok.
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
gen_server:call(Pid, list_queue_states).
-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}.
@ -467,8 +480,6 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
@ -542,28 +553,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
fun() -> emit_stats(State2) end),
put_operation_timeout(),
State3 = init_tick_timer(State2),
{ok, State3, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
_ -> 0
end.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
{reject_publish, _MsgSeqNos, _QPid} -> 5;
_ -> 0
end.
prioritise_info(Msg, _Len, _State) ->
case Msg of
emit_stats -> 7;
_ -> 0
end.
{ok, State3}.
handle_call(flush, _From, State) ->
reply(ok, State);
@ -722,7 +712,7 @@ handle_info(emit_stats, State) ->
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
%% NB: don't call noreply/1 since we don't want to kick off the
%% stats timer.
{noreply, send_confirms_and_nacks(State1), hibernate};
{noreply, send_confirms_and_nacks(State1)};
handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason},
#ch{queue_states = QStates0} = State0) ->
@ -766,20 +756,20 @@ handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{user = User}}).
handle_pre_hibernate(State0) ->
ok = clear_permission_cache(),
State = maybe_cancel_tick_timer(State0),
rabbit_event:if_enabled(
State, #ch.stats_timer,
fun () -> emit_stats(State,
[{idle_since,
os:system_time(millisecond)}])
end),
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
handle_post_hibernate(State0) ->
State = init_tick_timer(State0),
{noreply, State}.
%handle_pre_hibernate(State0) ->
% ok = clear_permission_cache(),
% State = maybe_cancel_tick_timer(State0),
% rabbit_event:if_enabled(
% State, #ch.stats_timer,
% fun () -> emit_stats(State,
% [{idle_since,
% os:system_time(millisecond)}])
% end),
% {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
%
%handle_post_hibernate(State0) ->
% State = init_tick_timer(State0),
% {noreply, State}.
terminate(_Reason,
State = #ch{cfg = #conf{user = #user{username = Username}},
@ -810,7 +800,13 @@ terminate(_Reason,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
format_status(Status) ->
maps:map(
fun(state, State = #ch{unacked_message_q = UnackedMessageQ}) ->
State#ch{unacked_message_q = lqueue:len(UnackedMessageQ)};
(_,Value) ->
Value
end, Status).
get_consumer_timeout() ->
case application:get_env(rabbit, consumer_timeout) of
@ -822,14 +818,14 @@ get_consumer_timeout() ->
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
reply(Reply, NewState) -> {reply, Reply, next_state(NewState)}.
noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
noreply(NewState) -> {noreply, next_state(NewState)}.
next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).
noreply_coalesce(#ch{confirmed = [], rejected = []} = State) ->
{noreply, ensure_stats_timer(State), hibernate};
{noreply, ensure_stats_timer(State)};
noreply_coalesce(#ch{} = State) ->
% Immediately process 'timeout' info message
{noreply, ensure_stats_timer(State), 0}.
@ -2658,20 +2654,20 @@ init_tick_timer(State) ->
reset_tick_timer(State) ->
State#ch{tick_timer = undefined}.
maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) ->
State;
maybe_cancel_tick_timer(#ch{tick_timer = TRef,
unacked_message_q = UMQ} = State) ->
case ?QUEUE:len(UMQ) of
0 ->
%% we can only cancel the tick timer if the unacked messages
%% queue is empty.
_ = erlang:cancel_timer(TRef),
State#ch{tick_timer = undefined};
_ ->
%% let the timer continue
State
end.
%maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) ->
% State;
%maybe_cancel_tick_timer(#ch{tick_timer = TRef,
% unacked_message_q = UMQ} = State) ->
% case ?QUEUE:len(UMQ) of
% 0 ->
% %% we can only cancel the tick timer if the unacked messages
% %% queue is empty.
% _ = erlang:cancel_timer(TRef),
% State#ch{tick_timer = undefined};
% _ ->
% %% let the timer continue
% State
% end.
now_millis() ->
erlang:monotonic_time(millisecond).

View File

@ -214,11 +214,10 @@ when_global_qos_is_not_permitted_from_conf(Config) ->
list_server_channels(Config, Node) ->
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_channel, list, []).
is_prefetch_limited(ServerCh) ->
GenServer2State = sys:get_state(ServerCh),
ChState = element(4, GenServer2State),
ct:pal("Server channel (~p) state: ~p", [ServerCh, ChState]),
LimiterState = element(3, ChState),
is_prefetch_limited(ChannelPid) ->
ChannelState = sys:get_state(ChannelPid),
ct:pal("Server channel (~p) state: ~p", [ChannelPid, ChannelState]),
LimiterState = element(3, ChannelState),
element(3, LimiterState).
%% -------------------------------------------------------------------