[skip ci] Code formatting

This commit is contained in:
Michal Kuratczyk 2025-07-16 00:06:29 +02:00
parent e459859a40
commit 1e17455463
No known key found for this signature in database
24 changed files with 109 additions and 93 deletions

View File

@ -249,7 +249,7 @@ hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) ->
hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min,
Rev}, State) ->
?LOG_WARNING("Unsupported protocol version: ~b ~b.~b.~b",
[Protocol, Maj, Min, Rev]),
[Protocol, Maj, Min, Rev]),
{stop, normal, State};
hdr_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
@ -344,7 +344,7 @@ opened(info, {'DOWN', MRef, process, _, _Info},
{stop, normal};
opened(_EvtType, Frame, State) ->
?LOG_WARNING("Unexpected connection frame ~tp when in state ~tp ",
[Frame, State]),
[Frame, State]),
keep_state_and_data.
close_sent(_EvtType, heartbeat, _Data) ->

View File

@ -143,17 +143,17 @@ handle_event(info, {gun_ws, WsPid, StreamRef, WsFrame}, StateName,
handle_socket_input(Bin, StateName, State);
close ->
?LOG_INFO("peer closed AMQP over WebSocket connection in state '~s'",
[StateName]),
[StateName]),
{stop, normal, socket_closed(State)};
{close, ReasonStatusCode, ReasonUtf8} ->
?LOG_INFO("peer closed AMQP over WebSocket connection in state '~s', reason: ~b ~ts",
[StateName, ReasonStatusCode, ReasonUtf8]),
[StateName, ReasonStatusCode, ReasonUtf8]),
{stop, {shutdown, {ReasonStatusCode, ReasonUtf8}}, socket_closed(State)}
end;
handle_event(info, {TcpError, _Sock, Reason}, StateName, State)
when TcpError == tcp_error orelse TcpError == ssl_error ->
?LOG_WARNING("AMQP 1.0 connection socket errored, connection state: '~ts', reason: '~tp'",
[StateName, Reason]),
[StateName, Reason]),
{stop, {error, Reason}, socket_closed(State)};
handle_event(info, {TcpClosed, _}, StateName, State)
when TcpClosed == tcp_closed orelse TcpClosed == ssl_closed ->
@ -163,12 +163,12 @@ handle_event(info, {TcpClosed, _}, StateName, State)
handle_event(info, {gun_down, WsPid, _Proto, Reason, _Streams}, StateName,
#state{socket = {ws, WsPid, _StreamRef}} = State) ->
?LOG_WARNING("AMQP over WebSocket process ~p lost connection in state: '~s': ~p",
[WsPid, StateName, Reason]),
[WsPid, StateName, Reason]),
{stop, Reason, socket_closed(State)};
handle_event(info, {'DOWN', _Mref, process, WsPid, Reason}, StateName,
#state{socket = {ws, WsPid, _StreamRef}} = State) ->
?LOG_WARNING("AMQP over WebSocket process ~p terminated in state: '~s': ~p",
[WsPid, StateName, Reason]),
[WsPid, StateName, Reason]),
{stop, Reason, socket_closed(State)};
handle_event(info, heartbeat, _StateName, #state{connection = Connection}) ->

View File

@ -487,7 +487,7 @@ mapped(cast, #'v1_0.disposition'{role = true,
{keep_state, State#state{outgoing_unsettled = Unsettled}};
mapped(cast, Frame, State) ->
?LOG_WARNING("Unhandled session frame ~tp in state ~tp",
[Frame, State]),
[Frame, State]),
{keep_state, State};
mapped({call, From},
{transfer, _Transfer, _Sections},
@ -568,7 +568,7 @@ mapped({call, From}, Msg, State) ->
mapped(_EvtType, Msg, _State) ->
?LOG_WARNING("amqp10_session: unhandled msg in mapped state ~W",
[Msg, 10]),
[Msg, 10]),
keep_state_and_data.
end_sent(_EvtType, #'v1_0.end'{} = End, State) ->

View File

@ -781,8 +781,8 @@ handle_method_from_server1(
State = #state{return_handler = ReturnHandler}) ->
_ = case ReturnHandler of
none -> ?LOG_WARNING("Channel (~tp): received {~tp, ~tp} but there is "
"no return handler registered",
[self(), BasicReturn, AmqpMsg]);
"no return handler registered",
[self(), BasicReturn, AmqpMsg]);
{Pid, _Ref} -> Pid ! {BasicReturn, AmqpMsg}
end,
{noreply, State};

View File

@ -193,7 +193,7 @@ handle_cast({hard_error_in_channel, _Pid, Reason}, State) ->
server_initiated_close(Reason, State);
handle_cast({channel_internal_error, Pid, Reason}, State) ->
?LOG_WARNING("Connection (~tp) closing: internal error in channel (~tp): ~tp",
[self(), Pid, Reason]),
[self(), Pid, Reason]),
internal_error(Pid, Reason, State);
handle_cast({server_misbehaved, AmqpError}, State) ->
server_misbehaved_close(AmqpError, State);
@ -324,7 +324,7 @@ server_initiated_close(Close, State) ->
server_misbehaved_close(AmqpError, State) ->
?LOG_WARNING("Connection (~tp) closing: server misbehaved: ~tp",
[self(), AmqpError]),
[self(), AmqpError]),
{0, Close} = rabbit_binary_generator:map_exception(0, AmqpError, ?PROTOCOL),
set_closing_state(abrupt, #closing{reason = server_misbehaved,
close = Close}, State).

