Merge pull request #13821 from rabbitmq/mergify/bp/v4.1.x/pr-13820

Shovel test flakes and logging fixes (backport #13820)
This commit is contained in:
Michael Klishin 2025-04-28 20:07:09 +04:00 committed by GitHub
commit 8bc760ece6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 19 additions and 6 deletions

View File

@ -384,6 +384,10 @@ init([Driver, Connection, ChannelNumber, Consumer, Identity]) ->
handle_call(open, From, State) -> handle_call(open, From, State) ->
{noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)}; {noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)};
%% @private %% @private
handle_call(flush, _From, State) ->
flush_writer(State),
{noreply, State};
%% @private
handle_call({close, Code, Text}, From, State) -> handle_call({close, Code, Text}, From, State) ->
handle_close(Code, Text, From, State); handle_close(Code, Text, From, State);
%% @private %% @private

View File

@ -72,7 +72,8 @@ handle_message({'DOWN', _MRef, process, _ConnSup, shutdown}, State) ->
handle_message({'DOWN', _MRef, process, _ConnSup, Reason}, State) -> handle_message({'DOWN', _MRef, process, _ConnSup, Reason}, State) ->
{stop, {remote_node_down, Reason}, State}; {stop, {remote_node_down, Reason}, State};
handle_message({'EXIT', Pid, Reason}, State) -> handle_message({'EXIT', Pid, Reason}, State) ->
{stop, rabbit_misc:format("stopping because dependent process ~tp died: ~tp", [Pid, Reason]), State}; ?LOG_INFO("stopping because dependent process ~tp died: ~tp", [Pid, Reason]),
{stop, normal, State};
handle_message(Msg, State) -> handle_message(Msg, State) ->
{stop, {unexpected_msg, Msg}, State}. {stop, {unexpected_msg, Msg}, State}.

View File

@ -10,6 +10,8 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/amqp_client.hrl").
-import(rabbit_ct_helpers, [eventually/1]).
-compile(export_all). -compile(export_all).
-export([spawn_suspender_proc/1]). -export([spawn_suspender_proc/1]).
@ -696,9 +698,11 @@ credit_flow(Config) ->
5000), 5000),
%% There should be only one process with a message buildup %% There should be only one process with a message buildup
[{WriterPid, MQLen, _}, {_, 0, _} | _] = Top = [{WriterPid, MQLen, _}, {_, P, _} | _] =
rabbit_ct_broker_helpers:rpc( rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 10]), Config, 0, recon, proc_count, [message_queue_len, 10]),
ct:pal("Top processes by message queue length: ~p", [Top]),
?assert(P < 3),
%% The writer process should have only a limited %% The writer process should have only a limited
%% message queue. The shovel stops sending messages %% message queue. The shovel stops sending messages
@ -725,9 +729,10 @@ credit_flow(Config) ->
end, end,
5000), 5000),
#{messages := 1000} = message_count(Config, <<"dest">>), #{messages := 1000} = message_count(Config, <<"dest">>),
[{_, 0, _}] = [{_, P, _}] =
rabbit_ct_broker_helpers:rpc( rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 1]), Config, 0, recon, proc_count, [message_queue_len, 1]),
?assert(P < 3),
%% Status only transitions from flow to running %% Status only transitions from flow to running
%% after a 1 second state-change-interval %% after a 1 second state-change-interval
@ -839,9 +844,12 @@ dest_resource_alarm(AckMode, Config) ->
MsgCnts = message_count(Config, <<"src">>), MsgCnts = message_count(Config, <<"src">>),
%% There should be no process with a message buildup %% There should be no process with a message buildup
[{_, 0, _}] = eventually(?_assertEqual(0, begin
rabbit_ct_broker_helpers:rpc( Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 1]), Config, 0, recon, proc_count, [message_queue_len, 1]),
ct:pal("Top process by message queue length: ~p", [Top]),
P
end)),
%% Clear the resource alarm, all messages should %% Clear the resource alarm, all messages should
%% arrive to the dest queue %% arrive to the dest queue