Use maybe expression instead of messy patterns
This commit is pure refactoring making the code base more maintainable. Replace rabbit_misc:pipeline/3 with the new OTP 25 experimental maybe expression because "Frequent ways in which people work with sequences of failable operations include folds over lists of functions, and abusing list comprehensions. Both patterns have heavy weaknesses that makes them less than ideal." https://www.erlang.org/eeps/eep-0049#obsoleting-messy-patterns Additionally, this commit is more restrictive in the type spec of rabbit_mqtt_processor state fields. Specifically, many fields were defined to be `undefined | T` where `undefined` was only temporarily until the first CONNECT packet was processed by the processor. It's better to initialise the MQTT processor upon first CONNECT packet because there is no point in having a processor without having received any packet. This allows many type specs in the processor to change from `undefined | T` to just `T`. Additionally, memory is saved by removing the `received_connect_packet` field from the `rabbit_mqtt_reader` and `rabbit_web_mqtt_handler`.
This commit is contained in:
parent
802ef318c6
commit
79c12b60bc
|
@ -77,6 +77,7 @@ start_rabbitmq_server() {
|
|||
-syslog logger '[]' \
|
||||
-syslog syslog_error_logger false \
|
||||
-kernel prevent_overlapping_partitions false \
|
||||
-enable-feature maybe_expr \
|
||||
"$@"
|
||||
}
|
||||
|
||||
|
|
|
@ -71,6 +71,7 @@ if "!RABBITMQ_ALLOW_INPUT!"=="" (
|
|||
-syslog logger [] ^
|
||||
-syslog syslog_error_logger false ^
|
||||
-kernel prevent_overlapping_partitions false ^
|
||||
-enable-feature maybe_expr ^
|
||||
!STAR!
|
||||
|
||||
if ERRORLEVEL 1 (
|
||||
|
|
|
@ -201,6 +201,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
|
|||
-syslog logger [] ^
|
||||
-syslog syslog_error_logger false ^
|
||||
-kernel prevent_overlapping_partitions false ^
|
||||
-enable-feature maybe_expr ^
|
||||
!STARVAR!
|
||||
|
||||
set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:\=\\!
|
||||
|
|
|
@ -1401,18 +1401,10 @@ auth_phase(Response,
|
|||
State = #v1{connection = Connection =
|
||||
#connection{protocol = Protocol,
|
||||
auth_mechanism = {Name, AuthMechanism},
|
||||
auth_state = AuthState},
|
||||
auth_state = AuthState,
|
||||
host = RemoteAddress},
|
||||
sock = Sock}) ->
|
||||
rabbit_log:debug("Raw client connection hostname during authN phase: ~tp", [Connection#connection.host]),
|
||||
RemoteAddress = case Connection#connection.host of
|
||||
%% the hostname was already resolved, e.g. by reverse DNS lookups
|
||||
Bin when is_binary(Bin) -> Bin;
|
||||
%% the hostname is an IP address
|
||||
Tuple when is_tuple(Tuple) ->
|
||||
rabbit_data_coercion:to_binary(inet:ntoa(Connection#connection.host));
|
||||
Other -> rabbit_data_coercion:to_binary(Other)
|
||||
end,
|
||||
rabbit_log:debug("Resolved client hostname during authN phase: ~ts", [RemoteAddress]),
|
||||
rabbit_log:debug("Client address during authN phase: ~tp", [RemoteAddress]),
|
||||
case AuthMechanism:handle_response(Response, AuthState) of
|
||||
{refused, Username, Msg, Args} ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091),
|
||||
|
|
|
@ -390,7 +390,7 @@ gen_server2_deleted(Pid) ->
|
|||
|
||||
get_gen_server2_stats(Pid) ->
|
||||
case ets:lookup(gen_server2_metrics, Pid) of
|
||||
[{Pid, BufferLength}] ->
|
||||
[{_, BufferLength}] ->
|
||||
BufferLength;
|
||||
[] ->
|
||||
not_found
|
||||
|
@ -409,11 +409,17 @@ update_auth_attempt(RemoteAddress, Username, Protocol, Incr) ->
|
|||
%% It's up to the operator to enable them, and reset it required
|
||||
_ = case application:get_env(rabbit, track_auth_attempt_source) of
|
||||
{ok, true} ->
|
||||
case {RemoteAddress, Username} of
|
||||
Addr = case inet:is_ip_address(RemoteAddress) of
|
||||
true ->
|
||||
list_to_binary(inet:ntoa(RemoteAddress));
|
||||
false ->
|
||||
rabbit_data_coercion:to_binary(RemoteAddress)
|
||||
end,
|
||||
case {Addr, Username} of
|
||||
{<<>>, <<>>} ->
|
||||
ok;
|
||||
_ ->
|
||||
Key = {RemoteAddress, Username, Protocol},
|
||||
Key = {Addr, Username, Protocol},
|
||||
_ = ets:update_counter(auth_attempt_detailed_metrics, Key, Incr, {Key, 0, 0, 0})
|
||||
end;
|
||||
{ok, false} ->
|
||||
|
|
|
@ -79,7 +79,6 @@
|
|||
-export([raw_read_file/1]).
|
||||
-export([find_child/2]).
|
||||
-export([is_regular_file/1]).
|
||||
-export([pipeline/3]).
|
||||
|
||||
%% Horrible macro to use in guards
|
||||
-define(IS_BENIGN_EXIT(R),
|
||||
|
@ -1442,45 +1441,3 @@ find_powershell() ->
|
|||
PwshExe ->
|
||||
PwshExe
|
||||
end.
|
||||
|
||||
%% -------------------------------------------------------------------------
|
||||
%% Begin copy from
|
||||
%% https://github.com/emqx/emqx/blob/cffdcb42843d48bf99d8bd13695bc73149c98a23/apps/emqx/src/emqx_misc.erl#L141-L157
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2017-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
pipeline([], Input, State) ->
|
||||
{ok, Input, State};
|
||||
pipeline([Fun | More], Input, State) ->
|
||||
case apply_fun(Fun, Input, State) of
|
||||
ok -> pipeline(More, Input, State);
|
||||
{ok, NState} -> pipeline(More, Input, NState);
|
||||
{ok, Output, NState} -> pipeline(More, Output, NState);
|
||||
{error, Reason} -> {error, Reason, State};
|
||||
{error, Reason, NState} -> {error, Reason, NState}
|
||||
end.
|
||||
|
||||
-compile({inline, [apply_fun/3]}).
|
||||
apply_fun(Fun, Input, State) ->
|
||||
case erlang:fun_info(Fun, arity) of
|
||||
{arity, 1} -> Fun(Input);
|
||||
{arity, 2} -> Fun(Input, State)
|
||||
end.
|
||||
|
||||
%% End copy from
|
||||
%% https://github.com/emqx/emqx/blob/cffdcb42843d48bf99d8bd13695bc73149c98a23/apps/emqx/src/emqx_misc.erl#L141-L157
|
||||
%% -------------------------------------------------------------------------
|
||||
|
|
|
@ -276,7 +276,6 @@ is_authorized(ReqData, Context, Username, Password, ErrorMsg, Fun, ReplyWhenFail
|
|||
_ -> []
|
||||
end,
|
||||
{IP, _} = cowboy_req:peer(ReqData),
|
||||
RemoteAddress = list_to_binary(inet:ntoa(IP)),
|
||||
case rabbit_access_control:check_user_login(Username, AuthProps) of
|
||||
{ok, User = #user{username = ResolvedUsername, tags = Tags}} ->
|
||||
case rabbit_access_control:check_user_loopback(ResolvedUsername, IP) of
|
||||
|
@ -285,26 +284,24 @@ is_authorized(ReqData, Context, Username, Password, ErrorMsg, Fun, ReplyWhenFail
|
|||
true ->
|
||||
case Fun(User) of
|
||||
true ->
|
||||
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress,
|
||||
ResolvedUsername, http),
|
||||
rabbit_core_metrics:auth_attempt_succeeded(IP, ResolvedUsername, http),
|
||||
{true, ReqData,
|
||||
Context#context{user = User,
|
||||
password = Password}};
|
||||
false ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
|
||||
ResolvedUsername, http),
|
||||
rabbit_core_metrics:auth_attempt_failed(IP, ResolvedUsername, http),
|
||||
ErrFun(ResolvedUsername, ErrorMsg)
|
||||
end;
|
||||
false ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, ResolvedUsername, http),
|
||||
rabbit_core_metrics:auth_attempt_failed(IP, ResolvedUsername, http),
|
||||
ErrFun(ResolvedUsername, <<"Not management user">>)
|
||||
end;
|
||||
not_allowed ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, ResolvedUsername, http),
|
||||
rabbit_core_metrics:auth_attempt_failed(IP, ResolvedUsername, http),
|
||||
ErrFun(ResolvedUsername, <<"User can only log in via localhost">>)
|
||||
end;
|
||||
{refused, _Username, Msg, Args} ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, http),
|
||||
rabbit_core_metrics:auth_attempt_failed(IP, Username, http),
|
||||
rabbit_log:warning("HTTP access denied: ~ts",
|
||||
[rabbit_misc:format(Msg, Args)]),
|
||||
case ReplyWhenFailed of
|
||||
|
|
|
@ -45,6 +45,8 @@
|
|||
%% The Client is not authorized to connect.
|
||||
-define(CONNACK_NOT_AUTHORIZED, 5).
|
||||
|
||||
-type connack_return_code() :: ?CONNACK_ACCEPT..?CONNACK_NOT_AUTHORIZED.
|
||||
|
||||
%% qos levels
|
||||
|
||||
-define(QOS_0, 0).
|
||||
|
@ -70,20 +72,20 @@
|
|||
|
||||
-type mqtt_packet() :: #mqtt_packet{}.
|
||||
|
||||
-record(mqtt_packet_connect, {proto_ver,
|
||||
will_retain,
|
||||
will_qos,
|
||||
will_flag,
|
||||
clean_sess,
|
||||
keep_alive,
|
||||
client_id,
|
||||
will_topic,
|
||||
will_msg,
|
||||
username,
|
||||
password}).
|
||||
-record(mqtt_packet_connect, {proto_ver :: 3 | 4,
|
||||
will_retain :: boolean(),
|
||||
will_qos :: 0..2,
|
||||
will_flag :: boolean(),
|
||||
clean_sess :: boolean(),
|
||||
keep_alive :: non_neg_integer(),
|
||||
client_id :: binary(),
|
||||
will_topic :: option(binary()),
|
||||
will_msg :: option(binary()),
|
||||
username :: option(binary()),
|
||||
password :: option(binary())}).
|
||||
|
||||
-record(mqtt_packet_connack, {session_present,
|
||||
return_code}).
|
||||
-record(mqtt_packet_connack, {session_present :: boolean(),
|
||||
return_code :: connack_return_code()}).
|
||||
|
||||
-record(mqtt_packet_publish, {topic_name :: undefined | binary(),
|
||||
packet_id :: packet_id()}).
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -15,7 +15,7 @@
|
|||
|
||||
-export([start_link/3]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
code_change/3, terminate/2, format_status/1]).
|
||||
terminate/2, format_status/1]).
|
||||
|
||||
-export([conserve_resources/3,
|
||||
close_connection/2]).
|
||||
|
@ -35,13 +35,13 @@
|
|||
await_recv :: boolean(),
|
||||
deferred_recv :: option(binary()),
|
||||
parse_state :: rabbit_mqtt_packet:state(),
|
||||
proc_state :: rabbit_mqtt_processor:state(),
|
||||
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
|
||||
rabbit_mqtt_processor:state(),
|
||||
connection_state :: running | blocked,
|
||||
conserve :: boolean(),
|
||||
stats_timer :: option(rabbit_event:state()),
|
||||
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
|
||||
conn_name :: binary(),
|
||||
received_connect_packet :: boolean()
|
||||
conn_name :: binary()
|
||||
}).
|
||||
|
||||
-type(state() :: #state{}).
|
||||
|
@ -83,16 +83,13 @@ init(Ref) ->
|
|||
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
|
||||
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
|
||||
erlang:send_after(LoginTimeout, self(), login_timeout),
|
||||
ProcessorState = rabbit_mqtt_processor:initial_state(RealSocket, ConnName),
|
||||
State0 = #state{socket = RealSocket,
|
||||
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
|
||||
conn_name = ConnName,
|
||||
await_recv = false,
|
||||
connection_state = running,
|
||||
received_connect_packet = false,
|
||||
conserve = false,
|
||||
parse_state = rabbit_mqtt_packet:init_state(),
|
||||
proc_state = ProcessorState},
|
||||
parse_state = rabbit_mqtt_packet:init_state()},
|
||||
State1 = control_throttle(State0),
|
||||
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
|
||||
gen_server:enter_loop(?MODULE, [], State);
|
||||
|
@ -214,9 +211,8 @@ handle_info({keepalive, Req}, State = #state{keepalive = KState0,
|
|||
{stop, Reason, State}
|
||||
end;
|
||||
|
||||
handle_info(login_timeout, State = #state{received_connect_packet = true}) ->
|
||||
{noreply, State, ?HIBERNATE_AFTER};
|
||||
handle_info(login_timeout, State = #state{conn_name = ConnName}) ->
|
||||
handle_info(login_timeout, State = #state{proc_state = connect_packet_unprocessed,
|
||||
conn_name = ConnName}) ->
|
||||
%% The connection is also closed if the CONNECT packet happens to
|
||||
%% be already in the `deferred_recv' buffer. This can happen while
|
||||
%% the connection is blocked because of a resource alarm. However
|
||||
|
@ -224,6 +220,8 @@ handle_info(login_timeout, State = #state{conn_name = ConnName}) ->
|
|||
%% and we don't want to skip closing the connection in that case.
|
||||
?LOG_ERROR("closing MQTT connection ~tp (login timeout)", [ConnName]),
|
||||
{stop, {shutdown, login_timeout}, State};
|
||||
handle_info(login_timeout, State) ->
|
||||
{noreply, State, ?HIBERNATE_AFTER};
|
||||
|
||||
handle_info(emit_stats, State) ->
|
||||
{noreply, emit_stats(State), ?HIBERNATE_AFTER};
|
||||
|
@ -263,7 +261,12 @@ terminate(Reason, {SendWill, State = #state{conn_name = ConnName,
|
|||
proc_state = PState}}) ->
|
||||
KState = rabbit_mqtt_keepalive:cancel_timer(KState0),
|
||||
maybe_emit_stats(State#state{keepalive = KState}),
|
||||
_ = rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState),
|
||||
case PState of
|
||||
connect_packet_unprocessed ->
|
||||
ok;
|
||||
_ ->
|
||||
rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState)
|
||||
end,
|
||||
log_terminate(Reason, State).
|
||||
|
||||
log_terminate({network_error, {ssl_upgrade_error, closed}, ConnName}, _State) ->
|
||||
|
@ -287,23 +290,16 @@ log_terminate({network_error,
|
|||
log_tls_alert(Alert, ConnName);
|
||||
log_terminate({network_error, {ssl_upgrade_error, Reason}, ConnName}, _State) ->
|
||||
?LOG_ERROR("MQTT detected TLS upgrade error on ~s: ~p", [ConnName, Reason]);
|
||||
|
||||
log_terminate({network_error, Reason, ConnName}, _State) ->
|
||||
?LOG_ERROR("MQTT detected network error on ~s: ~p", [ConnName, Reason]);
|
||||
|
||||
log_terminate({network_error, Reason}, _State) ->
|
||||
?LOG_ERROR("MQTT detected network error: ~p", [Reason]);
|
||||
|
||||
log_terminate(normal, #state{conn_name = ConnName}) ->
|
||||
?LOG_INFO("closing MQTT connection ~p (~s)", [self(), ConnName]),
|
||||
ok;
|
||||
|
||||
log_terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
log_tls_alert(handshake_failure, ConnName) ->
|
||||
|
@ -314,55 +310,61 @@ log_tls_alert(unknown_ca, ConnName) ->
|
|||
log_tls_alert(Alert, ConnName) ->
|
||||
?LOG_ERROR("MQTT detected TLS upgrade error on ~ts: alert ~ts", [ConnName, Alert]).
|
||||
|
||||
process_received_bytes(<<>>, State = #state{received_connect_packet = false,
|
||||
proc_state = PState,
|
||||
conn_name = ConnName}) ->
|
||||
?LOG_INFO("Accepted MQTT connection ~p (~s, client ID: ~s)",
|
||||
[self(), ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
|
||||
{noreply, ensure_stats_timer(State#state{received_connect_packet = true}), ?HIBERNATE_AFTER};
|
||||
process_received_bytes(<<>>, State) ->
|
||||
{noreply, ensure_stats_timer(State), ?HIBERNATE_AFTER};
|
||||
process_received_bytes(Bytes,
|
||||
State = #state{ parse_state = ParseState,
|
||||
proc_state = ProcState,
|
||||
conn_name = ConnName }) ->
|
||||
process_received_bytes(Bytes, State = #state{socket = Socket,
|
||||
parse_state = ParseState,
|
||||
proc_state = ProcState,
|
||||
conn_name = ConnName}) ->
|
||||
case parse(Bytes, ParseState) of
|
||||
{more, ParseState1} ->
|
||||
{noreply,
|
||||
ensure_stats_timer( State #state{ parse_state = ParseState1 }),
|
||||
ensure_stats_timer(State#state{parse_state = ParseState1}),
|
||||
?HIBERNATE_AFTER};
|
||||
{ok, Packet, Rest} ->
|
||||
case rabbit_mqtt_processor:process_packet(Packet, ProcState) of
|
||||
{ok, ProcState1} ->
|
||||
process_received_bytes(
|
||||
Rest,
|
||||
State #state{parse_state = rabbit_mqtt_packet:reset_state(),
|
||||
proc_state = ProcState1});
|
||||
%% PUBLISH and more
|
||||
{error, unauthorized = Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT connection ~ts is closing due to an authorization failure", [ConnName]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
%% CONNECT packets only
|
||||
{error, unauthenticated = Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT connection ~ts is closing due to an authentication failure", [ConnName]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
%% CONNECT packets only
|
||||
{error, invalid_client_id = Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT cannot accept connection ~ts: client uses an invalid ID", [ConnName]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
%% CONNECT packets only
|
||||
{error, unsupported_protocol_version = Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT cannot accept connection ~ts: incompatible protocol version", [ConnName]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
{error, unavailable = Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT cannot accept connection ~ts due to an internal error or unavailable component",
|
||||
[ConnName]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
{error, Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT protocol error on connection ~ts: ~tp", [ConnName, Reason]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
{stop, disconnect, ProcState1} ->
|
||||
{stop, normal, {_SendWill = false, pstate(State, ProcState1)}}
|
||||
case ProcState of
|
||||
connect_packet_unprocessed ->
|
||||
Send = fun(Data) ->
|
||||
try rabbit_net:port_command(Socket, Data)
|
||||
catch error:Error ->
|
||||
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
|
||||
[Socket, Error])
|
||||
end,
|
||||
ok
|
||||
end,
|
||||
case rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
|
||||
{ok, ProcState1} ->
|
||||
?LOG_INFO("Accepted MQTT connection ~ts for client ID ~ts",
|
||||
[ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]),
|
||||
process_received_bytes(
|
||||
Rest, State#state{parse_state = rabbit_mqtt_packet:reset_state(),
|
||||
proc_state = ProcState1});
|
||||
{error, {socket_ends, Reason} = R} ->
|
||||
?LOG_ERROR("MQTT connection ~ts failed to establish because socket "
|
||||
"addresses could not be determined: ~tp",
|
||||
[ConnName, Reason]),
|
||||
{stop, {shutdown, R}, {_SendWill = false, State}};
|
||||
{error, ConnAckReturnCode} ->
|
||||
?LOG_ERROR("Rejected MQTT connection ~ts with CONNACK return code ~p",
|
||||
[ConnName, ConnAckReturnCode]),
|
||||
{stop, shutdown, {_SendWill = false, State}}
|
||||
end;
|
||||
_ ->
|
||||
case rabbit_mqtt_processor:process_packet(Packet, ProcState) of
|
||||
{ok, ProcState1} ->
|
||||
process_received_bytes(
|
||||
Rest,
|
||||
State #state{parse_state = rabbit_mqtt_packet:reset_state(),
|
||||
proc_state = ProcState1});
|
||||
{error, unauthorized = Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT connection ~ts is closing due to an authorization failure", [ConnName]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
{error, Reason, ProcState1} ->
|
||||
?LOG_ERROR("MQTT protocol error on connection ~ts: ~tp", [ConnName, Reason]),
|
||||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
{stop, disconnect, ProcState1} ->
|
||||
{stop, normal, {_SendWill = false, pstate(State, ProcState1)}}
|
||||
end
|
||||
end;
|
||||
{error, {cannot_parse, Reason, Stacktrace}} ->
|
||||
?LOG_ERROR("Unparseable MQTT packet received from connection ~ts", [ConnName]),
|
||||
|
@ -390,12 +392,12 @@ parse(Bytes, ParseState) ->
|
|||
|
||||
network_error(closed,
|
||||
State = #state{conn_name = ConnName,
|
||||
received_connect_packet = Connected}) ->
|
||||
proc_state = ProcState}) ->
|
||||
Fmt = "MQTT connection ~p will terminate because peer closed TCP connection",
|
||||
Args = [ConnName],
|
||||
case Connected of
|
||||
true -> ?LOG_INFO(Fmt, Args);
|
||||
false -> ?LOG_DEBUG(Fmt, Args)
|
||||
case ProcState of
|
||||
connect_packet_unprocessed -> ?LOG_DEBUG(Fmt, Args);
|
||||
_ -> ?LOG_INFO(Fmt, Args)
|
||||
end,
|
||||
{stop, {shutdown, conn_closed}, State};
|
||||
|
||||
|
@ -416,11 +418,13 @@ run_socket(State = #state{ socket = Sock }) ->
|
|||
|
||||
control_throttle(State = #state{connection_state = ConnState,
|
||||
conserve = Conserve,
|
||||
received_connect_packet = Connected,
|
||||
proc_state = PState,
|
||||
keepalive = KState
|
||||
}) ->
|
||||
Throttle = rabbit_mqtt_processor:throttle(Conserve, Connected, PState),
|
||||
Throttle = case PState of
|
||||
connect_packet_unprocessed -> Conserve;
|
||||
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
|
||||
end,
|
||||
case {ConnState, Throttle} of
|
||||
{running, true} ->
|
||||
State#state{connection_state = blocked,
|
||||
|
@ -444,7 +448,7 @@ maybe_emit_stats(State) ->
|
|||
rabbit_event:if_enabled(State, #state.stats_timer,
|
||||
fun() -> emit_stats(State) end).
|
||||
|
||||
emit_stats(State=#state{received_connect_packet = false}) ->
|
||||
emit_stats(State=#state{proc_state = connect_packet_unprocessed}) ->
|
||||
%% Avoid emitting stats on terminate when the connection has not yet been
|
||||
%% established, as this causes orphan entries on the stats database
|
||||
State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
|
||||
|
@ -489,7 +493,7 @@ i(name, S) ->
|
|||
i(conn_name, S);
|
||||
i(conn_name, #state{conn_name = Val}) ->
|
||||
Val;
|
||||
i(connection_state, #state{received_connect_packet = false}) ->
|
||||
i(connection_state, #state{proc_state = connect_packet_unprocessed}) ->
|
||||
starting;
|
||||
i(connection_state, #state{connection_state = Val}) ->
|
||||
Val;
|
||||
|
@ -538,17 +542,19 @@ format_state(#state{socket = Socket,
|
|||
conserve = Conserve,
|
||||
stats_timer = StatsTimer,
|
||||
keepalive = Keepalive,
|
||||
conn_name = ConnName,
|
||||
received_connect_packet = ReceivedConnectPacket
|
||||
conn_name = ConnName
|
||||
}) ->
|
||||
#{socket => Socket,
|
||||
proxy_socket => ProxySock,
|
||||
await_recv => AwaitRecv,
|
||||
deferred_recv => DeferredRecv =/= undefined,
|
||||
proc_state => rabbit_mqtt_processor:format_status(PState),
|
||||
proc_state => if PState =:= connect_packet_unprocessed ->
|
||||
PState;
|
||||
true ->
|
||||
rabbit_mqtt_processor:format_status(PState)
|
||||
end,
|
||||
connection_state => ConnectionState,
|
||||
conserve => Conserve,
|
||||
stats_timer => StatsTimer,
|
||||
keepalive => Keepalive,
|
||||
conn_name => ConnName,
|
||||
received_connect_packet => ReceivedConnectPacket}.
|
||||
conn_name => ConnName}.
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
path_for/2,
|
||||
path_for/3,
|
||||
vhost_name_to_table_name/1,
|
||||
register_clientid/2,
|
||||
remove_duplicate_clientid_connections/2,
|
||||
init_sparkplug/0,
|
||||
mqtt_to_amqp/1,
|
||||
amqp_to_mqtt/1,
|
||||
|
@ -174,35 +172,6 @@ vhost_name_to_table_name(VHost) ->
|
|||
<<Num:128>> = erlang:md5(VHost),
|
||||
list_to_atom("rabbit_mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num])).
|
||||
|
||||
-spec register_clientid(rabbit_types:vhost(), binary()) -> ok.
|
||||
register_clientid(Vhost, ClientId)
|
||||
when is_binary(Vhost), is_binary(ClientId) ->
|
||||
PgGroup = {Vhost, ClientId},
|
||||
ok = pg:join(persistent_term:get(?PG_SCOPE), PgGroup, self()),
|
||||
case rabbit_mqtt_ff:track_client_id_in_ra() of
|
||||
true ->
|
||||
%% Ra node takes care of removing duplicate client ID connections.
|
||||
ok;
|
||||
false ->
|
||||
ok = erpc:multicast([node() | nodes()],
|
||||
?MODULE,
|
||||
remove_duplicate_clientid_connections,
|
||||
[PgGroup, self()])
|
||||
end.
|
||||
|
||||
-spec remove_duplicate_clientid_connections({rabbit_types:vhost(), binary()}, pid()) -> ok.
|
||||
remove_duplicate_clientid_connections(PgGroup, PidToKeep) ->
|
||||
try persistent_term:get(?PG_SCOPE) of
|
||||
PgScope ->
|
||||
Pids = pg:get_local_members(PgScope, PgGroup),
|
||||
lists:foreach(fun(Pid) ->
|
||||
gen_server:cast(Pid, duplicate_id)
|
||||
end, Pids -- [PidToKeep])
|
||||
catch _:badarg ->
|
||||
%% MQTT supervision tree on this node not fully started
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec truncate_binary(binary(), non_neg_integer()) -> binary().
|
||||
truncate_binary(Bin, Size)
|
||||
when is_binary(Bin) andalso byte_size(Bin) =< Size ->
|
||||
|
|
|
@ -534,8 +534,8 @@ no_queue_bind_permission(Config) ->
|
|||
["MQTT resource access refused: write access to queue "
|
||||
"'mqtt-subscription-mqtt-userqos0' in vhost 'mqtt-vhost' "
|
||||
"refused for user 'mqtt-user'",
|
||||
"Failed to bind queue 'mqtt-subscription-mqtt-userqos0' "
|
||||
"in vhost 'mqtt-vhost' with topic test/topic: access_refused"
|
||||
"Failed to add binding between exchange 'amq.topic' in vhost 'mqtt-vhost' and queue "
|
||||
"'mqtt-subscription-mqtt-userqos0' in vhost 'mqtt-vhost' for topic test/topic: access_refused"
|
||||
],
|
||||
test_subscribe_permissions_combination(<<".*">>, <<"">>, <<".*">>, Config, ExpectedLogs).
|
||||
|
||||
|
@ -570,8 +570,8 @@ no_queue_unbind_permission(Config) ->
|
|||
ok = assert_connection_closed(C2),
|
||||
ExpectedLogs =
|
||||
["MQTT resource access refused: read access to exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
|
||||
"Failed to unbind queue 'mqtt-subscription-mqtt-userqos1' in vhost 'mqtt-vhost' with topic 'my/topic': access_refused",
|
||||
"MQTT protocol error on connection.*: subscribe_error"
|
||||
"Failed to remove binding between exchange 'amq.topic' in vhost 'mqtt-vhost' and queue "
|
||||
"'mqtt-subscription-mqtt-userqos1' in vhost 'mqtt-vhost' for topic my/topic: access_refused"
|
||||
],
|
||||
wait_log(Config, [?FAIL_IF_CRASH_LOG, {ExpectedLogs, fun () -> stop end}]),
|
||||
|
||||
|
@ -619,7 +619,7 @@ no_queue_delete_permission(Config) ->
|
|||
,{[io_lib:format("MQTT resource access refused: configure access to queue "
|
||||
"'mqtt-subscription-~sqos1' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
|
||||
[ClientId]),
|
||||
"MQTT connection .* is closing due to an authorization failure"],
|
||||
"Rejected MQTT connection .* with CONNACK return code 5"],
|
||||
fun() -> stop end}
|
||||
]),
|
||||
ok.
|
||||
|
@ -653,7 +653,7 @@ no_queue_consume_permission_on_connect(Config) ->
|
|||
,{[io_lib:format("MQTT resource access refused: read access to queue "
|
||||
"'mqtt-subscription-~sqos1' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
|
||||
[ClientId]),
|
||||
"MQTT connection .* is closing due to an authorization failure"],
|
||||
"Rejected MQTT connection .* with CONNACK return code 5"],
|
||||
fun () -> stop end}
|
||||
]),
|
||||
ok.
|
||||
|
@ -713,8 +713,8 @@ no_topic_read_permission(Config) ->
|
|||
[?FAIL_IF_CRASH_LOG,
|
||||
{["MQTT topic access refused: read access to topic 'test.topic' in exchange "
|
||||
"'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
|
||||
"Failed to bind queue 'mqtt-subscription-mqtt-userqos0' "
|
||||
"in vhost 'mqtt-vhost' with topic test/topic: access_refused"
|
||||
"Failed to add binding between exchange 'amq.topic' in vhost 'mqtt-vhost' and queue "
|
||||
"'mqtt-subscription-mqtt-userqos0' in vhost 'mqtt-vhost' for topic test/topic: access_refused"
|
||||
],
|
||||
fun () -> stop end}
|
||||
]),
|
||||
|
@ -757,7 +757,7 @@ loopback_user_connects_from_remote_host(Config) ->
|
|||
wait_log(Config,
|
||||
[?FAIL_IF_CRASH_LOG,
|
||||
{["MQTT login failed: user 'mqtt-user' can only connect via localhost",
|
||||
"MQTT connection .* is closing due to an authorization failure"],
|
||||
"Rejected MQTT connection .* with CONNACK return code 5"],
|
||||
fun () -> stop end}
|
||||
]),
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ setup(CallerPid) ->
|
|||
end.
|
||||
|
||||
|
||||
user_login_authentication(_, AuthProps) ->
|
||||
user_login_authentication(_Username, AuthProps) ->
|
||||
ets:insert(?MODULE, {authentication, AuthProps}),
|
||||
{ok, #auth_user{username = <<"dummy">>,
|
||||
tags = [],
|
||||
|
|
|
@ -250,7 +250,7 @@ event_authentication_failure(Config) ->
|
|||
|
||||
?assertMatch({error, _}, emqtt:connect(C)),
|
||||
|
||||
[E, _ConnectionClosedEvent] = util:get_events(Server),
|
||||
[E] = util:get_events(Server),
|
||||
util:assert_event_type(user_authentication_failure, E),
|
||||
util:assert_event_prop([{name, <<"Trudy">>},
|
||||
{connection_type, network}],
|
||||
|
|
|
@ -1322,14 +1322,13 @@ handle_frame_pre_auth(Transport,
|
|||
AS ->
|
||||
AS
|
||||
end,
|
||||
RemoteAddress = list_to_binary(inet:ntoa(Host)),
|
||||
C1 = Connection0#stream_connection{auth_mechanism =
|
||||
{Mechanism,
|
||||
AuthMechanism}},
|
||||
{C2, CmdBody} =
|
||||
case AuthMechanism:handle_response(SaslBin, AuthState) of
|
||||
{refused, Username, Msg, Args} ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
|
||||
rabbit_core_metrics:auth_attempt_failed(Host,
|
||||
Username,
|
||||
stream),
|
||||
auth_fail(Username, Msg, Args, C1, State),
|
||||
|
@ -1338,7 +1337,7 @@ handle_frame_pre_auth(Transport,
|
|||
{sasl_authenticate,
|
||||
?RESPONSE_AUTHENTICATION_FAILURE, <<>>}};
|
||||
{protocol_error, Msg, Args} ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
|
||||
rabbit_core_metrics:auth_attempt_failed(Host,
|
||||
<<>>,
|
||||
stream),
|
||||
notify_auth_result(none,
|
||||
|
@ -1352,7 +1351,7 @@ handle_frame_pre_auth(Transport,
|
|||
{C1#stream_connection{connection_step = failure},
|
||||
{sasl_authenticate, ?RESPONSE_SASL_ERROR, <<>>}};
|
||||
{challenge, Challenge, AuthState1} ->
|
||||
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress,
|
||||
rabbit_core_metrics:auth_attempt_succeeded(Host,
|
||||
<<>>,
|
||||
stream),
|
||||
{C1#stream_connection{authentication_state =
|
||||
|
@ -1367,7 +1366,7 @@ handle_frame_pre_auth(Transport,
|
|||
S)
|
||||
of
|
||||
ok ->
|
||||
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress,
|
||||
rabbit_core_metrics:auth_attempt_succeeded(Host,
|
||||
Username,
|
||||
stream),
|
||||
notify_auth_result(Username,
|
||||
|
@ -1383,7 +1382,7 @@ handle_frame_pre_auth(Transport,
|
|||
{sasl_authenticate, ?RESPONSE_CODE_OK,
|
||||
<<>>}};
|
||||
not_allowed ->
|
||||
rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
|
||||
rabbit_core_metrics:auth_attempt_failed(Host,
|
||||
Username,
|
||||
stream),
|
||||
rabbit_log_connection:warning("User '~ts' can only connect via localhost",
|
||||
|
|
|
@ -33,13 +33,13 @@
|
|||
-record(state, {
|
||||
socket :: {rabbit_proxy_socket, any(), any()} | rabbit_net:socket(),
|
||||
parse_state = rabbit_mqtt_packet:init_state() :: rabbit_mqtt_packet:state(),
|
||||
proc_state :: undefined | rabbit_mqtt_processor:state(),
|
||||
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
|
||||
rabbit_mqtt_processor:state(),
|
||||
connection_state = running :: running | blocked,
|
||||
conserve = false :: boolean(),
|
||||
stats_timer :: option(rabbit_event:state()),
|
||||
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
|
||||
conn_name :: option(binary()),
|
||||
received_connect_packet = false :: boolean()
|
||||
conn_name :: option(binary())
|
||||
}).
|
||||
|
||||
-type state() :: #state{}.
|
||||
|
@ -97,12 +97,7 @@ websocket_init(State0 = #state{socket = Sock}) ->
|
|||
ConnName = rabbit_data_coercion:to_binary(ConnStr),
|
||||
?LOG_INFO("Accepting Web MQTT connection ~s", [ConnName]),
|
||||
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
|
||||
PState = rabbit_mqtt_processor:initial_state(
|
||||
rabbit_net:unwrap_socket(Sock),
|
||||
ConnName,
|
||||
fun send_reply/2),
|
||||
State1 = State0#state{conn_name = ConnName,
|
||||
proc_state = PState},
|
||||
State1 = State0#state{conn_name = ConnName},
|
||||
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
|
||||
process_flag(trap_exit, true),
|
||||
{[], State, hibernate};
|
||||
|
@ -220,8 +215,6 @@ websocket_info(Msg, State) ->
|
|||
?LOG_WARNING("Web MQTT: unexpected message ~tp", [Msg]),
|
||||
{[], State, hibernate}.
|
||||
|
||||
terminate(_Reason, _Req, #state{proc_state = undefined}) ->
|
||||
ok;
|
||||
terminate(Reason, Request, #state{} = State) ->
|
||||
terminate(Reason, Request, {true, State});
|
||||
terminate(_Reason, _Request,
|
||||
|
@ -232,7 +225,12 @@ terminate(_Reason, _Request,
|
|||
maybe_emit_stats(State),
|
||||
_ = rabbit_mqtt_keepalive:cancel_timer(KState),
|
||||
ok = file_handle_cache:release(),
|
||||
rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState).
|
||||
case PState of
|
||||
connect_packet_unprocessed ->
|
||||
ok;
|
||||
_ ->
|
||||
rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState)
|
||||
end.
|
||||
|
||||
%% Internal.
|
||||
|
||||
|
@ -249,33 +247,46 @@ handle_data(Data, State0 = #state{}) ->
|
|||
Other
|
||||
end.
|
||||
|
||||
handle_data1(<<>>, State0 = #state{received_connect_packet = false,
|
||||
proc_state = PState,
|
||||
conn_name = ConnName}) ->
|
||||
?LOG_INFO("Accepted web MQTT connection ~p (~s, client ID: ~s)",
|
||||
[self(), ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
|
||||
State = State0#state{received_connect_packet = true},
|
||||
{ok, ensure_stats_timer(control_throttle(State)), hibernate};
|
||||
handle_data1(<<>>, State) ->
|
||||
{ok, ensure_stats_timer(control_throttle(State)), hibernate};
|
||||
handle_data1(Data, State = #state{ parse_state = ParseState,
|
||||
proc_state = ProcState,
|
||||
conn_name = ConnName }) ->
|
||||
handle_data1(Data, State = #state{socket = Socket,
|
||||
parse_state = ParseState,
|
||||
proc_state = ProcState,
|
||||
conn_name = ConnName}) ->
|
||||
case parse(Data, ParseState) of
|
||||
{more, ParseState1} ->
|
||||
{ok, ensure_stats_timer(control_throttle(
|
||||
State #state{ parse_state = ParseState1 })), hibernate};
|
||||
{ok, ensure_stats_timer(
|
||||
control_throttle(
|
||||
State#state{parse_state = ParseState1})), hibernate};
|
||||
{ok, Packet, Rest} ->
|
||||
case rabbit_mqtt_processor:process_packet(Packet, ProcState) of
|
||||
{ok, ProcState1} ->
|
||||
handle_data1(
|
||||
Rest,
|
||||
State#state{parse_state = rabbit_mqtt_packet:reset_state(),
|
||||
proc_state = ProcState1});
|
||||
{error, Reason, _} ->
|
||||
stop_mqtt_protocol_error(State, Reason, ConnName);
|
||||
{stop, disconnect, ProcState1} ->
|
||||
stop({_SendWill = false, State#state{proc_state = ProcState1}})
|
||||
case ProcState of
|
||||
connect_packet_unprocessed ->
|
||||
case rabbit_mqtt_processor:init(Packet, rabbit_net:unwrap_socket(Socket),
|
||||
ConnName, fun send_reply/1) of
|
||||
{ok, ProcState1} ->
|
||||
?LOG_INFO("Accepted Web MQTT connection ~ts for client ID ~ts",
|
||||
[ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]),
|
||||
handle_data1(
|
||||
Rest, State#state{parse_state = rabbit_mqtt_packet:reset_state(),
|
||||
proc_state = ProcState1});
|
||||
{error, Reason} ->
|
||||
?LOG_ERROR("Rejected Web MQTT connection ~ts: ~p", [ConnName, Reason]),
|
||||
stop_mqtt_protocol_error({_SendWill = false, State},
|
||||
connect_packet_rejected,
|
||||
ConnName)
|
||||
end;
|
||||
_ ->
|
||||
case rabbit_mqtt_processor:process_packet(Packet, ProcState) of
|
||||
{ok, ProcState1} ->
|
||||
handle_data1(
|
||||
Rest,
|
||||
State#state{parse_state = rabbit_mqtt_packet:reset_state(),
|
||||
proc_state = ProcState1});
|
||||
{error, Reason, _} ->
|
||||
stop_mqtt_protocol_error(State, Reason, ConnName);
|
||||
{stop, disconnect, ProcState1} ->
|
||||
stop({_SendWill = false, State#state{proc_state = ProcState1}})
|
||||
end
|
||||
end;
|
||||
{error, Reason} ->
|
||||
stop_mqtt_protocol_error(State, Reason, ConnName)
|
||||
|
@ -313,11 +324,13 @@ handle_credits(State0) ->
|
|||
|
||||
control_throttle(State = #state{connection_state = ConnState,
|
||||
conserve = Conserve,
|
||||
received_connect_packet = Connected,
|
||||
proc_state = PState,
|
||||
keepalive = KState
|
||||
}) ->
|
||||
Throttle = rabbit_mqtt_processor:throttle(Conserve, Connected, PState),
|
||||
Throttle = case PState of
|
||||
connect_packet_unprocessed -> Conserve;
|
||||
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
|
||||
end,
|
||||
case {ConnState, Throttle} of
|
||||
{running, true} ->
|
||||
State#state{connection_state = blocked,
|
||||
|
@ -329,8 +342,10 @@ control_throttle(State = #state{connection_state = ConnState,
|
|||
State
|
||||
end.
|
||||
|
||||
send_reply(Packet, PState) ->
|
||||
self() ! {reply, rabbit_mqtt_processor:serialise(Packet, PState)}.
|
||||
-spec send_reply(iodata()) -> ok.
|
||||
send_reply(Data) ->
|
||||
self() ! {reply, Data},
|
||||
ok.
|
||||
|
||||
ensure_stats_timer(State) ->
|
||||
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats).
|
||||
|
@ -341,7 +356,7 @@ maybe_emit_stats(State) ->
|
|||
rabbit_event:if_enabled(State, #state.stats_timer,
|
||||
fun() -> emit_stats(State) end).
|
||||
|
||||
emit_stats(State=#state{received_connect_packet = false}) ->
|
||||
emit_stats(State=#state{proc_state = connect_packet_unprocessed}) ->
|
||||
%% Avoid emitting stats on terminate when the connection has not yet been
|
||||
%% established, as this causes orphan entries on the stats database
|
||||
rabbit_event:reset_stats_timer(State, #state.stats_timer);
|
||||
|
@ -399,7 +414,7 @@ i(Cert, #state{socket = Sock})
|
|||
rabbit_ssl:cert_info(Cert, rabbit_net:unwrap_socket(Sock));
|
||||
i(state, S) ->
|
||||
i(connection_state, S);
|
||||
i(connection_state, #state{received_connect_packet = false}) ->
|
||||
i(connection_state, #state{proc_state = connect_packet_unprocessed}) ->
|
||||
starting;
|
||||
i(connection_state, #state{connection_state = Val}) ->
|
||||
Val;
|
||||
|
|
Loading…
Reference in New Issue