View File

@ -24,7 +24,7 @@
{error, unsuccessful_access_token_response() | any()}.
get_access_token(OAuthProvider, Request) ->
?LOG_DEBUG("get_access_token using OAuthProvider:~p and client_id:~p",
[OAuthProvider, Request#access_token_request.client_id]),
[OAuthProvider, Request#access_token_request.client_id]),
URL = OAuthProvider#oauth_provider.token_endpoint,
Header = [],
Type = ?CONTENT_URLENCODED,
@ -221,7 +221,7 @@ do_update_oauth_provider_endpoints_configuration(OAuthProvider) when
JwksUri -> set_env(jwks_uri, JwksUri)
end,
?LOG_DEBUG("Updated oauth_provider details: ~p ",
[format_oauth_provider(OAuthProvider)]),
[format_oauth_provider(OAuthProvider)]),
OAuthProvider;
do_update_oauth_provider_endpoints_configuration(OAuthProvider) ->
@ -273,7 +273,7 @@ get_oauth_provider(ListOfRequiredAttributes) ->
undefined -> get_root_oauth_provider(ListOfRequiredAttributes);
DefaultOauthProviderId ->
?LOG_DEBUG("Using default_oauth_provider ~p",
[DefaultOauthProviderId]),
[DefaultOauthProviderId]),
get_oauth_provider(DefaultOauthProviderId, ListOfRequiredAttributes)
end.
@ -296,7 +296,7 @@ ensure_oauth_provider_has_attributes(OAuthProvider, ListOfRequiredAttributes) ->
case find_missing_attributes(OAuthProvider, ListOfRequiredAttributes) of
[] ->
?LOG_DEBUG("Resolved oauth_provider ~p",
[format_oauth_provider(OAuthProvider)]),
[format_oauth_provider(OAuthProvider)]),
{ok, OAuthProvider};
_ = Attrs ->
{error, {missing_oauth_provider_attributes, Attrs}}
@ -305,13 +305,13 @@ ensure_oauth_provider_has_attributes(OAuthProvider, ListOfRequiredAttributes) ->
get_root_oauth_provider(ListOfRequiredAttributes) ->
OAuthProvider = lookup_root_oauth_provider(),
?LOG_DEBUG("Using root oauth_provider ~p",
[format_oauth_provider(OAuthProvider)]),
[format_oauth_provider(OAuthProvider)]),
case find_missing_attributes(OAuthProvider, ListOfRequiredAttributes) of
[] ->
{ok, OAuthProvider};
_ = MissingAttributes ->
?LOG_DEBUG("Looking up missing attributes ~p ...",
[MissingAttributes]),
[MissingAttributes]),
case download_oauth_provider(OAuthProvider) of
{ok, OAuthProvider2} ->
ensure_oauth_provider_has_attributes(OAuthProvider2,
@ -335,11 +335,11 @@ get_oauth_provider(OAuth2ProviderId, ListOfRequiredAttributes)
get_oauth_provider(OAuthProviderId, ListOfRequiredAttributes)
when is_binary(OAuthProviderId) ->
?LOG_DEBUG("get_oauth_provider ~p with at least these attributes: ~p",
[OAuthProviderId, ListOfRequiredAttributes]),
[OAuthProviderId, ListOfRequiredAttributes]),
case lookup_oauth_provider_config(OAuthProviderId) of
{error, _} = Error0 ->
?LOG_DEBUG("Failed to find oauth_provider ~p configuration due to ~p",
[OAuthProviderId, Error0]),
[OAuthProviderId, Error0]),
Error0;
Config ->
?LOG_DEBUG("Found oauth_provider configuration ~p", [Config]),

