Use log macros for AMQP

Using a log macro has the benefit that location data is added as
explained in https://www.erlang.org/doc/apps/kernel/logger.html#t:metadata/0
This commit is contained in:
David Ansari 2024-10-30 14:50:05 +01:00
parent 1778bc22aa
commit dbd9ede67b
2 changed files with 32 additions and 32 deletions

View File

@ -7,6 +7,7 @@
-module(rabbit_amqp_reader). -module(rabbit_amqp_reader).
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp.hrl"). -include("rabbit_amqp.hrl").
@ -329,15 +330,13 @@ error_frame(Condition, Fmt, Args) ->
handle_exception(State = #v1{connection_state = closed}, Channel, handle_exception(State = #v1{connection_state = closed}, Channel,
#'v1_0.error'{description = {utf8, Desc}}) -> #'v1_0.error'{description = {utf8, Desc}}) ->
rabbit_log_connection:error( ?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), closed, Channel, Desc]), [self(), closed, Channel, Desc]),
State; State;
handle_exception(State = #v1{connection_state = CS}, Channel, handle_exception(State = #v1{connection_state = CS}, Channel,
Error = #'v1_0.error'{description = {utf8, Desc}}) Error = #'v1_0.error'{description = {utf8, Desc}})
when ?IS_RUNNING(State) orelse CS =:= closing -> when ?IS_RUNNING(State) orelse CS =:= closing ->
rabbit_log_connection:error( ?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), CS, Channel, Desc]), [self(), CS, Channel, Desc]),
close(Error, State); close(Error, State);
handle_exception(State, _Channel, Error) -> handle_exception(State, _Channel, Error) ->
@ -438,7 +437,7 @@ handle_connection_frame(
Timer = maybe_start_credential_expiry_timer(User), Timer = maybe_start_credential_expiry_timer(User),
rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10), rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10),
notify_auth(user_authentication_success, Username, State0), notify_auth(user_authentication_success, Username, State0),
rabbit_log_connection:info( ?LOG_INFO(
"Connection from AMQP 1.0 container '~ts': user '~ts' authenticated " "Connection from AMQP 1.0 container '~ts': user '~ts' authenticated "
"using SASL mechanism ~s and granted access to vhost '~ts'", "using SASL mechanism ~s and granted access to vhost '~ts'",
[ContainerId, Username, Mechanism, Vhost]), [ContainerId, Username, Mechanism, Vhost]),
@ -519,7 +518,7 @@ handle_connection_frame(
null -> undefined; null -> undefined;
{utf8, Val} -> Val {utf8, Val} -> Val
end, end,
rabbit_log:debug( ?LOG_DEBUG(
"AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p", "AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p",
[HostnameVal, Vhost, IdleTimeout]), [HostnameVal, Vhost, IdleTimeout]),
@ -780,7 +779,7 @@ notify_auth(EventType, Username, State) ->
rabbit_event:notify(EventType, EventProps). rabbit_event:notify(EventType, EventProps).
track_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels} = State) -> track_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels} = State) ->
rabbit_log:debug("AMQP 1.0 created session process ~p for channel number ~b", ?LOG_DEBUG("AMQP 1.0 created session process ~p for channel number ~b",
[SessionPid, ChannelNum]), [SessionPid, ChannelNum]),
_Ref = erlang:monitor(process, SessionPid, [{tag, {'DOWN', ChannelNum}}]), _Ref = erlang:monitor(process, SessionPid, [{tag, {'DOWN', ChannelNum}}]),
State#v1{tracked_channels = maps:put(ChannelNum, SessionPid, Channels)}. State#v1{tracked_channels = maps:put(ChannelNum, SessionPid, Channels)}.
@ -788,7 +787,7 @@ track_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels} = State)
untrack_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels0} = State) -> untrack_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels0} = State) ->
case maps:take(ChannelNum, Channels0) of case maps:take(ChannelNum, Channels0) of
{SessionPid, Channels} -> {SessionPid, Channels} ->
rabbit_log:debug("AMQP 1.0 closed session process ~p with channel number ~b", ?LOG_DEBUG("AMQP 1.0 closed session process ~p with channel number ~b",
[SessionPid, ChannelNum]), [SessionPid, ChannelNum]),
State#v1{tracked_channels = Channels}; State#v1{tracked_channels = Channels};
_ -> _ ->
@ -890,7 +889,7 @@ set_credential0(Cred,
credential_timer = OldTimer} = Conn, credential_timer = OldTimer} = Conn,
tracked_channels = Chans, tracked_channels = Chans,
sock = Sock}) -> sock = Sock}) ->
rabbit_log:info("updating credential", []), ?LOG_INFO("updating credential", []),
case rabbit_access_control:update_state(User0, Cred) of case rabbit_access_control:update_state(User0, Cred) of
{ok, User} -> {ok, User} ->
try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of
@ -925,7 +924,7 @@ maybe_start_credential_expiry_timer(User) ->
undefined; undefined;
Ts when is_integer(Ts) -> Ts when is_integer(Ts) ->
Time = (Ts - os:system_time(second)) * 1000, Time = (Ts - os:system_time(second)) * 1000,
rabbit_log:debug( ?LOG_DEBUG(
"credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", "credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
[Time, Ts]), [Time, Ts]),
case Time > 0 of case Time > 0 of

