[skip ci] Remove rabbit_log_federation and use LOG_ macros
This commit is contained in:
parent
3ee8df9310
commit
ebe3f61ef0
|
@ -9,6 +9,7 @@
|
|||
|
||||
-include_lib("rabbitmq_federation_common/include/rabbit_federation.hrl").
|
||||
-include("rabbit_exchange_federation.hrl").
|
||||
-include_lib("kernel/include/logger.hrl").
|
||||
|
||||
-behaviour(application).
|
||||
-export([start/2, stop/1]).
|
||||
|
@ -43,6 +44,7 @@ stop(_State) ->
|
|||
%%----------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION}),
|
||||
Flags = #{
|
||||
strategy => one_for_one,
|
||||
intensity => 3,
|
||||
|
|
|
@ -65,6 +65,8 @@ start_link(Args) ->
|
|||
gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]).
|
||||
|
||||
init({Upstream, XName}) ->
|
||||
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION,
|
||||
exchange => XName}),
|
||||
%% If we are starting up due to a policy change then it's possible
|
||||
%% for the exchange to have been deleted before we got here, in which
|
||||
%% case it's possible that delete callback would also have been called
|
||||
|
@ -80,7 +82,7 @@ init({Upstream, XName}) ->
|
|||
gen_server2:cast(self(), maybe_go),
|
||||
{ok, {not_started, {Upstream, UParams, XName}}};
|
||||
{error, not_found} ->
|
||||
rabbit_federation_link_util:log_warning(XName, "not found, stopping link", []),
|
||||
?LOG_WARNING("not found, stopping link", []),
|
||||
{stop, gone}
|
||||
end.
|
||||
|
||||
|
@ -105,14 +107,12 @@ handle_cast({enqueue, _, _}, State = {not_started, _}) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_cast({enqueue, Serial, Cmd},
|
||||
State = #state{waiting_cmds = Waiting,
|
||||
downstream_exchange = XName}) ->
|
||||
State = #state{waiting_cmds = Waiting}) ->
|
||||
Waiting1 = gb_trees:insert(Serial, Cmd, Waiting),
|
||||
try
|
||||
{noreply, play_back_commands(State#state{waiting_cmds = Waiting1})}
|
||||
catch exit:{{shutdown, {server_initiated_close, 404, Text}}, _} ->
|
||||
rabbit_federation_link_util:log_warning(
|
||||
XName, "detected upstream changes, restarting link: ~tp", [Text]),
|
||||
?LOG_WARNING("detected upstream changes, restarting link: ~tp", [Text]),
|
||||
{stop, {shutdown, restart}, State}
|
||||
end;
|
||||
|
||||
|
@ -177,7 +177,7 @@ handle_info(check_internal_exchange, State = #state{internal_exchange = IntXName
|
|||
internal_exchange_interval = Interval}) ->
|
||||
case check_internal_exchange(IntXNameBin, State) of
|
||||
upstream_not_found ->
|
||||
rabbit_log_federation:warning("Federation link could not find upstream exchange '~ts' and will restart",
|
||||
?LOG_WARNING("Federation link could not find upstream exchange '~ts' and will restart",
|
||||
[IntXNameBin]),
|
||||
{stop, {shutdown, restart}, State};
|
||||
_ ->
|
||||
|
@ -470,25 +470,25 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
|
|||
unacked = Unacked,
|
||||
internal_exchange_interval = Interval}),
|
||||
Bindings),
|
||||
rabbit_log_federation:info("Federation link for ~ts (upstream: ~ts) will perform internal exchange checks "
|
||||
?LOG_INFO("Federation link for ~ts (upstream: ~ts) will perform internal exchange checks "
|
||||
"every ~b seconds", [rabbit_misc:rs(DownXName), UName, round(Interval / 1000)]),
|
||||
TRef = erlang:send_after(Interval, self(), check_internal_exchange),
|
||||
{noreply, State#state{internal_exchange_timer = TRef}}
|
||||
end, Upstream, UParams, DownXName, S0).
|
||||
|
||||
log_link_startup_attempt(#upstream{name = Name, channel_use_mode = ChMode}, DownXName) ->
|
||||
rabbit_log_federation:debug("Will try to start a federation link for ~ts, upstream: '~ts', channel use mode: ~ts",
|
||||
?LOG_DEBUG("Will try to start a federation link for ~ts, upstream: '~ts', channel use mode: ~ts",
|
||||
[rabbit_misc:rs(DownXName), Name, ChMode]).
|
||||
|
||||
%% If channel use mode is 'single', reuse the message transfer channel.
|
||||
%% Otherwise open a separate one.
|
||||
reuse_command_channel(MainCh, #upstream{name = UName}, DownXName) ->
|
||||
rabbit_log_federation:debug("Will use a single channel for both schema operations and message transfer on links to upstream '~ts' for downstream federated ~ts",
|
||||
?LOG_DEBUG("Will use a single channel for both schema operations and message transfer on links to upstream '~ts' for downstream federated ~ts",
|
||||
[UName, rabbit_misc:rs(DownXName)]),
|
||||
{ok, MainCh}.
|
||||
|
||||
open_command_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) ->
|
||||
rabbit_log_federation:debug("Will open a command channel to upstream '~ts' for downstream federated ~ts",
|
||||
?LOG_DEBUG("Will open a command channel to upstream '~ts' for downstream federated ~ts",
|
||||
[UName, rabbit_misc:rs(DownXName)]),
|
||||
case amqp_connection:open_channel(Conn) of
|
||||
{ok, CCh} ->
|
||||
|
@ -583,12 +583,12 @@ ensure_internal_exchange(IntXNameBin,
|
|||
connection = Conn,
|
||||
channel = Ch,
|
||||
downstream_exchange = #resource{virtual_host = DVhost}}) ->
|
||||
rabbit_log_federation:debug("Exchange federation will set up exchange '~ts' in upstream '~ts'",
|
||||
?LOG_DEBUG("Exchange federation will set up exchange '~ts' in upstream '~ts'",
|
||||
[IntXNameBin, UName]),
|
||||
#upstream_params{params = Params} = rabbit_federation_util:deobfuscate_upstream_params(UParams),
|
||||
rabbit_log_federation:debug("Will delete upstream exchange '~ts'", [IntXNameBin]),
|
||||
?LOG_DEBUG("Will delete upstream exchange '~ts'", [IntXNameBin]),
|
||||
delete_upstream_exchange(Conn, IntXNameBin),
|
||||
rabbit_log_federation:debug("Will declare an internal upstream exchange '~ts'", [IntXNameBin]),
|
||||
?LOG_DEBUG("Will declare an internal upstream exchange '~ts'", [IntXNameBin]),
|
||||
Base = #'exchange.declare'{exchange = IntXNameBin,
|
||||
durable = true,
|
||||
internal = true,
|
||||
|
@ -613,7 +613,7 @@ check_internal_exchange(IntXNameBin,
|
|||
downstream_exchange = XName = #resource{virtual_host = DVhost}}) ->
|
||||
#upstream_params{params = Params} =
|
||||
rabbit_federation_util:deobfuscate_upstream_params(UParams),
|
||||
rabbit_log_federation:debug("Exchange federation will check on exchange '~ts' in upstream '~ts'",
|
||||
?LOG_DEBUG("Exchange federation will check on exchange '~ts' in upstream '~ts'",
|
||||
[IntXNameBin, UName]),
|
||||
Base = #'exchange.declare'{exchange = IntXNameBin,
|
||||
passive = true,
|
||||
|
@ -629,13 +629,11 @@ check_internal_exchange(IntXNameBin,
|
|||
arguments = XFUArgs},
|
||||
rabbit_federation_link_util:disposable_connection_call(
|
||||
Params, XFU, fun(404, Text) ->
|
||||
rabbit_federation_link_util:log_warning(
|
||||
XName, "detected internal upstream exchange changes,"
|
||||
" restarting link: ~tp", [Text]),
|
||||
?LOG_WARNING("detected internal upstream exchange changes,"
|
||||
" restarting link: ~tp", [Text]),
|
||||
upstream_not_found;
|
||||
(Code, Text) ->
|
||||
rabbit_federation_link_util:log_warning(
|
||||
XName, "internal upstream exchange check failed: ~tp ~tp",
|
||||
?LOG_WARNING("internal upstream exchange check failed: ~tp ~tp",
|
||||
[Code, Text]),
|
||||
error
|
||||
end).
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
-include("rabbit_exchange_federation.hrl").
|
||||
-include_lib("kernel/include/logger.hrl").
|
||||
-include_lib("rabbit_federation.hrl").
|
||||
-define(SUPERVISOR, ?MODULE).
|
||||
|
||||
%% Supervises the upstream links for all exchanges (but not queues). We need
|
||||
|
@ -43,8 +45,8 @@ start_child(X) ->
|
|||
{ok, _Pid} -> ok;
|
||||
{error, {already_started, _Pid}} ->
|
||||
#exchange{name = ExchangeName} = X,
|
||||
rabbit_log_federation:debug("Federation link for exchange ~tp was already started",
|
||||
[rabbit_misc:rs(ExchangeName)]),
|
||||
?LOG_DEBUG("Federation link for exchange ~tp was already started",
|
||||
[rabbit_misc:rs(ExchangeName)]),
|
||||
ok;
|
||||
%% A link returned {stop, gone}, the link_sup shut down, that's OK.
|
||||
{error, {shutdown, _}} -> ok
|
||||
|
@ -67,9 +69,8 @@ stop_child(X) ->
|
|||
ok -> ok;
|
||||
{error, Err} ->
|
||||
#exchange{name = ExchangeName} = X,
|
||||
rabbit_log_federation:warning(
|
||||
"Attempt to stop a federation link for exchange ~tp failed: ~tp",
|
||||
[rabbit_misc:rs(ExchangeName), Err]),
|
||||
?LOG_WARNING("Attempt to stop a federation link for exchange ~tp failed: ~tp",
|
||||
[rabbit_misc:rs(ExchangeName), Err]),
|
||||
ok
|
||||
end,
|
||||
ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)).
|
||||
|
@ -77,6 +78,7 @@ stop_child(X) ->
|
|||
%%----------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION}),
|
||||
{ok, {{one_for_one, 1200, 60}, []}}.
|
||||
|
||||
%% See comment in rabbit_federation_queue_link_sup_sup:id/1
|
||||
|
|
|
@ -10,14 +10,14 @@
|
|||
-include_lib("rabbit/include/amqqueue.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include("rabbit_federation.hrl").
|
||||
-include_lib("kernel/include/logger.hrl").
|
||||
|
||||
%% real
|
||||
-export([start_conn_ch/5, disposable_channel_call/2, disposable_channel_call/3,
|
||||
disposable_connection_call/3, ensure_connection_closed/1,
|
||||
log_terminate/4, unacked_new/0, ack/3, nack/3, forward/9,
|
||||
handle_downstream_down/3, handle_upstream_down/3,
|
||||
get_connection_name/2, log_debug/3, log_info/3, log_warning/3,
|
||||
log_error/3]).
|
||||
get_connection_name/2]).
|
||||
|
||||
%% temp
|
||||
-export([connection_error/6]).
|
||||
|
@ -55,10 +55,9 @@ start_conn_ch(Fun, OUpstream, OUParams,
|
|||
process_flag(trap_exit, true),
|
||||
try
|
||||
R = Fun(Conn, Ch, DConn, DCh),
|
||||
log_info(
|
||||
XorQName, "connected to ~ts",
|
||||
[rabbit_federation_upstream:params_to_string(
|
||||
UParams)]),
|
||||
?LOG_INFO("Federation ~ts connected to ~ts",
|
||||
[rabbit_misc:rs(XorQName),
|
||||
rabbit_federation_upstream:params_to_string(UParams)]),
|
||||
Name = pget(name, amqp_connection:info(DConn, [name])),
|
||||
rabbit_federation_status:report(
|
||||
OUpstream, OUParams, XorQName, {running, Name}),
|
||||
|
@ -130,45 +129,44 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
|
|||
Upstream, UParams, XorQName, State) ->
|
||||
rabbit_federation_status:report(
|
||||
Upstream, UParams, XorQName, clean_reason(E)),
|
||||
log_warning(XorQName,
|
||||
"did not connect to ~ts. Server has closed the connection due to an error, code: ~tp, "
|
||||
?LOG_WARNING("Federation ~ts did not connect to ~ts. Server has closed the connection due to an error, code: ~tp, "
|
||||
"message: ~ts",
|
||||
[rabbit_federation_upstream:params_to_string(UParams),
|
||||
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
|
||||
Code, Message]),
|
||||
{stop, {shutdown, restart}, State};
|
||||
|
||||
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
|
||||
rabbit_federation_status:report(
|
||||
Upstream, UParams, XorQName, clean_reason(E)),
|
||||
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
|
||||
[rabbit_federation_upstream:params_to_string(UParams),
|
||||
?LOG_WARNING("Federation ~ts did not connect to ~ts. Reason: ~tp",
|
||||
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
|
||||
E]),
|
||||
{stop, {shutdown, restart}, State};
|
||||
|
||||
connection_error(remote, E, Upstream, UParams, XorQName, State) ->
|
||||
rabbit_federation_status:report(
|
||||
Upstream, UParams, XorQName, clean_reason(E)),
|
||||
log_info(XorQName, "disconnected from ~ts~n~tp",
|
||||
[rabbit_federation_upstream:params_to_string(UParams), E]),
|
||||
?LOG_INFO("Federation ~ts disconnected from ~ts~n~tp",
|
||||
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
|
||||
{stop, {shutdown, restart}, State};
|
||||
|
||||
connection_error(command_channel, E, Upstream, UParams, XorQName, State) ->
|
||||
rabbit_federation_status:report(
|
||||
Upstream, UParams, XorQName, clean_reason(E)),
|
||||
log_info(XorQName, "failed to open a command channel for upstream ~ts~n~tp",
|
||||
[rabbit_federation_upstream:params_to_string(UParams), E]),
|
||||
?LOG_INFO("Federation ~ts failed to open a command channel for upstream ~ts~n~tp",
|
||||
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
|
||||
{stop, {shutdown, restart}, State};
|
||||
|
||||
connection_error(local, basic_cancel, Upstream, UParams, XorQName, State) ->
|
||||
rabbit_federation_status:report(
|
||||
Upstream, UParams, XorQName, {error, basic_cancel}),
|
||||
log_info(XorQName, "received a 'basic.cancel'", []),
|
||||
?LOG_INFO("Federation ~ts received a 'basic.cancel'", [rabbit_misc:rs(XorQName)]),
|
||||
{stop, {shutdown, restart}, State};
|
||||
|
||||
connection_error(local_start, E, Upstream, UParams, XorQName, State) ->
|
||||
rabbit_federation_status:report(
|
||||
Upstream, UParams, XorQName, clean_reason(E)),
|
||||
log_warning(XorQName, "did not connect locally~n~tp", [E]),
|
||||
?LOG_WARNING("Federation ~ts did not connect locally~n~tp", [rabbit_misc:rs(XorQName), E]),
|
||||
{stop, {shutdown, restart}, State}.
|
||||
|
||||
%% If we terminate due to a gen_server call exploding (almost
|
||||
|
@ -285,7 +283,7 @@ log_terminate(shutdown, Upstream, UParams, XorQName) ->
|
|||
%% the link because configuration has changed. So try to shut down
|
||||
%% nicely so that we do not cause unacked messages to be
|
||||
%% redelivered.
|
||||
log_info(XorQName, "disconnecting from ~ts",
|
||||
?LOG_INFO("disconnecting from ~ts",
|
||||
[rabbit_federation_upstream:params_to_string(UParams)]),
|
||||
rabbit_federation_status:remove(Upstream, XorQName);
|
||||
|
||||
|
@ -295,21 +293,6 @@ log_terminate(Reason, Upstream, UParams, XorQName) ->
|
|||
rabbit_federation_status:report(
|
||||
Upstream, UParams, XorQName, clean_reason(Reason)).
|
||||
|
||||
log_debug(XorQName, Fmt, Args) -> log(debug, XorQName, Fmt, Args).
|
||||
log_info(XorQName, Fmt, Args) -> log(info, XorQName, Fmt, Args).
|
||||
log_warning(XorQName, Fmt, Args) -> log(warning, XorQName, Fmt, Args).
|
||||
log_error(XorQName, Fmt, Args) -> log(error, XorQName, Fmt, Args).
|
||||
|
||||
log(Level, XorQName, Fmt0, Args0) ->
|
||||
Fmt = "Federation ~ts " ++ Fmt0,
|
||||
Args = [rabbit_misc:rs(XorQName) | Args0],
|
||||
case Level of
|
||||
debug -> rabbit_log_federation:debug(Fmt, Args);
|
||||
info -> rabbit_log_federation:info(Fmt, Args);
|
||||
warning -> rabbit_log_federation:warning(Fmt, Args);
|
||||
error -> rabbit_log_federation:error(Fmt, Args)
|
||||
end.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
disposable_channel_call(Conn, Method) ->
|
||||
|
@ -327,12 +310,13 @@ disposable_channel_call(Conn, Method, ErrFun) ->
|
|||
end
|
||||
catch
|
||||
Exception:Reason ->
|
||||
rabbit_log_federation:error("Federation link could not create a disposable (one-off) channel due to an error ~tp: ~tp", [Exception, Reason])
|
||||
?LOG_ERROR("Federation link could not create a disposable (one-off) channel due to an error ~tp: ~tp",
|
||||
[Exception, Reason])
|
||||
end.
|
||||
|
||||
disposable_connection_call(Params, Method, ErrFun) ->
|
||||
try
|
||||
rabbit_log_federation:debug("Disposable connection parameters: ~tp", [Params]),
|
||||
?LOG_DEBUG("Disposable connection parameters: ~tp", [Params]),
|
||||
case open(Params, <<"Disposable exchange federation link connection">>) of
|
||||
{ok, Conn, Ch} ->
|
||||
try
|
||||
|
@ -345,15 +329,15 @@ disposable_connection_call(Params, Method, ErrFun) ->
|
|||
ensure_connection_closed(Conn)
|
||||
end;
|
||||
{error, {auth_failure, Message}} ->
|
||||
rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection "
|
||||
"due to an authentication failure: ~ts", [Message]);
|
||||
?LOG_ERROR("Federation link could not open a disposable (one-off) connection "
|
||||
"due to an authentication failure: ~ts", [Message]);
|
||||
Error ->
|
||||
rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection, "
|
||||
"reason: ~tp", [Error]),
|
||||
?LOG_ERROR("Federation link could not open a disposable (one-off) connection, "
|
||||
"reason: ~tp", [Error]),
|
||||
Error
|
||||
end
|
||||
catch
|
||||
Exception:Reason ->
|
||||
rabbit_log_federation:error("Federation link could not create a disposable (one-off) connection "
|
||||
"due to an error ~tp: ~tp", [Exception, Reason])
|
||||
?LOG_ERROR("Federation link could not create a disposable (one-off) connection "
|
||||
"due to an error ~tp: ~tp", [Exception, Reason])
|
||||
end.
|
||||
|
|
|
@ -13,13 +13,13 @@
|
|||
-export([start_scope/1, stop_scope/1]).
|
||||
|
||||
start_scope(Scope) ->
|
||||
rabbit_log_federation:debug("Starting pg scope ~ts", [Scope]),
|
||||
?LOG_DEBUG("Starting pg scope ~ts", [Scope]),
|
||||
_ = pg:start_link(Scope).
|
||||
|
||||
stop_scope(Scope) ->
|
||||
case whereis(Scope) of
|
||||
Pid when is_pid(Pid) ->
|
||||
rabbit_log_federation:debug("Stopping pg scope ~ts", [Scope]),
|
||||
?LOG_DEBUG("Stopping pg scope ~ts", [Scope]),
|
||||
Groups = pg:which_groups(Scope),
|
||||
lists:foreach(
|
||||
fun(Group) ->
|
||||
|
|
|
@ -1,108 +0,0 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
|
||||
%% @doc Compatibility module for the old Lager-based logging API.
|
||||
-module(rabbit_log_federation).
|
||||
|
||||
-export([debug/1, debug/2, debug/3,
|
||||
info/1, info/2, info/3,
|
||||
notice/1, notice/2, notice/3,
|
||||
warning/1, warning/2, warning/3,
|
||||
error/1, error/2, error/3,
|
||||
critical/1, critical/2, critical/3,
|
||||
alert/1, alert/2, alert/3,
|
||||
emergency/1, emergency/2, emergency/3,
|
||||
none/1, none/2, none/3]).
|
||||
|
||||
-include("logging.hrl").
|
||||
-include_lib("kernel/include/logger.hrl").
|
||||
|
||||
-compile({no_auto_import, [error/2, error/3]}).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
-spec debug(string()) -> 'ok'.
|
||||
-spec debug(string(), [any()]) -> 'ok'.
|
||||
-spec debug(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec info(string()) -> 'ok'.
|
||||
-spec info(string(), [any()]) -> 'ok'.
|
||||
-spec info(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec notice(string()) -> 'ok'.
|
||||
-spec notice(string(), [any()]) -> 'ok'.
|
||||
-spec notice(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec warning(string()) -> 'ok'.
|
||||
-spec warning(string(), [any()]) -> 'ok'.
|
||||
-spec warning(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec error(string()) -> 'ok'.
|
||||
-spec error(string(), [any()]) -> 'ok'.
|
||||
-spec error(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec critical(string()) -> 'ok'.
|
||||
-spec critical(string(), [any()]) -> 'ok'.
|
||||
-spec critical(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec alert(string()) -> 'ok'.
|
||||
-spec alert(string(), [any()]) -> 'ok'.
|
||||
-spec alert(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec emergency(string()) -> 'ok'.
|
||||
-spec emergency(string(), [any()]) -> 'ok'.
|
||||
-spec emergency(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
-spec none(string()) -> 'ok'.
|
||||
-spec none(string(), [any()]) -> 'ok'.
|
||||
-spec none(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
debug(Format) -> debug(Format, []).
|
||||
debug(Format, Args) -> debug(self(), Format, Args).
|
||||
debug(Pid, Format, Args) ->
|
||||
?LOG_DEBUG(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
info(Format) -> info(Format, []).
|
||||
info(Format, Args) -> info(self(), Format, Args).
|
||||
info(Pid, Format, Args) ->
|
||||
?LOG_INFO(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
notice(Format) -> notice(Format, []).
|
||||
notice(Format, Args) -> notice(self(), Format, Args).
|
||||
notice(Pid, Format, Args) ->
|
||||
?LOG_NOTICE(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
warning(Format) -> warning(Format, []).
|
||||
warning(Format, Args) -> warning(self(), Format, Args).
|
||||
warning(Pid, Format, Args) ->
|
||||
?LOG_WARNING(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
error(Format) -> error(Format, []).
|
||||
error(Format, Args) -> error(self(), Format, Args).
|
||||
error(Pid, Format, Args) ->
|
||||
?LOG_ERROR(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
critical(Format) -> critical(Format, []).
|
||||
critical(Format, Args) -> critical(self(), Format, Args).
|
||||
critical(Pid, Format, Args) ->
|
||||
?LOG_CRITICAL(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
alert(Format) -> alert(Format, []).
|
||||
alert(Format, Args) -> alert(self(), Format, Args).
|
||||
alert(Pid, Format, Args) ->
|
||||
?LOG_ALERT(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
emergency(Format) -> emergency(Format, []).
|
||||
emergency(Format, Args) -> emergency(self(), Format, Args).
|
||||
emergency(Pid, Format, Args) ->
|
||||
?LOG_EMERGENCY(Format, Args, #{pid => Pid,
|
||||
domain => ?RMQLOG_DOMAIN_FEDERATION}).
|
||||
|
||||
none(_Format) -> ok.
|
||||
none(_Format, _Args) -> ok.
|
||||
none(_Pid, _Format, _Args) -> ok.
|
|
@ -10,7 +10,9 @@
|
|||
-include_lib("rabbit/include/amqqueue.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include_lib("rabbitmq_federation_common/include/rabbit_federation.hrl").
|
||||
-include_lib("rabbitmq_federation_common/include/logging.hrl").
|
||||
-include("rabbit_queue_federation.hrl").
|
||||
-include_lib("kernel/include/logger.hrl").
|
||||
|
||||
-behaviour(gen_server2).
|
||||
|
||||
|
@ -53,6 +55,8 @@ q(QName) ->
|
|||
|
||||
init({Upstream, Queue}) when ?is_amqqueue(Queue) ->
|
||||
QName = amqqueue:get_name(Queue),
|
||||
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION,
|
||||
queue => QName}),
|
||||
case rabbit_amqqueue:lookup(QName) of
|
||||
{ok, Q} ->
|
||||
DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream),
|
||||
|
@ -68,7 +72,7 @@ init({Upstream, Queue}) when ?is_amqqueue(Queue) ->
|
|||
upstream = Upstream,
|
||||
upstream_params = UParams}};
|
||||
{error, not_found} ->
|
||||
rabbit_federation_link_util:log_warning(QName, "not found, stopping link", []),
|
||||
?LOG_WARNING("not found, stopping link", []),
|
||||
{stop, gone}
|
||||
end.
|
||||
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
-include_lib("rabbit/include/amqqueue.hrl").
|
||||
-include("rabbit_queue_federation.hrl").
|
||||
-include_lib("kernel/include/logger.hrl").
|
||||
-include("rabbit_federation.hrl").
|
||||
-define(SUPERVISOR, ?MODULE).
|
||||
|
||||
%% Supervises the upstream links for all queues (but not exchanges). We need
|
||||
|
@ -43,8 +45,8 @@ start_child(Q) ->
|
|||
{ok, _Pid} -> ok;
|
||||
{error, {already_started, _Pid}} ->
|
||||
QueueName = amqqueue:get_name(Q),
|
||||
rabbit_log_federation:warning("Federation link for queue ~tp was already started",
|
||||
[rabbit_misc:rs(QueueName)]),
|
||||
?LOG_WARNING("Federation link for queue ~tp was already started",
|
||||
[rabbit_misc:rs(QueueName)]),
|
||||
ok;
|
||||
%% A link returned {stop, gone}, the link_sup shut down, that's OK.
|
||||
{error, {shutdown, _}} -> ok
|
||||
|
@ -66,9 +68,8 @@ stop_child(Q) ->
|
|||
ok -> ok;
|
||||
{error, Err} ->
|
||||
QueueName = amqqueue:get_name(Q),
|
||||
rabbit_log_federation:warning(
|
||||
"Attempt to stop a federation link for queue ~tp failed: ~tp",
|
||||
[rabbit_misc:rs(QueueName), Err]),
|
||||
?LOG_WARNING("Attempt to stop a federation link for queue ~tp failed: ~tp",
|
||||
[rabbit_misc:rs(QueueName), Err]),
|
||||
ok
|
||||
end,
|
||||
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Q)).
|
||||
|
|
|
@ -43,6 +43,7 @@ stop(_State) ->
|
|||
%%----------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION}),
|
||||
Flags = #{
|
||||
strategy => one_for_one,
|
||||
intensity => 3,
|
||||
|
|
Loading…
Reference in New Issue