View File

@ -73,7 +73,7 @@ handle_maybe_call_mfa(true, {Module, Function, Args, Default}, State) ->
handle_maybe_call_mfa_error(Module, Default, State);
Err:Reason ->
?LOG_ERROR("Calling ~tp:~tp failed: ~tp:~tp",
[Module, Function, Err, Reason]),
[Module, Function, Err, Reason]),
handle_maybe_call_mfa_error(Module, Default, State)
end.

View File

@ -269,8 +269,8 @@ update_x_death_header(Info, Headers) ->
[{table, rabbit_misc:sort_field_table(Info1)} | Others]);
{<<"x-death">>, InvalidType, Header} ->
?LOG_WARNING("Message has invalid x-death header (type: ~tp)."
" Resetting header ~tp",
[InvalidType, Header]),
" Resetting header ~tp",
[InvalidType, Header]),
%% if x-death is something other than an array (list)
%% then we reset it: this happens when some clients consume
%% a message and re-publish is, converting header values

View File

@ -123,8 +123,8 @@ handle_call(clear, _From, S) ->
handle_call(Request, From, S) ->
?LOG_WARNING("The pg_local server received an unexpected message:\n"
"handle_call(~tp, ~tp, _)\n",
[Request, From]),
"handle_call(~tp, ~tp, _)\n",
[Request, From]),
{noreply, S}.
handle_cast({join, Name, Pid}, S) ->

View File

@ -1155,7 +1155,7 @@ pg_local_scope(Prefix) ->
update_cluster_tags() ->
Tags = application:get_env(rabbit, cluster_tags, []),
?LOG_DEBUG("Seeding cluster tags from application environment key...",
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
rabbit_runtime_parameters:set_global(cluster_tags, Tags, <<"internal_user">>).

View File

@ -242,7 +242,7 @@ handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) ->
end,
{ok, lists:foldr(fun(Source, AccState) ->
?LOG_WARNING("~ts resource limit alarm cleared for dead node ~tp",
[Source, Node]),
[Source, Node]),
maybe_alert(fun dict_unappend/3, Node, Source, false, AccState)
end, State, AlarmsForDeadNode)};
@ -350,7 +350,7 @@ handle_set_alarm(Alarm, State) ->
handle_clear_resource_alarm(Source, Node, State) ->
?LOG_WARNING("~ts resource limit alarm cleared on node ~tp",
[Source, Node]),
[Source, Node]),
{ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}.
handle_clear_alarm(file_descriptor_limit, State) ->

View File

@ -295,14 +295,14 @@ sql_to_list(SQL) ->
{ok, String};
Error ->
?LOG_WARNING("JMS message selector ~p is not UTF-8 encoded: ~p",
[JmsSelector, Error]),
[JmsSelector, Error]),
error
end.
check_length(String)
when length(String) > ?MAX_EXPRESSION_LENGTH ->
?LOG_WARNING("JMS message selector length ~b exceeds maximum length ~b",
[length(String), ?MAX_EXPRESSION_LENGTH]),
[length(String), ?MAX_EXPRESSION_LENGTH]),
error;
check_length(_) ->
ok.
@ -313,14 +313,14 @@ tokenize(String, SQL) ->
{ok, Tokens};
{error, {_Line, _Mod, ErrDescriptor}, _Location} ->
?LOG_WARNING("failed to scan JMS message selector '~ts': ~tp",
[JmsSelector, ErrDescriptor]),
[JmsSelector, ErrDescriptor]),
error
end.
check_token_count(Tokens, SQL)
when length(Tokens) > ?MAX_TOKENS ->
?LOG_WARNING("JMS message selector '~ts' with ~b tokens exceeds token limit ~b",
[JmsSelector, length(Tokens), ?MAX_TOKENS]),
[JmsSelector, length(Tokens), ?MAX_TOKENS]),
error;
check_token_count(_, _) ->
ok.
@ -329,7 +329,7 @@ parse(Tokens, SQL) ->
case rabbit_amqp_sql_parser:parse(Tokens) of
{error, Reason} ->
?LOG_WARNING("failed to parse JMS message selector '~ts': ~p",
[JmsSelector, Reason]),
[JmsSelector, Reason]),
error;
Ok ->
Ok

