From d9ec0b35adb4c5bd0cf8cce02c965bc0b9f34e26 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 25 Apr 2025 17:21:05 +0200 Subject: [PATCH 1/3] Shovel: de-flake dynamic_SUITE Checking that not a single process has a message in the mailbox is prone to flakes. (cherry picked from commit 0ec41c6c414debeea745ad9c601df6217ccd7075) --- deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index e6e21e02dd..aa1f34e386 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -10,6 +10,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-import(rabbit_ct_helpers, [eventually/1]). + -compile(export_all). -export([spawn_suspender_proc/1]). @@ -696,9 +698,11 @@ credit_flow(Config) -> 5000), %% There should be only one process with a message buildup - [{WriterPid, MQLen, _}, {_, 0, _} | _] = + Top = [{WriterPid, MQLen, _}, {_, P, _} | _] = rabbit_ct_broker_helpers:rpc( Config, 0, recon, proc_count, [message_queue_len, 10]), + ct:pal("Top processes by message queue length: ~p", [Top]), + ?assert(P < 3), %% The writer process should have only a limited %% message queue. 
The shovel stops sending messages @@ -725,9 +729,10 @@ credit_flow(Config) -> end, 5000), #{messages := 1000} = message_count(Config, <<"dest">>), - [{_, 0, _}] = + [{_, P, _}] = rabbit_ct_broker_helpers:rpc( Config, 0, recon, proc_count, [message_queue_len, 1]), + ?assert(P < 3), %% Status only transitions from flow to running %% after a 1 second state-change-interval @@ -839,9 +844,12 @@ dest_resource_alarm(AckMode, Config) -> MsgCnts = message_count(Config, <<"src">>), %% There should be no process with a message buildup - [{_, 0, _}] = - rabbit_ct_broker_helpers:rpc( - Config, 0, recon, proc_count, [message_queue_len, 1]), + eventually(?_assertEqual(0, begin + Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc( + Config, 0, recon, proc_count, [message_queue_len, 1]), + ct:pal("Top process by message queue length: ~p", [Top]), + P + end)), %% Clear the resource alarm, all messages should %% arrive to the dest queue From f21a32d822d25f20eecf25a966d4274aa9ad5368 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Mon, 28 Apr 2025 15:02:35 +0200 Subject: [PATCH 2/3] Add a `flush` handler to amqp_channel rabbit_channel may use amqp_channel as the writer. When terminating, rabbit_channel sends a `flush` message to its writer. If amqp_channel is in use, that led to a `function_clause` crash. 
(cherry picked from commit 0ce6ad0f0fd0fabdb7c0db2bd193ab763ebb25c1) --- deps/amqp_client/src/amqp_channel.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index 3a9aca680e..d46439a320 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -384,6 +384,10 @@ init([Driver, Connection, ChannelNumber, Consumer, Identity]) -> handle_call(open, From, State) -> {noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)}; %% @private +handle_call(flush, _From, State) -> + flush_writer(State), + {noreply, State}; +%% @private handle_call({close, Code, Text}, From, State) -> handle_close(Code, Text, From, State); %% @private From eadbfd4a38186b703f38dd6731fb55e29e0aa9a3 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Mon, 28 Apr 2025 15:16:27 +0200 Subject: [PATCH 3/3] Don't log a crash on connection termination (cherry picked from commit 0f36610e9dc2ccb438ec82154ea9f8d63f987391) --- deps/amqp_client/src/amqp_direct_connection.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl index 8c912577ba..5fd0b68404 100644 --- a/deps/amqp_client/src/amqp_direct_connection.erl +++ b/deps/amqp_client/src/amqp_direct_connection.erl @@ -72,7 +72,8 @@ handle_message({'DOWN', _MRef, process, _ConnSup, shutdown}, State) -> handle_message({'DOWN', _MRef, process, _ConnSup, Reason}, State) -> {stop, {remote_node_down, Reason}, State}; handle_message({'EXIT', Pid, Reason}, State) -> - {stop, rabbit_misc:format("stopping because dependent process ~tp died: ~tp", [Pid, Reason]), State}; + ?LOG_INFO("stopping because dependent process ~tp died: ~tp", [Pid, Reason]), + {stop, normal, State}; handle_message(Msg, State) -> {stop, {unexpected_msg, Msg}, State}.