View File

@ -11,6 +11,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp.hrl"). -include("rabbit_amqp.hrl").
@ -601,7 +602,7 @@ log_error_and_close_session(
writer_pid = WriterPid, writer_pid = WriterPid,
channel_num = Ch}}) -> channel_num = Ch}}) ->
End = #'v1_0.end'{error = Error}, End = #'v1_0.end'{error = Error},
rabbit_log:warning("Closing session for connection ~p: ~tp", ?LOG_WARNING("Closing session for connection ~p: ~tp",
[ReaderPid, Error]), [ReaderPid, Error]),
ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End), ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End),
{stop, {shutdown, Error}, State}. {stop, {shutdown, Error}, State}.
@ -889,7 +890,7 @@ destroy_outgoing_link(_, _, _, Acc) ->
Acc. Acc.
detach(Handle, Link, Error = #'v1_0.error'{}) -> detach(Handle, Link, Error = #'v1_0.error'{}) ->
rabbit_log:warning("Detaching link handle ~b due to error: ~tp", ?LOG_WARNING("Detaching link handle ~b due to error: ~tp",
[Handle, Error]), [Handle, Error]),
publisher_or_consumer_deleted(Link), publisher_or_consumer_deleted(Link),
#'v1_0.detach'{handle = ?UINT(Handle), #'v1_0.detach'{handle = ?UINT(Handle),
@ -981,8 +982,8 @@ handle_frame(#'v1_0.flow'{handle = Handle} = Flow,
%% "If set to a handle that is not currently associated with %% "If set to a handle that is not currently associated with
%% an attached link, the recipient MUST respond by ending the %% an attached link, the recipient MUST respond by ending the
%% session with an unattached-handle session error." [2.7.4] %% session with an unattached-handle session error." [2.7.4]
rabbit_log:warning( ?LOG_WARNING("Received Flow frame for unknown link handle: ~tp",
"Received Flow frame for unknown link handle: ~tp", [Flow]), [Flow]),
protocol_error( protocol_error(
?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE, ?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE,
"Unattached link handle: ~b", [HandleInt]) "Unattached link handle: ~b", [HandleInt])
@ -2161,7 +2162,7 @@ handle_deliver(ConsumerTag, AckRequired,
outgoing_links = OutgoingLinks}; outgoing_links = OutgoingLinks};
_ -> _ ->
%% TODO handle missing link -- why does the queue think it's there? %% TODO handle missing link -- why does the queue think it's there?
rabbit_log:warning( ?LOG_WARNING(
"No link handle ~b exists for delivery with consumer tag ~p from queue ~tp", "No link handle ~b exists for delivery with consumer tag ~p from queue ~tp",
[Handle, ConsumerTag, QName]), [Handle, ConsumerTag, QName]),
State State
@ -3008,7 +3009,7 @@ credit_reply_timeout(QType, QName) ->
Fmt = "Timed out waiting for credit reply from ~s ~s. " Fmt = "Timed out waiting for credit reply from ~s ~s. "
"Hint: Enable feature flag rabbitmq_4.0.0", "Hint: Enable feature flag rabbitmq_4.0.0",
Args = [QType, rabbit_misc:rs(QName)], Args = [QType, rabbit_misc:rs(QName)],
rabbit_log:error(Fmt, Args), ?LOG_ERROR(Fmt, Args),
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args). protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).
default(undefined, Default) -> Default; default(undefined, Default) -> Default;
@ -3547,7 +3548,7 @@ recheck_authz(#state{incoming_links = IncomingLinks,
permission_cache = Cache0, permission_cache = Cache0,
cfg = #cfg{user = User} cfg = #cfg{user = User}
} = State) -> } = State) ->
rabbit_log:debug("rechecking link authorizations", []), ?LOG_DEBUG("rechecking link authorizations", []),
Cache1 = maps:fold( Cache1 = maps:fold(
fun(_Handle, #incoming_link{exchange = X}, Cache) -> fun(_Handle, #incoming_link{exchange = X}, Cache) ->
case X of case X of