View File

@ -51,7 +51,7 @@ handle_request(Request, Vhost, User, ConnectionPid, PermCaches0) ->
PermCaches0)
catch throw:{?MODULE, StatusCode0, Explanation} ->
?LOG_WARNING("request ~ts ~ts failed: ~ts",
[HttpMethod, HttpRequestTarget, Explanation]),
[HttpMethod, HttpRequestTarget, Explanation]),
{StatusCode0, {utf8, Explanation}, PermCaches0}
end,

View File

@ -425,7 +425,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
%% filtered out with is_replicable(Q). Maybe error instead?
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
?LOG_INFO("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
[Type, VhostSpec, QueueSpec]),
[Type, VhostSpec, QueueSpec]),
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
NumRunning = length(Running),
TypeModule = case Type of
@ -523,7 +523,7 @@ maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
_ ->
[{Length, Destination} | _] = sort_by_number_of_queues(Candidates, ByNode),
?LOG_INFO("Migrating queue ~tp from node ~tp with ~tp queues to node ~tp with ~tp queues",
[Name, N, length(All), Destination, Length]),
[Name, N, length(All), Destination, Length]),
case Module:transfer_leadership(Q, Destination) of
{migrated, NewNode} ->
?LOG_INFO("Queue ~tp migrated to ~tp", [Name, NewNode]),
@ -1982,8 +1982,8 @@ delete_transient_queues_on_node(Node) ->
case length(QueueNames) of
0 -> ok;
N -> ?LOG_INFO("~b transient queues from node '~ts' "
"deleted in ~fs",
[N, Node, Time / 1_000_000])
"deleted in ~fs",
[N, Node, Time / 1_000_000])
end,
notify_queue_binding_deletions(Deletions),
rabbit_core_metrics:queues_deleted(QueueNames),

View File

@ -76,7 +76,7 @@ start_for_vhost(VHost) ->
%% e.g. some integration tests do it
{error, {no_such_vhost, VHost}} ->
?LOG_ERROR("Failed to start a queue process supervisor for vhost ~ts: vhost no longer exists!",
[VHost]),
[VHost]),
{error, {no_such_vhost, VHost}}
end.
@ -89,6 +89,6 @@ stop_for_vhost(VHost) ->
%% see start/1
{error, {no_such_vhost, VHost}} ->
?LOG_ERROR("Failed to stop a queue process supervisor for vhost ~ts: vhost no longer exists!",
[VHost]),
[VHost]),
ok
end.

View File

