diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl old mode 100755 new mode 100644 index 5d9118b1da..7a38bea0b5 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -43,6 +43,9 @@ -export([register_return_handler/2]). -export([register_flow_handler/2]). +-define(TIMEOUT_FLUSH, 60000). +-define(TIMEOUT_CLOSE_OK, 3000). + %% This diagram shows the interaction between the different component %% processes in an AMQP client scenario. %% @@ -483,6 +486,19 @@ handle_info(shutdown, State) -> handle_info({shutdown, Reason}, State) -> shutdown_with_reason(Reason, State); +%% @private +handle_info({shutdown, FailShutdownReason, InitialReason}, + #channel_state{number = Number} = State) -> + case FailShutdownReason of + {connection_closing, timed_out_flushing_channel} -> + ?LOG_WARN("Channel ~p closing: timed out flushing while connection " + "closing~n", [Number]); + {connection_closing, timed_out_waiting_close_ok} -> + ?LOG_WARN("Channel ~p closing: timed out waiting for " + "channel.close_ok while connection closing~n", [Number]) + end, + {stop, {FailShutdownReason, InitialReason}, State}; + %% Handles the situation when the connection closes without closing the channel %% beforehand. The channel must block all further RPCs, %% flush the RPC queue (optional), and terminate @@ -492,7 +508,17 @@ handle_info({connection_closing, CloseType, Reason}, closing = Closing} = State) -> case {CloseType, Closing, queue:is_empty(RpcQueue)} of {flush, false, false} -> + erlang:send_after(?TIMEOUT_FLUSH, self(), + {shutdown, + {connection_closing, timed_out_flushing_channel}, + Reason}), {noreply, State#channel_state{closing = {connection, Reason}}}; + {flush, just_channel, false} -> + erlang:send_after(?TIMEOUT_CLOSE_OK, self(), + {shutdown, + {connection_closing, timed_out_waiting_close_ok}, + Reason}), + {noreply, State}; _ -> shutdown_with_reason(Reason, State) end; diff --git a/deps/amqp_client/src/amqp_network_driver.erl b/deps/amqp_client/src/amqp_network_driver.erl index 7794ab992f..3ddb1a26b9 100644 --- a/deps/amqp_client/src/amqp_network_driver.erl +++ b/deps/amqp_client/src/amqp_network_driver.erl @@ -134,8 +134,14 @@ receive_writer_send_command_signal(Writer) -> %--------------------------------------------------------------------------- send_frame(Channel, Frame) -> - {framing_pid, FramingPid} = resolve_framing_channel({channel, Channel}), - rabbit_framing_channel:process(FramingPid, Frame). + case resolve_framing_channel({channel, Channel}) of + {framing_pid, FramingPid} -> + rabbit_framing_channel:process(FramingPid, Frame); + undefined -> + ?LOG_INFO("Dropping frame ~p for invalid or closed channel " + "number ~p~n", [Frame, Channel]), + ok + end. recv(#connection_state{main_reader_pid = MainReaderPid}) -> receive diff --git a/deps/amqp_client/test/test_util.erl b/deps/amqp_client/test/test_util.erl old mode 100755 new mode 100644