From 596b8851698fdc5b2cff952edfda0c5ce0847771 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 4 Sep 2024 12:58:36 +0200 Subject: [PATCH 1/4] Refactor rabbit_channel from gen_server2 to gen_server --- deps/rabbit/src/rabbit_channel.erl | 56 ++++++++++++++++++------------ 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 38614fc4de..b621bebc4d 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -44,7 +44,7 @@ -include("amqqueue.hrl"). --behaviour(gen_server2). +-behaviour(gen_server). -export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2]). @@ -221,6 +221,8 @@ put({Type, Key}, none) end). +-define(HIBERNATE_AFTER, 6_000). + %%---------------------------------------------------------------------------- -export_type([channel_number/0]). @@ -258,9 +260,10 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams) -> - gen_server2:start_link( + Opts = [{hibernate_after, ?HIBERNATE_AFTER}], + gen_server:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []). + User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], Opts). -spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. @@ -286,17 +289,17 @@ do_flow(Pid, Method, Content) -> -spec flush(pid()) -> 'ok'. flush(Pid) -> - gen_server2:call(Pid, flush, infinity). + gen_server:call(Pid, flush, infinity). -spec shutdown(pid()) -> 'ok'. shutdown(Pid) -> - gen_server2:cast(Pid, terminate). + gen_server:cast(Pid, terminate). -spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. send_command(Pid, Msg) -> - gen_server2:cast(Pid, {command, Msg}). + gen_server:cast(Pid, {command, Msg}). -spec deliver_reply(binary(), mc:state()) -> 'ok'. @@ -317,7 +320,7 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) -> deliver_reply_local(Pid, Key, Message) -> case pg_local:in_group(rabbit_channels, Pid) of - true -> gen_server2:cast(Pid, {deliver_reply, Key, Message}); + true -> gen_server:cast(Pid, {deliver_reply, Key, Message}); false -> ok end. @@ -330,13 +333,25 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> Msg = {declare_fast_reply_to, Key}, rabbit_misc:with_exit_handler( rabbit_misc:const(not_found), - fun() -> gen_server2:call(Pid, Msg, infinity) end); + fun() -> gen_server:call(Pid, Msg, infinity) end); {error, _} -> not_found end; declare_fast_reply_to(_) -> not_found. +declare_fast_reply_to_v1(EncodedBin) -> + %% the the original encoding function + case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of + {ok, V1Pid, V1Key} -> + Msg = {declare_fast_reply_to, V1Key}, + rabbit_misc:with_exit_handler( + rabbit_misc:const(not_found), + fun() -> gen_server:call(V1Pid, Msg, infinity) end); + {error, _} -> + not_found + end. + -spec list() -> [pid()]. list() -> @@ -357,7 +372,7 @@ info_keys() -> ?INFO_KEYS. info(Pid) -> {Timeout, Deadline} = get_operation_timeout_and_deadline(), try - case gen_server2:call(Pid, {info, Deadline}, Timeout) of + case gen_server:call(Pid, {info, Deadline}, Timeout) of {ok, Res} -> Res; {error, Error} -> throw(Error) end @@ -372,7 +387,7 @@ info(Pid) -> info(Pid, Items) -> {Timeout, Deadline} = get_operation_timeout_and_deadline(), try - case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of + case gen_server:call(Pid, {{info, Items}, Deadline}, Timeout) of {ok, Res} -> Res; {error, Error} -> throw(Error) end @@ -412,7 +427,7 @@ refresh_config_local() -> _ = rabbit_misc:upmap( fun (C) -> try - gen_server2:call(C, refresh_config, infinity) + gen_server:call(C, refresh_config, infinity) catch _:Reason -> rabbit_log:error("Failed to refresh channel config " "for channel ~tp. Reason ~tp", @@ -426,7 +441,7 @@ refresh_interceptors() -> _ = rabbit_misc:upmap( fun (C) -> try - gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) + gen_server:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) catch _:Reason -> rabbit_log:error("Failed to refresh channel interceptors " "for channel ~tp. Reason ~tp", @@ -447,11 +462,11 @@ ready_for_close(Pid) -> % This event is necessary for the stats timer to be initialized with % the correct values once the management agent has started force_event_refresh(Ref) -> - [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], + [gen_server:cast(C, {force_event_refresh, Ref}) || C <- list()], ok. list_queue_states(Pid) -> - gen_server2:call(Pid, list_queue_states). + gen_server:call(Pid, list_queue_states). -spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}. @@ -467,8 +482,6 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), - rabbit_process_flag:adjust_for_message_handling_proc(), - ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), @@ -542,8 +555,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, fun() -> emit_stats(State2) end), put_operation_timeout(), State3 = init_tick_timer(State2), - {ok, State3, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + {ok, State3}. prioritise_call(Msg, _From, _Len, _State) -> case Msg of @@ -722,7 +734,7 @@ handle_info(emit_stats, State) -> State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer), %% NB: don't call noreply/1 since we don't want to kick off the %% stats timer. - {noreply, send_confirms_and_nacks(State1), hibernate}; + {noreply, send_confirms_and_nacks(State1)}; handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason}, #ch{queue_states = QStates0} = State0) -> @@ -822,14 +834,14 @@ get_consumer_timeout() -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. +reply(Reply, NewState) -> {reply, Reply, next_state(NewState)}. -noreply(NewState) -> {noreply, next_state(NewState), hibernate}. +noreply(NewState) -> {noreply, next_state(NewState)}. next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)). noreply_coalesce(#ch{confirmed = [], rejected = []} = State) -> - {noreply, ensure_stats_timer(State), hibernate}; + {noreply, ensure_stats_timer(State)}; noreply_coalesce(#ch{} = State) -> % Immediately process 'timeout' info message {noreply, ensure_stats_timer(State), 0}. From e4fa0e8f1a297bd094dafe3800a4e6b1c53f86a6 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 4 Sep 2024 22:27:49 +0200 Subject: [PATCH 2/4] Fix test relying on GS2 state --- deps/rabbit/test/rabbitmq_4_0_deprecations_SUITE.erl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/test/rabbitmq_4_0_deprecations_SUITE.erl b/deps/rabbit/test/rabbitmq_4_0_deprecations_SUITE.erl index ba5cc5a498..43c314f0cc 100644 --- a/deps/rabbit/test/rabbitmq_4_0_deprecations_SUITE.erl +++ b/deps/rabbit/test/rabbitmq_4_0_deprecations_SUITE.erl @@ -214,11 +214,10 @@ when_global_qos_is_not_permitted_from_conf(Config) -> list_server_channels(Config, Node) -> rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_channel, list, []). -is_prefetch_limited(ServerCh) -> - GenServer2State = sys:get_state(ServerCh), - ChState = element(4, GenServer2State), - ct:pal("Server channel (~p) state: ~p", [ServerCh, ChState]), - LimiterState = element(3, ChState), +is_prefetch_limited(ChannelPid) -> + ChannelState = sys:get_state(ChannelPid), + ct:pal("Server channel (~p) state: ~p", [ChannelPid, ChannelState]), + LimiterState = element(3, ChannelState), element(3, LimiterState). %% ------------------------------------------------------------------- From 35486d2876ffa5ae7a0cd2992983de95c23e5853 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 5 Sep 2024 11:45:47 +0200 Subject: [PATCH 3/4] Remove GS2 `priority_*` callbacks; format_status `format_status` replaces the queue of pending messages, which is sometimes very long, with just the number of messages pending. --- deps/rabbit/src/rabbit_channel.erl | 32 ++++++++---------------------- 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index b621bebc4d..065374b644 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -56,9 +56,7 @@ -export([update_user_state/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, - prioritise_call/4, prioritise_cast/3, prioritise_info/3, - format_message_queue/2]). + handle_info/2, format_status/1]). %% Internal -export([list_local/0, emit_info_local/3, deliver_reply_local/3]). @@ -557,26 +555,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, State3 = init_tick_timer(State2), {ok, State3}. -prioritise_call(Msg, _From, _Len, _State) -> - case Msg of - info -> 9; - {info, _Items} -> 9; - _ -> 0 - end. - -prioritise_cast(Msg, _Len, _State) -> - case Msg of - {confirm, _MsgSeqNos, _QPid} -> 5; - {reject_publish, _MsgSeqNos, _QPid} -> 5; - _ -> 0 - end. - -prioritise_info(Msg, _Len, _State) -> - case Msg of - emit_stats -> 7; - _ -> 0 - end. - handle_call(flush, _From, State) -> reply(ok, State); @@ -822,7 +800,13 @@ terminate(_Reason, code_change(_OldVsn, State, _Extra) -> {ok, State}. -format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +format_status(Status) -> + maps:map( + fun(state, State = #ch{unacked_message_q = UnackedMessageQ}) -> + State#ch{unacked_message_q = lqueue:len(UnackedMessageQ)}; + (_,Value) -> + Value + end, Status). get_consumer_timeout() -> case application:get_env(rabbit, consumer_timeout) of From 04144fd302dbaf3b76dac71deb524d74c9976918 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 5 Sep 2024 11:54:01 +0200 Subject: [PATCH 4/4] WIP: pre/post_hibernate --- deps/rabbit/src/rabbit_channel.erl | 56 +++++++++++++++--------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 065374b644..6538da6266 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -756,20 +756,20 @@ handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) -> noreply(State#ch{cfg = Cfg#conf{user = User}}). -handle_pre_hibernate(State0) -> - ok = clear_permission_cache(), - State = maybe_cancel_tick_timer(State0), - rabbit_event:if_enabled( - State, #ch.stats_timer, - fun () -> emit_stats(State, - [{idle_since, - os:system_time(millisecond)}]) - end), - {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. - -handle_post_hibernate(State0) -> - State = init_tick_timer(State0), - {noreply, State}. +%handle_pre_hibernate(State0) -> +% ok = clear_permission_cache(), +% State = maybe_cancel_tick_timer(State0), +% rabbit_event:if_enabled( +% State, #ch.stats_timer, +% fun () -> emit_stats(State, +% [{idle_since, +% os:system_time(millisecond)}]) +% end), +% {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. +% +%handle_post_hibernate(State0) -> +% State = init_tick_timer(State0), +% {noreply, State}. terminate(_Reason, State = #ch{cfg = #conf{user = #user{username = Username}}, @@ -2654,20 +2654,20 @@ init_tick_timer(State) -> reset_tick_timer(State) -> State#ch{tick_timer = undefined}. -maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) -> - State; -maybe_cancel_tick_timer(#ch{tick_timer = TRef, - unacked_message_q = UMQ} = State) -> - case ?QUEUE:len(UMQ) of - 0 -> - %% we can only cancel the tick timer if the unacked messages - %% queue is empty. - _ = erlang:cancel_timer(TRef), - State#ch{tick_timer = undefined}; - _ -> - %% let the timer continue - State - end. +%maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) -> +% State; +%maybe_cancel_tick_timer(#ch{tick_timer = TRef, +% unacked_message_q = UMQ} = State) -> +% case ?QUEUE:len(UMQ) of +% 0 -> +% %% we can only cancel the tick timer if the unacked messages +% %% queue is empty. +% _ = erlang:cancel_timer(TRef), +% State#ch{tick_timer = undefined}; +% _ -> +% %% let the timer continue +% State +% end. now_millis() -> erlang:monotonic_time(millisecond).