@ -526,11 +526,11 @@ set_permissions(Username, VirtualHost, ConfigurePerm, WritePerm, ReadPerm, Actin
clear_permissions(Username, VirtualHost, ActingUser) ->
?LOG_DEBUG("Asked to clear permissions for user '~ts' in virtual host '~ts'",
[Username, VirtualHost]),
[Username, VirtualHost]),
try
R = rabbit_db_user:clear_user_permissions(Username, VirtualHost),
?LOG_INFO("Successfully cleared permissions for user '~ts' in virtual host '~ts'",
[Username, VirtualHost]),
[Username, VirtualHost]),
rabbit_event:notify(permission_deleted, [{user, Username},
{vhost, VirtualHost},
{user_who_performed_action, ActingUser}]),
@ -636,11 +636,11 @@ set_topic_permissions(Username, VirtualHost, Exchange, WritePerm, ReadPerm, Acti
clear_topic_permissions(Username, VirtualHost, ActingUser) ->
?LOG_DEBUG("Asked to clear topic permissions for user '~ts' in virtual host '~ts'",
[Username, VirtualHost]),
[Username, VirtualHost]),
try
R = rabbit_db_user:clear_topic_permissions(Username, VirtualHost, '_'),
?LOG_INFO("Successfully cleared topic permissions for user '~ts' in virtual host '~ts'",
[Username, VirtualHost]),
[Username, VirtualHost]),
rabbit_event:notify(topic_permission_deleted, [{user, Username},
{vhost, VirtualHost},
{user_who_performed_action, ActingUser}]),
@ -654,12 +654,12 @@ clear_topic_permissions(Username, VirtualHost, ActingUser) ->
clear_topic_permissions(Username, VirtualHost, Exchange, ActingUser) ->
?LOG_DEBUG("Asked to clear topic permissions on exchange '~ts' for user '~ts' in virtual host '~ts'",
[Exchange, Username, VirtualHost]),
[Exchange, Username, VirtualHost]),
try
R = rabbit_db_user:clear_topic_permissions(
Username, VirtualHost, Exchange),
?LOG_INFO("Successfully cleared topic permissions on exchange '~ts' for user '~ts' in virtual host '~ts'",
[Exchange, Username, VirtualHost]),
[Exchange, Username, VirtualHost]),
rabbit_event:notify(topic_permission_deleted, [{user, Username},
{vhost, VirtualHost},
{user_who_performed_action, ActingUser}]),

View File

@ -414,8 +414,8 @@ refresh_config_local() ->
gen_server2:call(C, refresh_config, infinity)
catch _:Reason ->
?LOG_ERROR("Failed to refresh channel config "
"for channel ~tp. Reason ~tp",
[C, Reason])
"for channel ~tp. Reason ~tp",
[C, Reason])
end
end,
list_local()),
@ -428,8 +428,8 @@ refresh_interceptors() ->
gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
catch _:Reason ->
?LOG_ERROR("Failed to refresh channel interceptors "
"for channel ~tp. Reason ~tp",
[C, Reason])
"for channel ~tp. Reason ~tp",
[C, Reason])
end
end,
list_local()),

View File

