[skip ci] Remove rabbit_log_connection and use LOG_ macros

This commit is contained in:
Michal Kuratczyk 2025-07-11 16:25:27 +02:00
parent ebe3f61ef0
commit 8a05433897
No known key found for this signature in database
9 changed files with 113 additions and 217 deletions

View File

@ -78,11 +78,11 @@ handle_cast({connection_created, Details}) ->
error:{no_exists, _} ->
Msg = "Could not register connection ~tp for tracking, "
"its table is not ready yet or the connection terminated prematurely",
rabbit_log_connection:warning(Msg, [ConnId]),
?LOG_WARNING(Msg, [ConnId]),
ok;
error:Err ->
Msg = "Could not register connection ~tp for tracking: ~tp",
rabbit_log_connection:warning(Msg, [ConnId, Err]),
?LOG_WARNING(Msg, [ConnId, Err]),
ok
end;
_OtherNode ->
@ -107,7 +107,7 @@ handle_cast({vhost_deleted, Details}) ->
%% Schedule vhost entry deletion, allowing time for connections to close
_ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_connection_vhost_entry, [VHost]),
rabbit_log_connection:info("Closing all connections in vhost '~ts' because it's being deleted", [VHost]),
?LOG_INFO("Closing all connections in vhost '~ts' because it's being deleted", [VHost]),
shutdown_tracked_items(
list(VHost),
rabbit_misc:format("vhost '~ts' is deleted", [VHost]));
@ -117,7 +117,7 @@ handle_cast({vhost_deleted, Details}) ->
handle_cast({vhost_down, Details}) ->
VHost = pget(name, Details),
Node = pget(node, Details),
rabbit_log_connection:info("Closing all connections in vhost '~ts' on node '~ts'"
?LOG_INFO("Closing all connections in vhost '~ts' on node '~ts'"
" because the vhost is stopping",
[VHost, Node]),
shutdown_tracked_items(
@ -128,7 +128,7 @@ handle_cast({user_deleted, Details}) ->
%% Schedule user entry deletion, allowing time for connections to close
_ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_connection_user_entry, [Username]),
rabbit_log_connection:info("Closing all connections for user '~ts' because the user is being deleted", [Username]),
?LOG_INFO("Closing all connections for user '~ts' because the user is being deleted", [Username]),
shutdown_tracked_items(
list_of_user(Username),
rabbit_misc:format("user '~ts' is deleted", [Username])).

View File

@ -22,6 +22,7 @@
-export([close_connections/3]).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/logging.hrl").
-rabbit_boot_step({?MODULE,
[{description, "connection tracking event handler"},
@ -37,6 +38,7 @@
%%
init([]) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
{ok, []}.
handle_event(#event{type = connection_created, props = Details}, State) ->

View File

@ -20,6 +20,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_misc.hrl").
-include_lib("kernel/include/logger.hrl").
%%----------------------------------------------------------------------------
@ -157,7 +158,7 @@ is_vhost_alive(VHost, {Username, _Password}, Pid) ->
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
true -> true;
false ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on direct client connection ~tp~n"
"access to vhost '~ts' refused for user '~ts': "
"vhost '~ts' is down",
@ -173,7 +174,7 @@ is_over_vhost_connection_limit(VHost, {Username, _Password}, Pid) ->
try rabbit_vhost_limit:is_over_connection_limit(VHost) of
false -> false;
{true, Limit} ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on direct client connection ~tp~n"
"access to vhost '~ts' refused for user '~ts': "
"vhost connection limit (~tp) is reached",
@ -181,7 +182,7 @@ is_over_vhost_connection_limit(VHost, {Username, _Password}, Pid) ->
true
catch
throw:{error, {no_such_vhost, VHost}} ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on direct client connection ~tp~n"
"vhost ~ts not found", [Pid, VHost]),
true
@ -211,7 +212,7 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
{error, Reason}
end;
{true, Limit} ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on Direct connection ~tp~n"
"access refused for user '~ts': "
"user connection limit (~tp) is reached",
@ -237,7 +238,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector, AmqpParams}]),
{ok, ChannelPid};
{true, Limit} ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on direct connection ~tp~n"
"number of channels opened for user '~ts' has reached the "
"maximum allowed limit of (~w)",

View File

@ -1,121 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% @doc Compatibility module for the old Lager-based logging API.
-module(rabbit_log_connection).
-export([debug/1, debug/2, debug/3,
info/1, info/2, info/3,
notice/1, notice/2, notice/3,
warning/1, warning/2, warning/3,
error/1, error/2, error/3,
critical/1, critical/2, critical/3,
alert/1, alert/2, alert/3,
emergency/1, emergency/2, emergency/3,
none/1, none/2, none/3]).
-include_lib("rabbit_common/include/logging.hrl").
-include_lib("kernel/include/logger.hrl").
-compile({no_auto_import, [error/2, error/3]}).
-spec debug(string()) -> 'ok'.
debug(Format) -> debug(Format, []).
-spec debug(string(), [any()]) -> 'ok'.
debug(Format, Args) -> debug(self(), Format, Args).
-spec debug(pid() | [tuple()], string(), [any()]) -> 'ok'.
debug(Pid, Format, Args) ->
?LOG_DEBUG(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec info(string()) -> 'ok'.
info(Format) -> info(Format, []).
-spec info(string(), [any()]) -> 'ok'.
info(Format, Args) -> info(self(), Format, Args).
-spec info(pid() | [tuple()], string(), [any()]) -> 'ok'.
info(Pid, Format, Args) ->
?LOG_INFO(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec notice(string()) -> 'ok'.
notice(Format) -> notice(Format, []).
-spec notice(string(), [any()]) -> 'ok'.
notice(Format, Args) -> notice(self(), Format, Args).
-spec notice(pid() | [tuple()], string(), [any()]) -> 'ok'.
notice(Pid, Format, Args) ->
?LOG_NOTICE(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec warning(string()) -> 'ok'.
warning(Format) -> warning(Format, []).
-spec warning(string(), [any()]) -> 'ok'.
warning(Format, Args) -> warning(self(), Format, Args).
-spec warning(pid() | [tuple()], string(), [any()]) -> 'ok'.
warning(Pid, Format, Args) ->
?LOG_WARNING(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec error(string()) -> 'ok'.
error(Format) -> error(Format, []).
-spec error(string(), [any()]) -> 'ok'.
error(Format, Args) -> error(self(), Format, Args).
-spec error(pid() | [tuple()], string(), [any()]) -> 'ok'.
error(Pid, Format, Args) ->
?LOG_ERROR(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec critical(string()) -> 'ok'.
critical(Format) -> critical(Format, []).
-spec critical(string(), [any()]) -> 'ok'.
critical(Format, Args) -> critical(self(), Format, Args).
-spec critical(pid() | [tuple()], string(), [any()]) -> 'ok'.
critical(Pid, Format, Args) ->
?LOG_CRITICAL(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec alert(string()) -> 'ok'.
alert(Format) -> alert(Format, []).
-spec alert(string(), [any()]) -> 'ok'.
alert(Format, Args) -> alert(self(), Format, Args).
-spec alert(pid() | [tuple()], string(), [any()]) -> 'ok'.
alert(Pid, Format, Args) ->
?LOG_ALERT(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec emergency(string()) -> 'ok'.
emergency(Format) -> emergency(Format, []).
-spec emergency(string(), [any()]) -> 'ok'.
emergency(Format, Args) -> emergency(self(), Format, Args).
-spec emergency(pid() | [tuple()], string(), [any()]) -> 'ok'.
emergency(Pid, Format, Args) ->
?LOG_EMERGENCY(Format, Args, #{pid => Pid,
domain => ?RMQLOG_DOMAIN_CONN}).
-spec none(string()) -> 'ok'.
none(_Format) -> ok.
-spec none(string(), [any()]) -> 'ok'.
none(_Format, _Args) -> ok.
-spec none(pid() | [tuple()], string(), [any()]) -> 'ok'.
none(_Pid, _Format, _Args) -> ok.

View File

@ -44,6 +44,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_amqp_metrics.hrl").
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
-export([start_link/2, info/2, force_event_refresh/2,
shutdown/2]).
@ -158,6 +159,7 @@ shutdown(Pid, Explanation) ->
-spec init(pid(), {pid(), pid()}, ranch:ref()) ->
no_return().
init(Parent, HelperSups, Ref) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
?LG_PROCESS_TYPE(reader),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbit, proxy_protocol, false),
@ -254,7 +256,7 @@ server_capabilities(_) ->
%%--------------------------------------------------------------------------
socket_error(Reason) when is_atom(Reason) ->
rabbit_log_connection:error("Error on AMQP connection ~tp: ~ts",
?LOG_ERROR("Error on AMQP connection ~tp: ~ts",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
Fmt = "Error on AMQP connection ~tp:~n~tp",
@ -264,9 +266,9 @@ socket_error(Reason) ->
%% This is presumably a TCP healthcheck, so don't log
%% it unless specified otherwise.
{ssl_upgrade_error, closed} ->
rabbit_log_connection:debug(Fmt, Args);
?LOG_DEBUG(Fmt, Args);
_ ->
rabbit_log_connection:error(Fmt, Args)
?LOG_ERROR(Fmt, Args)
end.
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@ -348,13 +350,13 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
connected_at = ConnectedAt0}} ->
ConnName = dynamic_connection_name(Name),
ConnDuration = connection_duration(ConnectedAt0),
rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: '~ts')",
?LOG_INFO("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: '~ts')",
[ConnName, VHost, Username, ConnDuration]);
%% just to be more defensive
_ ->
ConnName = dynamic_connection_name(Name),
ConnDuration = connection_duration(ConnectedAt),
rabbit_log_connection:info("closing AMQP connection (~ts, duration: '~ts')",
?LOG_INFO("closing AMQP connection (~ts, duration: '~ts')",
[ConnName, ConnDuration])
end
catch
@ -461,9 +463,9 @@ log_connection_exception(Severity, Name, Duration, Ex) ->
log_connection_exception_with_severity(Severity, Fmt, Args) ->
case Severity of
debug -> rabbit_log_connection:debug(Fmt, Args);
warning -> rabbit_log_connection:warning(Fmt, Args);
error -> rabbit_log_connection:error(Fmt, Args)
debug -> ?LOG_DEBUG(Fmt, Args);
warning -> ?LOG_WARNING(Fmt, Args);
error -> ?LOG_ERROR(Fmt, Args)
end.
run({M, F, A}) ->
@ -519,8 +521,8 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
Fmt = "accepting AMQP connection ~ts",
Args = [ConnName],
case Recv of
closed -> _ = rabbit_log_connection:debug(Fmt, Args);
_ -> _ = rabbit_log_connection:info(Fmt, Args)
closed -> _ = ?LOG_DEBUG(Fmt, Args);
_ -> _ = ?LOG_INFO(Fmt, Args)
end;
_ ->
ok
@ -793,7 +795,7 @@ wait_for_channel_termination(N, TimerRef,
{_, controlled} ->
wait_for_channel_termination(N-1, TimerRef, State1);
{_, uncontrolled} ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on AMQP connection ~tp (~ts, vhost: '~ts',"
" user: '~ts', state: ~tp), channel ~tp:"
"error while terminating:~n~tp",
@ -835,7 +837,7 @@ log_hard_error(#v1{connection_state = CS,
log_name = ConnName,
user = User,
vhost = VHost}}, Channel, Reason) ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on AMQP connection ~tp (~ts, vhost: '~ts',"
" user: '~ts', state: ~tp), channel ~tp:~n ~ts",
[self(), ConnName, VHost, User#user.username, CS, Channel, format_hard_error(Reason)]).
@ -855,7 +857,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol,
connection_state = starting},
Channel, Reason = #amqp_error{name = access_refused,
explanation = ErrMsg}) ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on AMQP connection ~tp (~ts, state: ~tp):~n~ts",
[self(), ConnName, starting, ErrMsg]),
%% respect authentication failure notification capability
@ -874,7 +876,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol,
connection_state = opening},
Channel, Reason = #amqp_error{name = not_allowed,
explanation = ErrMsg}) ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on AMQP connection ~tp (~ts, user: '~ts', state: ~tp):~n~ts",
[self(), ConnName, User#user.username, opening, ErrMsg]),
send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
@ -891,7 +893,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol,
connection_state = tuning},
Channel, Reason = #amqp_error{name = not_allowed,
explanation = ErrMsg}) ->
rabbit_log_connection:error(
?LOG_ERROR(
"Error on AMQP connection ~tp (~ts,"
" user: '~ts', state: ~tp):~n~ts",
[self(), ConnName, User#user.username, tuning, ErrMsg]),
@ -1326,7 +1328,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
Infos),
rabbit_event:notify(connection_created, Infos),
maybe_emit_stats(State1),
rabbit_log_connection:info(
?LOG_INFO(
"connection ~ts: user '~ts' authenticated and granted access to vhost '~ts'",
[dynamic_connection_name(ConnName), Username, VHost]),
State1;
@ -1351,7 +1353,7 @@ handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reas
user = User = #user{username = Username},
log_name = ConnName} = Conn,
sock = Sock}) when ?IS_RUNNING(State) ->
rabbit_log_connection:debug(
?LOG_DEBUG(
"connection ~ts of user '~ts': "
"asked to update secret, reason: ~ts",
[dynamic_connection_name(ConnName), Username, Reason]),
@ -1368,16 +1370,16 @@ handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reas
_ = rabbit_channel:update_user_state(Ch, User1)
end, all_channels()),
ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol),
rabbit_log_connection:info(
?LOG_INFO(
"connection ~ts: user '~ts' updated secret, reason: ~ts",
[dynamic_connection_name(ConnName), Username, Reason]),
State#v1{connection = Conn#connection{user = User1}};
{refused, Message} ->
rabbit_log_connection:error("Secret update was refused for user '~ts': ~tp",
?LOG_ERROR("Secret update was refused for user '~ts': ~tp",
[Username, Message]),
rabbit_misc:protocol_error(not_allowed, "New secret was refused by one of the backends", []);
{error, Message} ->
rabbit_log_connection:error("Secret update for user '~ts' failed: ~tp",
?LOG_ERROR("Secret update for user '~ts' failed: ~tp",
[Username, Message]),
rabbit_misc:protocol_error(not_allowed,
"Secret update failed", [])
@ -1839,7 +1841,7 @@ augment_connection_log_name(#connection{name = Name} = Connection) ->
Connection;
UserSpecifiedName ->
LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>,
rabbit_log_connection:info("connection ~ts has a client-provided name: ~ts",
?LOG_INFO("connection ~ts has a client-provided name: ~ts",
[Name, UserSpecifiedName]),
?store_proc_name(LogName),
Connection#connection{log_name = LogName}

View File

@ -19,6 +19,9 @@
-include("rabbit_stomp.hrl").
-include("rabbit_stomp_frame.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit_common/include/logging.hrl").
-include_lib("kernel/include/logger.hrl").
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, garbage_collection, state,
@ -62,6 +65,7 @@ close_connection(Pid, Reason) ->
init([SupHelperPid, Ref, Configuration]) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
process_flag(trap_exit, true),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbitmq_stomp, proxy_protocol, false)),
@ -74,7 +78,7 @@ init([SupHelperPid, Ref, Configuration]) ->
ProcState = rabbit_stomp_processor:initial_state(Configuration,
ProcInitArgs),
rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
?LOG_INFO("accepting STOMP connection ~tp (~ts)",
[self(), ConnName]),
ParseState = rabbit_stomp_frame:initial_state(),
@ -334,7 +338,7 @@ code_change(_OldVsn, State, _Extra) ->
log_reason({network_error, {ssl_upgrade_error, closed}, ConnName}, _State) ->
rabbit_log_connection:error("STOMP detected TLS upgrade error on ~ts: connection closed",
?LOG_ERROR("STOMP detected TLS upgrade error on ~ts: connection closed",
[ConnName]);
@ -355,46 +359,46 @@ log_reason({network_error,
{tls_alert, Alert}}, ConnName}, _State) ->
log_tls_alert(Alert, ConnName);
log_reason({network_error, {ssl_upgrade_error, Reason}, ConnName}, _State) ->
rabbit_log_connection:error("STOMP detected TLS upgrade error on ~ts: ~tp",
?LOG_ERROR("STOMP detected TLS upgrade error on ~ts: ~tp",
[ConnName, Reason]);
log_reason({network_error, Reason, ConnName}, _State) ->
rabbit_log_connection:error("STOMP detected network error on ~ts: ~tp",
?LOG_ERROR("STOMP detected network error on ~ts: ~tp",
[ConnName, Reason]);
log_reason({network_error, Reason}, _State) ->
rabbit_log_connection:error("STOMP detected network error: ~tp", [Reason]);
?LOG_ERROR("STOMP detected network error: ~tp", [Reason]);
log_reason({shutdown, client_heartbeat_timeout},
#reader_state{ processor_state = ProcState }) ->
AdapterName = rabbit_stomp_processor:adapter_name(ProcState),
rabbit_log_connection:warning("STOMP detected missed client heartbeat(s) "
?LOG_WARNING("STOMP detected missed client heartbeat(s) "
"on connection ~ts, closing it", [AdapterName]);
log_reason({shutdown, {server_initiated_close, Reason}},
#reader_state{conn_name = ConnName}) ->
rabbit_log_connection:info("closing STOMP connection ~tp (~ts), reason: ~ts",
?LOG_INFO("closing STOMP connection ~tp (~ts), reason: ~ts",
[self(), ConnName, Reason]);
log_reason(normal, #reader_state{conn_name = ConnName}) ->
rabbit_log_connection:info("closing STOMP connection ~tp (~ts)", [self(), ConnName]);
?LOG_INFO("closing STOMP connection ~tp (~ts)", [self(), ConnName]);
log_reason(shutdown, undefined) ->
rabbit_log_connection:error("closing STOMP connection that never completed connection handshake (negotiation)");
?LOG_ERROR("closing STOMP connection that never completed connection handshake (negotiation)");
log_reason(Reason, #reader_state{processor_state = ProcState}) ->
AdapterName = rabbit_stomp_processor:adapter_name(ProcState),
rabbit_log_connection:warning("STOMP connection ~ts terminated"
?LOG_WARNING("STOMP connection ~ts terminated"
" with reason ~tp, closing it", [AdapterName, Reason]).
log_tls_alert(handshake_failure, ConnName) ->
rabbit_log_connection:error("STOMP detected TLS upgrade error on ~ts: handshake failure",
?LOG_ERROR("STOMP detected TLS upgrade error on ~ts: handshake failure",
[ConnName]);
log_tls_alert(unknown_ca, ConnName) ->
rabbit_log_connection:error("STOMP detected TLS certificate verification error on ~ts: alert 'unknown CA'",
?LOG_ERROR("STOMP detected TLS certificate verification error on ~ts: alert 'unknown CA'",
[ConnName]);
log_tls_alert(Alert, ConnName) ->
rabbit_log_connection:error("STOMP detected TLS upgrade error on ~ts: alert ~ts",
?LOG_ERROR("STOMP detected TLS upgrade error on ~ts: alert ~ts",
[ConnName, Alert]).

View File

@ -25,6 +25,7 @@
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
-record(statem_data,
{transport :: module(),
@ -143,6 +144,7 @@ init([KeepaliveSup,
heartbeat := Heartbeat,
transport := ConnTransport}]) ->
process_flag(trap_exit, true),
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
{ok, Sock} =
rabbit_networking:handshake(Ref,
application:get_env(rabbitmq_stream,
@ -220,7 +222,7 @@ init([KeepaliveSup,
config = Config});
{Error, Reason} ->
rabbit_net:fast_close(RealSocket),
rabbit_log_connection:warning("Closing connection because of ~tp ~tp",
?LOG_WARNING("Closing connection because of ~tp ~tp",
[Error, Reason])
end.
@ -411,7 +413,7 @@ tuned({call, From}, {info, _Items}, _StateData) ->
{keep_state_and_data, {reply, From, []}}.
state_timeout(State, Transport, Socket) ->
rabbit_log_connection:warning("Closing connection because of timeout in state "
?LOG_WARNING("Closing connection because of timeout in state "
"'~ts' likely due to lack of client action.",
[State]),
close_immediately(Transport, Socket),
@ -438,16 +440,16 @@ handle_info(Msg,
setopts(Transport, S, [{active, once}]),
#stream_connection{connection_step = NewConnectionStep} =
Connection1,
rabbit_log_connection:debug("Transitioned from ~ts to ~ts",
?LOG_DEBUG("Transitioned from ~ts to ~ts",
[PreviousConnectionStep,
NewConnectionStep]),
Transition(NewConnectionStep, StatemData, Connection1, State1);
{Closed, S} ->
rabbit_log_connection:debug("Stream protocol connection socket ~w closed",
?LOG_DEBUG("Stream protocol connection socket ~w closed",
[S]),
stop;
{Error, S, Reason} ->
rabbit_log_connection:warning("Socket error ~tp [~w]", [Reason, S]),
?LOG_WARNING("Socket error ~tp [~w]", [Reason, S]),
stop;
{resource_alarm, IsThereAlarm} ->
{keep_state,
@ -491,7 +493,7 @@ transition_to_opened(Transport,
config = Configuration}}.
invalid_transition(Transport, Socket, From, To) ->
rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~ts "
?LOG_WARNING("Closing socket ~w. Invalid transition from ~ts "
"to ~ts.",
[Socket, From, To]),
close_immediately(Transport, Socket),
@ -512,7 +514,7 @@ socket_op(Sock, Fun) ->
{ok, Res} ->
Res;
{error, Reason} ->
rabbit_log_connection:warning("Error during socket operation ~tp",
?LOG_WARNING("Error during socket operation ~tp",
[Reason]),
rabbit_net:fast_close(RealSocket),
exit(normal)
@ -636,7 +638,7 @@ open(info, {resource_alarm, IsThereAlarm},
#configuration{credits_required_for_unblocking =
CreditsRequiredForUnblocking}} =
StatemData) ->
rabbit_log_connection:debug("Connection ~tp received resource alarm. Alarm "
?LOG_DEBUG("Connection ~tp received resource alarm. Alarm "
"on? ~tp",
[ConnectionName, IsThereAlarm]),
EnoughCreditsToUnblock =
@ -648,18 +650,18 @@ open(info, {resource_alarm, IsThereAlarm},
{false, EnoughCredits} ->
not EnoughCredits
end,
rabbit_log_connection:debug("Connection ~tp had blocked status set to ~tp, "
?LOG_DEBUG("Connection ~tp had blocked status set to ~tp, "
"new blocked status is now ~tp",
[ConnectionName, Blocked, NewBlockedState]),
case {Blocked, NewBlockedState} of
{true, false} ->
setopts(Transport, S, [{active, once}]),
ok = rabbit_heartbeat:resume_monitor(Heartbeater),
rabbit_log_connection:debug("Unblocking connection ~tp",
?LOG_DEBUG("Unblocking connection ~tp",
[ConnectionName]);
{false, true} ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),
rabbit_log_connection:debug("Blocking connection ~tp after resource alarm",
?LOG_DEBUG("Blocking connection ~tp after resource alarm",
[ConnectionName]);
_ ->
ok
@ -690,7 +692,7 @@ open(info, {OK, S, Data},
closing ->
stop;
close_sent ->
rabbit_log_connection:debug("Transitioned to close_sent"),
?LOG_DEBUG("Transitioned to close_sent"),
setopts(Transport, S, [{active, once}]),
{next_state, close_sent,
StatemData#statem_data{connection = Connection1,
@ -814,14 +816,14 @@ open(info,
open(info, {Closed, Socket}, #statem_data{connection = Connection})
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
_ = demonitor_all_streams(Connection),
rabbit_log_connection:warning("Stream reader socket ~w closed [~w]",
?LOG_WARNING("Stream reader socket ~w closed [~w]",
[Socket, self()]),
stop;
open(info, {Error, Socket, Reason},
#statem_data{connection = Connection})
when Error =:= tcp_error; Error =:= ssl_error ->
_ = demonitor_all_streams(Connection),
rabbit_log_connection:error("Stream reader socket error ~tp [~w] [~w]",
?LOG_ERROR("Stream reader socket error ~tp [~w] [~w]",
[Reason, Socket, self()]),
stop;
open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
@ -864,14 +866,14 @@ open(info, heartbeat_send,
ok ->
keep_state_and_data;
Unexpected ->
rabbit_log_connection:info("Heartbeat send error ~tp, closing connection",
?LOG_INFO("Heartbeat send error ~tp, closing connection",
[Unexpected]),
_C1 = demonitor_all_streams(Connection),
stop
end;
open(info, heartbeat_timeout,
#statem_data{connection = #stream_connection{} = Connection}) ->
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
?LOG_DEBUG("Heartbeat timeout, closing connection"),
_C1 = demonitor_all_streams(Connection),
stop;
open(info, {infos, From},
@ -906,7 +908,7 @@ open(info, check_outstanding_requests,
end, false, Requests),
case HasTimedOut of
true ->
rabbit_log_connection:info("Forcing stream connection ~tp closing: request to client timed out",
?LOG_INFO("Forcing stream connection ~tp closing: request to client timed out",
[self()]),
_ = demonitor_all_streams(Connection0),
{stop, {request_timeout, <<"Request timeout">>}};
@ -918,19 +920,19 @@ open(info, check_outstanding_requests,
end;
open(info, token_expired, #statem_data{connection = Connection}) ->
_ = demonitor_all_streams(Connection),
rabbit_log_connection:info("Forcing stream connection ~tp closing because token expired",
?LOG_INFO("Forcing stream connection ~tp closing because token expired",
[self()]),
{stop, {shutdown, <<"Token expired">>}};
open(info, {shutdown, Explanation} = Reason,
#statem_data{connection = Connection}) ->
%% rabbitmq_management or rabbitmq_stream_management plugin
%% requests to close connection.
rabbit_log_connection:info("Forcing stream connection ~tp closing: ~tp",
?LOG_INFO("Forcing stream connection ~tp closing: ~tp",
[self(), Explanation]),
_ = demonitor_all_streams(Connection),
{stop, Reason};
open(info, Unknown, _StatemData) ->
rabbit_log_connection:warning("Received unknown message ~tp in state ~ts",
?LOG_WARNING("Received unknown message ~tp in state ~ts",
[Unknown, ?FUNCTION_NAME]),
%% FIXME send close
keep_state_and_data;
@ -1104,12 +1106,12 @@ open(cast,
SendFileOct)
of
{error, closed} ->
rabbit_log_connection:info("Stream protocol connection has been closed by "
?LOG_INFO("Stream protocol connection has been closed by "
"peer",
[]),
throw({stop, normal});
{error, Reason} ->
rabbit_log_connection:info("Error while sending chunks: ~tp",
?LOG_INFO("Error while sending chunks: ~tp",
[Reason]),
%% likely a connection problem
Consumer;
@ -1149,7 +1151,7 @@ close_sent(enter, _OldState,
StateTimeout}}) ->
{keep_state_and_data, {state_timeout, StateTimeout, close}};
close_sent(state_timeout, close, #statem_data{}) ->
rabbit_log_connection:warning("Closing connection because of timeout in state "
?LOG_WARNING("Closing connection because of timeout in state "
"'~ts' likely due to lack of client action.",
[?FUNCTION_NAME]),
stop;
@ -1162,7 +1164,7 @@ close_sent(info, {tcp, S, Data},
{Connection1, State1} =
handle_inbound_data_post_close(Transport, Connection, State, Data),
#stream_connection{connection_step = Step} = Connection1,
rabbit_log_connection:debug("Stream reader has transitioned from ~ts to ~ts",
?LOG_DEBUG("Stream reader has transitioned from ~ts to ~ts",
[?FUNCTION_NAME, Step]),
case Step of
closing_done ->
@ -1174,11 +1176,11 @@ close_sent(info, {tcp, S, Data},
connection_state = State1}}
end;
close_sent(info, {tcp_closed, S}, _StatemData) ->
rabbit_log_connection:debug("Stream protocol connection socket ~w closed [~w]",
?LOG_DEBUG("Stream protocol connection socket ~w closed [~w]",
[S, self()]),
stop;
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
rabbit_log_connection:error("Stream protocol connection socket error: ~tp "
?LOG_ERROR("Stream protocol connection socket error: ~tp "
"[~w] [~w]",
[Reason, S, self()]),
stop;
@ -1192,7 +1194,7 @@ close_sent(info, {resource_alarm, IsThereAlarm},
Connection#stream_connection{resource_alarm =
IsThereAlarm}}};
close_sent(info, Msg, _StatemData) ->
rabbit_log_connection:warning("Ignored unknown message ~tp in state ~ts",
?LOG_WARNING("Ignored unknown message ~tp in state ~ts",
[Msg, ?FUNCTION_NAME]),
keep_state_and_data;
close_sent({call, From}, {info, _Items}, _StateData) ->
@ -1340,7 +1342,7 @@ handle_frame_pre_auth(Transport,
Username,
stream),
auth_fail(Username, Msg, Args, C1, State),
rabbit_log_connection:warning(Msg, Args),
?LOG_WARNING(Msg, Args),
silent_close_delay(),
{C1#stream_connection{connection_step = failure},
{sasl_authenticate,
@ -1356,7 +1358,7 @@ handle_frame_pre_auth(Transport,
Args)}],
C1,
State),
rabbit_log_connection:warning(Msg, Args),
?LOG_WARNING(Msg, Args),
{C1#stream_connection{connection_step = failure},
{sasl_authenticate, ?RESPONSE_SASL_ERROR, <<>>}};
{challenge, Challenge, AuthState1} ->
@ -1387,7 +1389,7 @@ handle_frame_pre_auth(Transport,
rabbit_core_metrics:auth_attempt_failed(Host,
Username,
stream),
rabbit_log_connection:warning("User '~ts' can only connect via localhost",
?LOG_WARNING("User '~ts' can only connect via localhost",
[Username]),
{C1#stream_connection{connection_step =
failure},
@ -1424,7 +1426,7 @@ handle_frame_pre_auth(_Transport,
Connection,
#stream_connection_state{blocked = Blocked} = State,
{tune, FrameMax, Heartbeat}) ->
rabbit_log_connection:debug("Tuning response ~tp ~tp ",
?LOG_DEBUG("Tuning response ~tp ~tp ",
[FrameMax, Heartbeat]),
Parent = self(),
%% sending a message to the main process so the heartbeat frame is sent from this main process
@ -1521,7 +1523,7 @@ handle_frame_pre_auth(_Transport, Connection, State, heartbeat) ->
?LOG_DEBUG("Received heartbeat frame pre auth"),
{Connection, State};
handle_frame_pre_auth(_Transport, Connection, State, Command) ->
rabbit_log_connection:warning("unknown command ~w, closing connection.",
?LOG_WARNING("unknown command ~w, closing connection.",
[Command]),
{Connection#stream_connection{connection_step = failure}, State}.
@ -1565,7 +1567,7 @@ handle_frame_post_auth(Transport,
PublisherId,
_WriterRef,
Stream}}) ->
rabbit_log_connection:info("Cannot create publisher ~tp on stream ~tp, connection "
?LOG_INFO("Cannot create publisher ~tp on stream ~tp, connection "
"is blocked because of resource alarm",
[PublisherId, Stream]),
response(Transport,
@ -1598,7 +1600,7 @@ handle_frame_post_auth(Transport,
NewUsername,
stream),
auth_fail(NewUsername, Msg, Args, C1, S1),
rabbit_log_connection:warning(Msg, Args),
?LOG_WARNING(Msg, Args),
{C1#stream_connection{connection_step = failure},
{sasl_authenticate,
?RESPONSE_AUTHENTICATION_FAILURE, <<>>}};
@ -1613,7 +1615,7 @@ handle_frame_post_auth(Transport,
Args)}],
C1,
S1),
rabbit_log_connection:warning(Msg, Args),
?LOG_WARNING(Msg, Args),
{C1#stream_connection{connection_step = failure},
{sasl_authenticate, ?RESPONSE_SASL_ERROR, <<>>}};
{challenge, Challenge, AuthState1} ->
@ -1642,7 +1644,7 @@ handle_frame_post_auth(Transport,
rabbit_core_metrics:auth_attempt_failed(Host,
Username,
stream),
rabbit_log_connection:warning("Not allowed to change username '~ts'. Only password",
?LOG_WARNING("Not allowed to change username '~ts'. Only password",
[Username]),
{C1#stream_connection{connection_step =
failure},
@ -1663,7 +1665,7 @@ handle_frame_post_auth(Transport,
{C2, S1}
end;
{OtherMechanism, _} ->
rabbit_log_connection:warning("User '~ts' cannot change initial auth mechanism '~ts' for '~ts'",
?LOG_WARNING("User '~ts' cannot change initial auth mechanism '~ts' for '~ts'",
[Username, NewMechanism, OtherMechanism]),
CmdBody =
{sasl_authenticate, ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM, <<>>},
@ -2056,7 +2058,7 @@ handle_frame_post_auth(Transport,
SendFileOct)
of
{error, closed} ->
rabbit_log_connection:info("Stream protocol connection has been closed by "
?LOG_INFO("Stream protocol connection has been closed by "
"peer",
[]),
throw({stop, normal});
@ -2516,12 +2518,12 @@ handle_frame_post_auth(Transport,
SendFileOct)
of
{error, closed} ->
rabbit_log_connection:info("Stream protocol connection has been closed by "
?LOG_INFO("Stream protocol connection has been closed by "
"peer",
[]),
throw({stop, normal});
{error, Reason} ->
rabbit_log_connection:info("Error while sending chunks: ~tp",
?LOG_INFO("Error while sending chunks: ~tp",
[Reason]),
%% likely a connection problem
Consumer;
@ -2850,7 +2852,7 @@ maybe_dispatch_on_subscription(Transport,
SendFileOct)
of
{error, closed} ->
rabbit_log_connection:info("Stream protocol connection has been closed by "
?LOG_INFO("Stream protocol connection has been closed by "
"peer",
[]),
throw({stop, normal});
@ -3228,13 +3230,13 @@ handle_frame_post_close(_Transport,
Connection,
State,
{response, _CorrelationId, {close, _Code}}) ->
rabbit_log_connection:info("Received close confirmation from client"),
?LOG_INFO("Received close confirmation from client"),
{Connection#stream_connection{connection_step = closing_done}, State};
handle_frame_post_close(_Transport, Connection, State, heartbeat) ->
rabbit_log_connection:debug("Received heartbeat command post close"),
?LOG_DEBUG("Received heartbeat command post close"),
{Connection, State};
handle_frame_post_close(_Transport, Connection, State, Command) ->
rabbit_log_connection:warning("ignored command on close ~tp .",
?LOG_WARNING("ignored command on close ~tp .",
[Command]),
{Connection, State}.

View File

@ -13,6 +13,7 @@
-include_lib("rabbitmq_stomp/include/rabbit_stomp.hrl").
-include_lib("rabbitmq_stomp/include/rabbit_stomp_frame.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit_common/include/logging.hrl").
%% Websocket.
-export([
@ -68,6 +69,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState})
%% Websocket.
init(Req0, Opts) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
{PeerAddr, _PeerPort} = maps:get(peer, Req0),
{_, KeepaliveSup} = lists:keyfind(keepalive_sup, 1, Opts),
SockInfo = maps:get(proxy_header, Req0, undefined),
@ -105,7 +107,7 @@ websocket_init(State) ->
-spec close_connection(pid(), string()) -> 'ok'.
close_connection(Pid, Reason) ->
rabbit_log_connection:info("Web STOMP: will terminate connection process ~tp, reason: ~ts",
?LOG_INFO("Web STOMP: will terminate connection process ~tp, reason: ~ts",
[Pid, Reason]),
sys:terminate(Pid, Reason),
ok.
@ -242,7 +244,7 @@ websocket_info(emit_stats, State) ->
{ok, emit_stats(State)};
websocket_info(Msg, State) ->
rabbit_log_connection:info("Web STOMP: unexpected message ~tp",
?LOG_INFO("Web STOMP: unexpected message ~tp",
[Msg]),
{ok, State}.
@ -274,7 +276,7 @@ handle_data(Data, State0) ->
{[{active, false}], State1};
{error, Error0} ->
Error1 = rabbit_misc:format("~tp", [Error0]),
rabbit_log_connection:error("STOMP detected framing error '~ts'", [Error1]),
?LOG_ERROR("STOMP detected framing error '~ts'", [Error1]),
stop(State0, 1007, Error1);
Other ->
Other

View File

@ -1,4 +1,4 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
@ -14,6 +14,9 @@
close_all_client_connections/1
]).
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
%% for testing purposes
-export([get_binding_address/1, get_tcp_port/1, get_tcp_conf/2]).
@ -28,6 +31,7 @@
-spec init() -> ok.
init() ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN}),
WsFrame = get_env(ws_frame, text),
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
CowboyOpts = CowboyOpts0#{proxy_header => get_env(proxy_protocol, false),
@ -111,14 +115,14 @@ start_tcp_listener(TCPConf0, CowboyOpts0, Routes) ->
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
{error, ErrTCP} ->
rabbit_log_connection:error(
?LOG_ERROR(
"Failed to start a WebSocket (HTTP) listener. Error: ~tp,"
" listener settings: ~tp",
[ErrTCP, TCPConf]),
throw(ErrTCP)
end,
listener_started(?TCP_PROTOCOL, TCPConf),
rabbit_log_connection:info(
?LOG_INFO(
"rabbit_web_stomp: listening for HTTP connections on ~ts:~w",
[get_binding_address(TCPConf), Port]).
@ -150,14 +154,14 @@ start_tls_listener(TLSConf0, CowboyOpts0, Routes) ->
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
{error, ErrTLS} ->
rabbit_log_connection:error(
?LOG_ERROR(
"Failed to start a TLS WebSocket (HTTPS) listener. Error: ~tp,"
" listener settings: ~tp",
[ErrTLS, TLSConf]),
throw(ErrTLS)
end,
listener_started(?TLS_PROTOCOL, TLSConf),
rabbit_log_connection:info(
?LOG_INFO(
"rabbit_web_stomp: listening for HTTPS connections on ~ts:~w",
[get_binding_address(TLSConf), TLSPort]).