Log errors from `ranch:handshake`
Fixes #11171 An MQTT user encountered TLS handshake timeouts with their IoT device, and the actual error from `ssl:handshake` / `ranch:handshake` was not caught and logged. At this time, `ranch` uses `exit(normal)` in the case of timeouts, but that should change in the future (https://github.com/ninenines/ranch/issues/336)
This commit is contained in:
parent
746c06f77c
commit
620fff22f1
|
@ -560,7 +560,7 @@ failed_to_recv_proxy_header(Ref, Error) ->
|
||||||
end,
|
end,
|
||||||
rabbit_log:debug(Msg, [Error]),
|
rabbit_log:debug(Msg, [Error]),
|
||||||
% The following call will clean up resources then exit
|
% The following call will clean up resources then exit
|
||||||
_ = ranch:handshake(Ref),
|
_ = catch ranch:handshake(Ref),
|
||||||
exit({shutdown, failed_to_recv_proxy_header}).
|
exit({shutdown, failed_to_recv_proxy_header}).
|
||||||
|
|
||||||
handshake(Ref, ProxyProtocolEnabled) ->
|
handshake(Ref, ProxyProtocolEnabled) ->
|
||||||
|
@ -572,14 +572,22 @@ handshake(Ref, ProxyProtocolEnabled) ->
|
||||||
{error, protocol_error, Error} ->
|
{error, protocol_error, Error} ->
|
||||||
failed_to_recv_proxy_header(Ref, Error);
|
failed_to_recv_proxy_header(Ref, Error);
|
||||||
{ok, ProxyInfo} ->
|
{ok, ProxyInfo} ->
|
||||||
{ok, Sock} = ranch:handshake(Ref),
|
case catch ranch:handshake(Ref) of
|
||||||
setup_socket(Sock),
|
{'EXIT', normal} ->
|
||||||
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
|
{error, handshake_failed};
|
||||||
|
{ok, Sock} ->
|
||||||
|
setup_socket(Sock),
|
||||||
|
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
|
||||||
|
end
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
{ok, Sock} = ranch:handshake(Ref),
|
case catch ranch:handshake(Ref) of
|
||||||
setup_socket(Sock),
|
{'EXIT', normal} ->
|
||||||
{ok, Sock}
|
{error, handshake_failed};
|
||||||
|
{ok, Sock} ->
|
||||||
|
setup_socket(Sock),
|
||||||
|
{ok, Sock}
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
setup_socket(Sock) ->
|
setup_socket(Sock) ->
|
||||||
|
|
|
@ -162,6 +162,10 @@ shutdown(Pid, Explanation) ->
|
||||||
no_return().
|
no_return().
|
||||||
init(Parent, HelperSups, Ref) ->
|
init(Parent, HelperSups, Ref) ->
|
||||||
?LG_PROCESS_TYPE(reader),
|
?LG_PROCESS_TYPE(reader),
|
||||||
|
%% Note:
|
||||||
|
%% This function could return an error if the handshake times out.
|
||||||
|
%% It is less likely to happen here as compared to MQTT, so
|
||||||
|
%% crashing with a `badmatch` seems appropriate.
|
||||||
{ok, Sock} = rabbit_networking:handshake(Ref,
|
{ok, Sock} = rabbit_networking:handshake(Ref,
|
||||||
application:get_env(rabbit, proxy_protocol, false)),
|
application:get_env(rabbit, proxy_protocol, false)),
|
||||||
Deb = sys:debug_options([]),
|
Deb = sys:debug_options([]),
|
||||||
|
|
|
@ -71,34 +71,39 @@ close_connection(Pid, Reason) ->
|
||||||
init(Ref) ->
|
init(Ref) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [mqtt]}),
|
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [mqtt]}),
|
||||||
{ok, Sock} = rabbit_networking:handshake(Ref,
|
ProxyProtocolEnabled = application:get_env(?APP_NAME, proxy_protocol, false),
|
||||||
application:get_env(?APP_NAME, proxy_protocol, false)),
|
case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of
|
||||||
RealSocket = rabbit_net:unwrap_socket(Sock),
|
|
||||||
case rabbit_net:connection_string(Sock, inbound) of
|
|
||||||
{ok, ConnStr} ->
|
|
||||||
ConnName = rabbit_data_coercion:to_binary(ConnStr),
|
|
||||||
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
|
|
||||||
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
|
|
||||||
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
|
|
||||||
erlang:send_after(LoginTimeout, self(), login_timeout),
|
|
||||||
State0 = #state{socket = RealSocket,
|
|
||||||
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
|
|
||||||
conn_name = ConnName,
|
|
||||||
await_recv = false,
|
|
||||||
connection_state = running,
|
|
||||||
conserve = false,
|
|
||||||
parse_state = rabbit_mqtt_packet:init_state()},
|
|
||||||
State1 = control_throttle(State0),
|
|
||||||
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
|
|
||||||
gen_server:enter_loop(?MODULE, [], State);
|
|
||||||
{error, Reason = enotconn} ->
|
|
||||||
?LOG_INFO("MQTT could not get connection string: ~s", [Reason]),
|
|
||||||
rabbit_net:fast_close(RealSocket),
|
|
||||||
ignore;
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]),
|
?LOG_ERROR("MQTT could not establish connection: ~s", [Reason]),
|
||||||
rabbit_net:fast_close(RealSocket),
|
{stop, Reason};
|
||||||
{stop, Reason}
|
{ok, Sock} ->
|
||||||
|
RealSocket = rabbit_net:unwrap_socket(Sock),
|
||||||
|
case rabbit_net:connection_string(Sock, inbound) of
|
||||||
|
{ok, ConnStr} ->
|
||||||
|
ConnName = rabbit_data_coercion:to_binary(ConnStr),
|
||||||
|
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
|
||||||
|
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
|
||||||
|
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
|
||||||
|
erlang:send_after(LoginTimeout, self(), login_timeout),
|
||||||
|
State0 = #state{socket = RealSocket,
|
||||||
|
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
|
||||||
|
conn_name = ConnName,
|
||||||
|
await_recv = false,
|
||||||
|
connection_state = running,
|
||||||
|
conserve = false,
|
||||||
|
parse_state = rabbit_mqtt_packet:init_state()},
|
||||||
|
State1 = control_throttle(State0),
|
||||||
|
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
|
||||||
|
gen_server:enter_loop(?MODULE, [], State);
|
||||||
|
{error, Reason = enotconn} ->
|
||||||
|
?LOG_INFO("MQTT could not get connection string: ~s", [Reason]),
|
||||||
|
rabbit_net:fast_close(RealSocket),
|
||||||
|
ignore;
|
||||||
|
{error, Reason} ->
|
||||||
|
?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]),
|
||||||
|
rabbit_net:fast_close(RealSocket),
|
||||||
|
{stop, Reason}
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_call({info, InfoItems}, _From, State) ->
|
handle_call({info, InfoItems}, _From, State) ->
|
||||||
|
|
|
@ -63,50 +63,54 @@ close_connection(Pid, Reason) ->
|
||||||
|
|
||||||
init([SupHelperPid, Ref, Configuration]) ->
|
init([SupHelperPid, Ref, Configuration]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
{ok, Sock} = rabbit_networking:handshake(Ref,
|
ProxyProtocolEnabled = application:get_env(rabbitmq_stomp, proxy_protocol, false),
|
||||||
application:get_env(rabbitmq_stomp, proxy_protocol, false)),
|
case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of
|
||||||
RealSocket = rabbit_net:unwrap_socket(Sock),
|
|
||||||
|
|
||||||
case rabbit_net:connection_string(Sock, inbound) of
|
|
||||||
{ok, ConnStr} ->
|
|
||||||
ConnName = rabbit_data_coercion:to_binary(ConnStr),
|
|
||||||
ProcInitArgs = processor_args(Configuration, Sock),
|
|
||||||
ProcState = rabbit_stomp_processor:initial_state(Configuration,
|
|
||||||
ProcInitArgs),
|
|
||||||
|
|
||||||
rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
|
|
||||||
[self(), ConnName]),
|
|
||||||
|
|
||||||
ParseState = rabbit_stomp_frame:initial_state(),
|
|
||||||
_ = register_resource_alarm(),
|
|
||||||
|
|
||||||
LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
|
|
||||||
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
|
|
||||||
erlang:send_after(LoginTimeout, self(), login_timeout),
|
|
||||||
|
|
||||||
gen_server2:enter_loop(?MODULE, [],
|
|
||||||
rabbit_event:init_stats_timer(
|
|
||||||
run_socket(control_throttle(
|
|
||||||
#reader_state{socket = RealSocket,
|
|
||||||
conn_name = ConnName,
|
|
||||||
parse_state = ParseState,
|
|
||||||
processor_state = ProcState,
|
|
||||||
heartbeat_sup = SupHelperPid,
|
|
||||||
heartbeat = {none, none},
|
|
||||||
max_frame_size = MaxFrameSize,
|
|
||||||
current_frame_size = 0,
|
|
||||||
state = running,
|
|
||||||
conserve_resources = false,
|
|
||||||
recv_outstanding = false})), #reader_state.stats_timer),
|
|
||||||
{backoff, 1000, 1000, 10000});
|
|
||||||
{error, enotconn} ->
|
|
||||||
rabbit_net:fast_close(RealSocket),
|
|
||||||
terminate(shutdown, undefined);
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
rabbit_net:fast_close(RealSocket),
|
rabbit_log_connection:error(
|
||||||
terminate({network_error, Reason}, undefined)
|
"STOMP could not establish connection: ~s", [Reason]),
|
||||||
end.
|
{stop, Reason};
|
||||||
|
{ok, Sock} ->
|
||||||
|
RealSocket = rabbit_net:unwrap_socket(Sock),
|
||||||
|
case rabbit_net:connection_string(Sock, inbound) of
|
||||||
|
{ok, ConnStr} ->
|
||||||
|
ConnName = rabbit_data_coercion:to_binary(ConnStr),
|
||||||
|
ProcInitArgs = processor_args(Configuration, Sock),
|
||||||
|
ProcState = rabbit_stomp_processor:initial_state(Configuration,
|
||||||
|
ProcInitArgs),
|
||||||
|
|
||||||
|
rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
|
||||||
|
[self(), ConnName]),
|
||||||
|
|
||||||
|
ParseState = rabbit_stomp_frame:initial_state(),
|
||||||
|
_ = register_resource_alarm(),
|
||||||
|
|
||||||
|
LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
|
||||||
|
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
|
||||||
|
erlang:send_after(LoginTimeout, self(), login_timeout),
|
||||||
|
|
||||||
|
gen_server2:enter_loop(?MODULE, [],
|
||||||
|
rabbit_event:init_stats_timer(
|
||||||
|
run_socket(control_throttle(
|
||||||
|
#reader_state{socket = RealSocket,
|
||||||
|
conn_name = ConnName,
|
||||||
|
parse_state = ParseState,
|
||||||
|
processor_state = ProcState,
|
||||||
|
heartbeat_sup = SupHelperPid,
|
||||||
|
heartbeat = {none, none},
|
||||||
|
max_frame_size = MaxFrameSize,
|
||||||
|
current_frame_size = 0,
|
||||||
|
state = running,
|
||||||
|
conserve_resources = false,
|
||||||
|
recv_outstanding = false})), #reader_state.stats_timer),
|
||||||
|
{backoff, 1000, 1000, 10000});
|
||||||
|
{error, enotconn} ->
|
||||||
|
rabbit_net:fast_close(RealSocket),
|
||||||
|
terminate(shutdown, undefined);
|
||||||
|
{error, Reason} ->
|
||||||
|
rabbit_net:fast_close(RealSocket),
|
||||||
|
terminate({network_error, Reason}, undefined)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
handle_call({info, InfoItems}, _From, State) ->
|
handle_call({info, InfoItems}, _From, State) ->
|
||||||
Infos = lists:map(
|
Infos = lists:map(
|
||||||
|
|
|
@ -135,10 +135,13 @@ init([KeepaliveSup,
|
||||||
heartbeat := Heartbeat,
|
heartbeat := Heartbeat,
|
||||||
transport := ConnTransport}]) ->
|
transport := ConnTransport}]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
{ok, Sock} =
|
ProxyProtocolEnabled =
|
||||||
rabbit_networking:handshake(Ref,
|
application:get_env(rabbitmq_stream, proxy_protocol, false),
|
||||||
application:get_env(rabbitmq_stream,
|
%% Note:
|
||||||
proxy_protocol, false)),
|
%% This function could return an error if the handshake times out.
|
||||||
|
%% It is less likely to happen here as compared to MQTT, so
|
||||||
|
%% crashing with a `badmatch` seems appropriate.
|
||||||
|
{ok, Sock} = rabbit_networking:handshake(Ref, ProxyProtocolEnabled),
|
||||||
RealSocket = rabbit_net:unwrap_socket(Sock),
|
RealSocket = rabbit_net:unwrap_socket(Sock),
|
||||||
case rabbit_net:connection_string(Sock, inbound) of
|
case rabbit_net:connection_string(Sock, inbound) of
|
||||||
{ok, ConnStr} ->
|
{ok, ConnStr} ->
|
||||||
|
|
Loading…
Reference in New Issue