@ -219,13 +219,13 @@ ensure_tracked_tables_for_this_node() ->
%% Create tables
ensure_tracked_channels_table_for_this_node() ->
?LOG_INFO("Setting up a table for channel tracking on this node: ~tp",
[?TRACKED_CHANNEL_TABLE]),
[?TRACKED_CHANNEL_TABLE]),
ets:new(?TRACKED_CHANNEL_TABLE, [named_table, public, {write_concurrency, true},
{keypos, #tracked_channel.pid}]).
ensure_per_user_tracked_channels_table_for_this_node() ->
?LOG_INFO("Setting up a table for channel tracking on this node: ~tp",
[?TRACKED_CHANNEL_TABLE_PER_USER]),
[?TRACKED_CHANNEL_TABLE_PER_USER]),
ets:new(?TRACKED_CHANNEL_TABLE_PER_USER, [named_table, public, {write_concurrency, true}]).
get_tracked_channels_by_connection_pid(ConnPid) ->

View File

@ -179,9 +179,9 @@ delete(Q0, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_classic(Q0) ->
case IfEmpty of
true ->
?LOG_ERROR("Queue ~ts in vhost ~ts is down. "
"The queue may be non-empty. "
"Refusing to force-delete.",
[Name, Vhost]),
"The queue may be non-empty. "
"Refusing to force-delete.",
[Name, Vhost]),
{error, not_empty};
false ->
?LOG_WARNING("Queue ~ts in vhost ~ts is down. "
@ -590,7 +590,7 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q),
{init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]),
_ = [?LOG_ERROR("Queue ~tp failed to initialise: ~tp",
[Pid, Error]) || {Pid, Error} <- Failures],
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
capabilities() ->

View File

@ -447,7 +447,7 @@ recover_index_v1_clean(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean
CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []),
State = recover_index_v1_common(State0, V1State, CountersRef),
?LOG_INFO("Queue ~ts in vhost ~ts converted ~b total messages from v1 to v2",
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
State.
recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean,
@ -466,7 +466,7 @@ recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean
convert),
State = recover_index_v1_common(State0, V1State, CountersRef),
?LOG_INFO("Queue ~ts in vhost ~ts converted ~b total messages from v1 to v2",
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
State.
%% At this point all messages are persistent because transient messages

View File

@ -319,7 +319,7 @@ read_from_disk(SeqId, {?MODULE, Offset, Size}, State0) ->
ok
catch C:E:S ->
?LOG_ERROR("Per-queue store CRC32 check failed in ~ts seq id ~b offset ~b size ~b",
[segment_file(Segment, State), SeqId, Offset, Size]),
[segment_file(Segment, State), SeqId, Offset, Size]),
erlang:raise(C, E, S)
end
end,
@ -417,7 +417,7 @@ parse_many_from_disk([<<Size:32/unsigned, _:7, UseCRC32:1, CRC32Expected:16/bits
ok
catch C:E:S ->
?LOG_ERROR("Per-queue store CRC32 check failed in ~ts",
[segment_file(Segment, State)]),
[segment_file(Segment, State)]),
erlang:raise(C, E, S)
end
end,

View File

@ -118,8 +118,8 @@ handle_cast({vhost_down, Details}) ->
VHost = pget(name, Details),
Node = pget(node, Details),
?LOG_INFO("Closing all connections in vhost '~ts' on node '~ts'"
" because the vhost is stopping",
[VHost, Node]),
" because the vhost is stopping",
[VHost, Node]),
shutdown_tracked_items(
list_on_node(Node, VHost),
rabbit_misc:format("vhost '~ts' is down", [VHost]));
@ -191,17 +191,17 @@ ensure_tracked_connections_table_for_this_node() ->
_ = ets:new(?TRACKED_CONNECTION_TABLE, [named_table, public, {write_concurrency, true},
{keypos, #tracked_connection.id}]),
?LOG_INFO("Setting up a table for connection tracking on this node: ~tp",
[?TRACKED_CONNECTION_TABLE]).
[?TRACKED_CONNECTION_TABLE]).
ensure_per_vhost_tracked_connections_table_for_this_node() ->
?LOG_INFO("Setting up a table for per-vhost connection counting on this node: ~tp",
[?TRACKED_CONNECTION_TABLE_PER_VHOST]),
[?TRACKED_CONNECTION_TABLE_PER_VHOST]),
ets:new(?TRACKED_CONNECTION_TABLE_PER_VHOST, [named_table, public, {write_concurrency, true}]).
ensure_per_user_tracked_connections_table_for_this_node() ->
_ = ets:new(?TRACKED_CONNECTION_TABLE_PER_USER, [named_table, public, {write_concurrency, true}]),
?LOG_INFO("Setting up a table for per-user connection counting on this node: ~tp",
[?TRACKED_CONNECTION_TABLE_PER_USER]).
[?TRACKED_CONNECTION_TABLE_PER_USER]).
-spec tracked_connection_table_name_for(node()) -> atom().

View File

@ -56,8 +56,8 @@ start_conn_ch(Fun, OUpstream, OUParams,
try
R = Fun(Conn, Ch, DConn, DCh),
?LOG_INFO("Federation ~ts connected to ~ts",
[rabbit_misc:rs(XorQName),
rabbit_federation_upstream:params_to_string(UParams)]),
[rabbit_misc:rs(XorQName),
rabbit_federation_upstream:params_to_string(UParams)]),
Name = pget(name, amqp_connection:info(DConn, [name])),
rabbit_federation_status:report(
OUpstream, OUParams, XorQName, {running, Name}),
@ -130,31 +130,31 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
?LOG_WARNING("Federation ~ts did not connect to ~ts. Server has closed the connection due to an error, code: ~tp, "
"message: ~ts",
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
Code, Message]),
"message: ~ts",
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
Code, Message]),
{stop, {shutdown, restart}, State};
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
?LOG_WARNING("Federation ~ts did not connect to ~ts. Reason: ~tp",
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
E]),
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
E]),
{stop, {shutdown, restart}, State};
connection_error(remote, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
?LOG_INFO("Federation ~ts disconnected from ~ts~n~tp",
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
{stop, {shutdown, restart}, State};
connection_error(command_channel, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
?LOG_INFO("Federation ~ts failed to open a command channel for upstream ~ts~n~tp",
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
{stop, {shutdown, restart}, State};
connection_error(local, basic_cancel, Upstream, UParams, XorQName, State) ->
@ -284,7 +284,7 @@ log_terminate(shutdown, Upstream, UParams, XorQName) ->
%% nicely so that we do not cause unacked messages to be
%% redelivered.
?LOG_INFO("disconnecting from ~ts",
[rabbit_federation_upstream:params_to_string(UParams)]),
[rabbit_federation_upstream:params_to_string(UParams)]),
rabbit_federation_status:remove(Upstream, XorQName);
log_terminate(Reason, Upstream, UParams, XorQName) ->

View File

@ -77,7 +77,8 @@ handle_cast(init, State = #state{config = Config0}) ->
gen_server2:cast(self(), connect_dest),
{noreply, State#state{config = Config}}
catch E:R ->
?LOG_ERROR("Shovel ~ts could not connect to source: ~p ~p", [human_readable_name(maps:get(name, Config0)), E, R]),
?LOG_ERROR("Shovel ~ts could not connect to source: ~p ~p",
[human_readable_name(maps:get(name, Config0)), E, R]),
{stop, shutdown, State}
end;
handle_cast(connect_dest, State = #state{config = Config0}) ->
@ -87,7 +88,8 @@ handle_cast(connect_dest, State = #state{config = Config0}) ->
gen_server2:cast(self(), init_shovel),
{noreply, State#state{config = Config}}
catch E:R ->
?LOG_ERROR("Shovel ~ts could not connect to destination: ~p ~p", [human_readable_name(maps:get(name, Config0)), E, R]),
?LOG_ERROR("Shovel ~ts could not connect to destination: ~p ~p",
[human_readable_name(maps:get(name, Config0)), E, R]),
{stop, shutdown, State}
end;
handle_cast(init_shovel, State = #state{config = Config}) ->
@ -97,7 +99,8 @@ handle_cast(init_shovel, State = #state{config = Config}) ->
process_flag(trap_exit, true),
Config1 = rabbit_shovel_behaviour:init_dest(Config),
Config2 = rabbit_shovel_behaviour:init_source(Config1),
?LOG_DEBUG("Shovel ~ts has finished setting up its topology", [human_readable_name(maps:get(name, Config2))]),
?LOG_DEBUG("Shovel ~ts has finished setting up its topology",
[human_readable_name(maps:get(name, Config2))]),
State1 = State#state{config = Config2},
ok = report_running(State1),
{noreply, State1}.
@ -108,19 +111,24 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
not_handled ->
case rabbit_shovel_behaviour:handle_dest(Msg, Config) of
not_handled ->
?LOG_WARNING("Shovel ~ts could not handle a destination message ~tp", [human_readable_name(Name), Msg]),
?LOG_WARNING("Shovel ~ts could not handle a destination message ~tp",
[human_readable_name(Name), Msg]),
{noreply, State};
{stop, {outbound_conn_died, heartbeat_timeout}} ->
?LOG_ERROR("Shovel ~ts detected missed heartbeats on destination connection", [human_readable_name(Name)]),
?LOG_ERROR("Shovel ~ts detected missed heartbeats on destination connection",
[human_readable_name(Name)]),
{stop, {shutdown, heartbeat_timeout}, State};
{stop, {outbound_conn_died, Reason}} ->
?LOG_ERROR("Shovel ~ts detected destination connection failure: ~tp", [human_readable_name(Name), Reason]),
?LOG_ERROR("Shovel ~ts detected destination connection failure: ~tp",
[human_readable_name(Name), Reason]),
{stop, Reason, State};
{stop, {outbound_link_or_channel_closure, Reason}} ->
?LOG_ERROR("Shovel ~ts detected destination shovel failure: ~tp", [human_readable_name(Name), Reason]),
?LOG_ERROR("Shovel ~ts detected destination shovel failure: ~tp",
[human_readable_name(Name), Reason]),
{stop, Reason, State};
{stop, Reason} ->
?LOG_DEBUG("Shovel ~ts decided to stop due a message from destination: ~tp", [human_readable_name(Name), Reason]),
?LOG_DEBUG("Shovel ~ts decided to stop due a message from destination: ~tp",
[human_readable_name(Name), Reason]),
{stop, Reason, State};
Config1 ->
State1 = State#state{config = Config1},
@ -128,16 +136,20 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
{noreply, State2}
end;
{stop, {inbound_conn_died, heartbeat_timeout}} ->
?LOG_ERROR("Shovel ~ts detected missed heartbeats on source connection", [human_readable_name(Name)]),
?LOG_ERROR("Shovel ~ts detected missed heartbeats on source connection",
[human_readable_name(Name)]),
{stop, {shutdown, heartbeat_timeout}, State};
{stop, {inbound_conn_died, Reason}} ->
?LOG_ERROR("Shovel ~ts detected source connection failure: ~tp", [human_readable_name(Name), Reason]),
?LOG_ERROR("Shovel ~ts detected source connection failure: ~tp",
[human_readable_name(Name), Reason]),
{stop, Reason, State};
{stop, {inbound_link_or_channel_closure, Reason}} ->
?LOG_ERROR("Shovel ~ts detected source Shovel (or link, or channel) failure: ~tp", [human_readable_name(Name), Reason]),
?LOG_ERROR("Shovel ~ts detected source Shovel (or link, or channel) failure: ~tp",
[human_readable_name(Name), Reason]),
{stop, Reason, State};
{stop, Reason} ->
?LOG_ERROR("Shovel ~ts decided to stop due a message from source: ~tp", [human_readable_name(Name), Reason]),
?LOG_ERROR("Shovel ~ts decided to stop due a message from source: ~tp",
[human_readable_name(Name), Reason]),
{stop, Reason, State};
Config1 ->
State1 = State#state{config = Config1},
@ -149,7 +161,7 @@ terminate({shutdown, autodelete}, State = #state{name = Name,
type = dynamic}) ->
{VHost, ShovelName} = Name,
?LOG_INFO("Shovel '~ts' is stopping (it was configured to autodelete and transfer is completed)",
[human_readable_name(Name)]),
[human_readable_name(Name)]),
close_connections(State),
%% See rabbit_shovel_dyn_worker_sup_sup:stop_child/1
put({shovel_worker_autodelete, Name}, true),
@ -161,25 +173,29 @@ terminate(shutdown, State = #state{name = Name}) ->
rabbit_shovel_status:remove(Name),
ok;
terminate(socket_closed_unexpectedly, State = #state{name = Name}) ->
?LOG_ERROR("Shovel ~ts is stopping because of the socket closed unexpectedly", [human_readable_name(Name)]),
?LOG_ERROR("Shovel ~ts is stopping because of the socket closed unexpectedly",
[human_readable_name(Name)]),
rabbit_shovel_status:report(State#state.name, State#state.type,
{terminated, "socket closed"}),
close_connections(State),
ok;
terminate({'EXIT', heartbeat_timeout}, State = #state{name = Name}) ->
?LOG_ERROR("Shovel ~ts is stopping because of a heartbeat timeout", [human_readable_name(Name)]),
?LOG_ERROR("Shovel ~ts is stopping because of a heartbeat timeout",
[human_readable_name(Name)]),
rabbit_shovel_status:report(State#state.name, State#state.type,
{terminated, "heartbeat timeout"}),
close_connections(State),
ok;
terminate({'EXIT', outbound_conn_died}, State = #state{name = Name}) ->
?LOG_ERROR("Shovel ~ts is stopping because destination connection failed", [human_readable_name(Name)]),
?LOG_ERROR("Shovel ~ts is stopping because destination connection failed",
[human_readable_name(Name)]),
rabbit_shovel_status:report(State#state.name, State#state.type,
{terminated, "destination connection failed"}),
close_connections(State),
ok;
terminate({'EXIT', inbound_conn_died}, State = #state{name = Name}) ->
?LOG_ERROR("Shovel ~ts is stopping because destination connection failed", [human_readable_name(Name)]),
?LOG_ERROR("Shovel ~ts is stopping because destination connection failed",
[human_readable_name(Name)]),
rabbit_shovel_status:report(State#state.name, State#state.type,
{terminated, "source connection failed"}),
close_connections(State),