[skip ci] Remove rabbit_log_shovel, use LOG_ macros directly
This commit is contained in:
parent
301f6e9906
commit
84d52b51dc
|
@ -36,7 +36,8 @@
|
||||||
-import(rabbit_misc, [pget/2, pget/3]).
|
-import(rabbit_misc, [pget/2, pget/3]).
|
||||||
-import(rabbit_data_coercion, [to_binary/1]).
|
-import(rabbit_data_coercion, [to_binary/1]).
|
||||||
|
|
||||||
-define(INFO(Text, Args), rabbit_log_shovel:info(Text, Args)).
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
|
||||||
-define(LINK_CREDIT_TIMEOUT, 20_000).
|
-define(LINK_CREDIT_TIMEOUT, 20_000).
|
||||||
|
|
||||||
-type state() :: rabbit_shovel_behaviour:state().
|
-type state() :: rabbit_shovel_behaviour:state().
|
||||||
|
@ -194,7 +195,7 @@ handle_source({amqp10_event, {connection, Conn, opened}},
|
||||||
handle_source({amqp10_event, {connection, Conn, {closed, Why}}},
|
handle_source({amqp10_event, {connection, Conn, {closed, Why}}},
|
||||||
#{source := #{current := #{conn := Conn}},
|
#{source := #{current := #{conn := Conn}},
|
||||||
name := Name}) ->
|
name := Name}) ->
|
||||||
?INFO("Shovel ~ts source connection closed. Reason: ~tp", [Name, Why]),
|
?LOG_INFO("Shovel ~ts source connection closed. Reason: ~tp", [Name, Why]),
|
||||||
{stop, {inbound_conn_closed, Why}};
|
{stop, {inbound_conn_closed, Why}};
|
||||||
handle_source({amqp10_event, {session, Sess, begun}},
|
handle_source({amqp10_event, {session, Sess, begun}},
|
||||||
State = #{source := #{current := #{session := Sess}}}) ->
|
State = #{source := #{current := #{session := Sess}}}) ->
|
||||||
|
@ -231,7 +232,7 @@ handle_dest({amqp10_disposition, {Result, Tag}},
|
||||||
{#{Tag := IncomingTag}, rejected} ->
|
{#{Tag := IncomingTag}, rejected} ->
|
||||||
{1, rabbit_shovel_behaviour:nack(IncomingTag, false, State1)};
|
{1, rabbit_shovel_behaviour:nack(IncomingTag, false, State1)};
|
||||||
_ -> % not found - this should ideally not happen
|
_ -> % not found - this should ideally not happen
|
||||||
rabbit_log_shovel:warning("Shovel ~ts amqp10 destination disposition tag not found: ~tp",
|
?LOG_WARNING("Shovel ~ts amqp10 destination disposition tag not found: ~tp",
|
||||||
[Name, Tag]),
|
[Name, Tag]),
|
||||||
{0, State1}
|
{0, State1}
|
||||||
end,
|
end,
|
||||||
|
@ -242,7 +243,7 @@ handle_dest({amqp10_event, {connection, Conn, opened}},
|
||||||
handle_dest({amqp10_event, {connection, Conn, {closed, Why}}},
|
handle_dest({amqp10_event, {connection, Conn, {closed, Why}}},
|
||||||
#{name := Name,
|
#{name := Name,
|
||||||
dest := #{current := #{conn := Conn}}}) ->
|
dest := #{current := #{conn := Conn}}}) ->
|
||||||
?INFO("Shovel ~ts destination connection closed. Reason: ~tp", [Name, Why]),
|
?LOG_INFO("Shovel ~ts destination connection closed. Reason: ~tp", [Name, Why]),
|
||||||
{stop, {outbound_conn_died, Why}};
|
{stop, {outbound_conn_died, Why}};
|
||||||
handle_dest({amqp10_event, {session, Sess, begun}},
|
handle_dest({amqp10_event, {session, Sess, begun}},
|
||||||
State = #{dest := #{current := #{session := Sess}}}) ->
|
State = #{dest := #{current := #{session := Sess}}}) ->
|
||||||
|
|
|
@ -1,108 +0,0 @@
|
||||||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
||||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
||||||
%%
|
|
||||||
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
||||||
%%
|
|
||||||
|
|
||||||
%% @doc Compatibility module for the old Lager-based logging API.
|
|
||||||
-module(rabbit_log_shovel).
|
|
||||||
|
|
||||||
-export([debug/1, debug/2, debug/3,
|
|
||||||
info/1, info/2, info/3,
|
|
||||||
notice/1, notice/2, notice/3,
|
|
||||||
warning/1, warning/2, warning/3,
|
|
||||||
error/1, error/2, error/3,
|
|
||||||
critical/1, critical/2, critical/3,
|
|
||||||
alert/1, alert/2, alert/3,
|
|
||||||
emergency/1, emergency/2, emergency/3,
|
|
||||||
none/1, none/2, none/3]).
|
|
||||||
|
|
||||||
-include("logging.hrl").
|
|
||||||
-include_lib("kernel/include/logger.hrl").
|
|
||||||
|
|
||||||
-compile({no_auto_import, [error/2, error/3]}).
|
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec debug(string()) -> 'ok'.
|
|
||||||
-spec debug(string(), [any()]) -> 'ok'.
|
|
||||||
-spec debug(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec info(string()) -> 'ok'.
|
|
||||||
-spec info(string(), [any()]) -> 'ok'.
|
|
||||||
-spec info(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec notice(string()) -> 'ok'.
|
|
||||||
-spec notice(string(), [any()]) -> 'ok'.
|
|
||||||
-spec notice(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec warning(string()) -> 'ok'.
|
|
||||||
-spec warning(string(), [any()]) -> 'ok'.
|
|
||||||
-spec warning(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec error(string()) -> 'ok'.
|
|
||||||
-spec error(string(), [any()]) -> 'ok'.
|
|
||||||
-spec error(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec critical(string()) -> 'ok'.
|
|
||||||
-spec critical(string(), [any()]) -> 'ok'.
|
|
||||||
-spec critical(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec alert(string()) -> 'ok'.
|
|
||||||
-spec alert(string(), [any()]) -> 'ok'.
|
|
||||||
-spec alert(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec emergency(string()) -> 'ok'.
|
|
||||||
-spec emergency(string(), [any()]) -> 'ok'.
|
|
||||||
-spec emergency(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
-spec none(string()) -> 'ok'.
|
|
||||||
-spec none(string(), [any()]) -> 'ok'.
|
|
||||||
-spec none(pid() | [tuple()], string(), [any()]) -> 'ok'.
|
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
debug(Format) -> debug(Format, []).
|
|
||||||
debug(Format, Args) -> debug(self(), Format, Args).
|
|
||||||
debug(Pid, Format, Args) ->
|
|
||||||
?LOG_DEBUG(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
info(Format) -> info(Format, []).
|
|
||||||
info(Format, Args) -> info(self(), Format, Args).
|
|
||||||
info(Pid, Format, Args) ->
|
|
||||||
?LOG_INFO(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
notice(Format) -> notice(Format, []).
|
|
||||||
notice(Format, Args) -> notice(self(), Format, Args).
|
|
||||||
notice(Pid, Format, Args) ->
|
|
||||||
?LOG_NOTICE(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
warning(Format) -> warning(Format, []).
|
|
||||||
warning(Format, Args) -> warning(self(), Format, Args).
|
|
||||||
warning(Pid, Format, Args) ->
|
|
||||||
?LOG_WARNING(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
error(Format) -> error(Format, []).
|
|
||||||
error(Format, Args) -> error(self(), Format, Args).
|
|
||||||
error(Pid, Format, Args) ->
|
|
||||||
?LOG_ERROR(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
critical(Format) -> critical(Format, []).
|
|
||||||
critical(Format, Args) -> critical(self(), Format, Args).
|
|
||||||
critical(Pid, Format, Args) ->
|
|
||||||
?LOG_CRITICAL(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
alert(Format) -> alert(Format, []).
|
|
||||||
alert(Format, Args) -> alert(self(), Format, Args).
|
|
||||||
alert(Pid, Format, Args) ->
|
|
||||||
?LOG_ALERT(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
emergency(Format) -> emergency(Format, []).
|
|
||||||
emergency(Format, Args) -> emergency(self(), Format, Args).
|
|
||||||
emergency(Pid, Format, Args) ->
|
|
||||||
?LOG_EMERGENCY(Format, Args, #{pid => Pid,
|
|
||||||
domain => ?RMQLOG_DOMAIN_SHOVEL}).
|
|
||||||
|
|
||||||
none(_Format) -> ok.
|
|
||||||
none(_Format, _Args) -> ok.
|
|
||||||
none(_Pid, _Format, _Args) -> ok.
|
|
|
@ -34,6 +34,8 @@
|
||||||
incr_forwarded/1
|
incr_forwarded/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
|
||||||
-type tag() :: non_neg_integer().
|
-type tag() :: non_neg_integer().
|
||||||
-type uri() :: string() | binary().
|
-type uri() :: string() | binary().
|
||||||
-type ack_mode() :: 'no_ack' | 'on_confirm' | 'on_publish'.
|
-type ack_mode() :: 'no_ack' | 'on_confirm' | 'on_publish'.
|
||||||
|
@ -189,7 +191,7 @@ decr_remaining(N, State = #{source := #{remaining := M} = Src,
|
||||||
case M > N of
|
case M > N of
|
||||||
true -> State#{source => Src#{remaining => M - N}};
|
true -> State#{source => Src#{remaining => M - N}};
|
||||||
false ->
|
false ->
|
||||||
rabbit_log_shovel:info("shutting down Shovel '~ts', no messages left to transfer", [Name]),
|
?LOG_INFO("shutting down Shovel '~ts', no messages left to transfer", [Name]),
|
||||||
rabbit_log_shovel:debug("shutting down Shovel '~ts', no messages left to transfer. Shovel state: ~tp", [Name, State]),
|
?LOG_DEBUG("shutting down Shovel '~ts', no messages left to transfer. Shovel state: ~tp", [Name, State]),
|
||||||
exit({shutdown, autodelete})
|
exit({shutdown, autodelete})
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||||
-include("rabbit_shovel.hrl").
|
-include("rabbit_shovel.hrl").
|
||||||
-include_lib("kernel/include/logger.hrl").
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
-include_lib("logging.hrl").
|
||||||
-define(SUPERVISOR, ?MODULE).
|
-define(SUPERVISOR, ?MODULE).
|
||||||
|
|
||||||
start_link(Name, Config) ->
|
start_link(Name, Config) ->
|
||||||
|
@ -35,6 +36,7 @@ maybe_start_link(_, Name, Config) ->
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
init([Name, Config0]) ->
|
init([Name, Config0]) ->
|
||||||
|
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_SHOVEL}),
|
||||||
Config = rabbit_data_coercion:to_proplist(Config0),
|
Config = rabbit_data_coercion:to_proplist(Config0),
|
||||||
Delay = pget(<<"reconnect-delay">>, Config, ?DEFAULT_RECONNECT_DELAY),
|
Delay = pget(<<"reconnect-delay">>, Config, ?DEFAULT_RECONNECT_DELAY),
|
||||||
case Name of
|
case Name of
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
-import(rabbit_misc, [pget/2]).
|
-import(rabbit_misc, [pget/2]).
|
||||||
-import(rabbit_data_coercion, [to_map/1, to_list/1]).
|
-import(rabbit_data_coercion, [to_map/1, to_list/1]).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
-include_lib("logging.hrl").
|
||||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||||
-define(SUPERVISOR, ?MODULE).
|
-define(SUPERVISOR, ?MODULE).
|
||||||
|
|
||||||
|
@ -37,10 +39,10 @@ adjust(Name, Def) ->
|
||||||
start_child(Name, Def).
|
start_child(Name, Def).
|
||||||
|
|
||||||
start_child({VHost, ShovelName} = Name, Def) ->
|
start_child({VHost, ShovelName} = Name, Def) ->
|
||||||
rabbit_log_shovel:debug("Asked to start a dynamic Shovel named '~ts' in virtual host '~ts'", [ShovelName, VHost]),
|
?LOG_DEBUG("Asked to start a dynamic Shovel named '~ts' in virtual host '~ts'", [ShovelName, VHost]),
|
||||||
LockId = rabbit_shovel_locks:lock(Name),
|
LockId = rabbit_shovel_locks:lock(Name),
|
||||||
cleanup_specs(),
|
cleanup_specs(),
|
||||||
rabbit_log_shovel:debug("Starting a mirrored supervisor named '~ts' in virtual host '~ts'", [ShovelName, VHost]),
|
?LOG_DEBUG("Starting a mirrored supervisor named '~ts' in virtual host '~ts'", [ShovelName, VHost]),
|
||||||
case child_exists(Name)
|
case child_exists(Name)
|
||||||
orelse mirrored_supervisor:start_child(
|
orelse mirrored_supervisor:start_child(
|
||||||
?SUPERVISOR,
|
?SUPERVISOR,
|
||||||
|
@ -68,7 +70,7 @@ child_exists(Name) ->
|
||||||
mirrored_supervisor:which_children(?SUPERVISOR)).
|
mirrored_supervisor:which_children(?SUPERVISOR)).
|
||||||
|
|
||||||
stop_child({VHost, ShovelName} = Name) ->
|
stop_child({VHost, ShovelName} = Name) ->
|
||||||
rabbit_log_shovel:debug("Asked to stop a dynamic Shovel named '~ts' in virtual host '~ts'", [ShovelName, VHost]),
|
?LOG_DEBUG("Asked to stop a dynamic Shovel named '~ts' in virtual host '~ts'", [ShovelName, VHost]),
|
||||||
LockId = rabbit_shovel_locks:lock(Name),
|
LockId = rabbit_shovel_locks:lock(Name),
|
||||||
case get({shovel_worker_autodelete, Name}) of
|
case get({shovel_worker_autodelete, Name}) of
|
||||||
true -> ok; %% [1]
|
true -> ok; %% [1]
|
||||||
|
@ -136,6 +138,7 @@ cleanup_specs() ->
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
|
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_SHOVEL}),
|
||||||
{ok, {{one_for_one, 3, 10}, []}}.
|
{ok, {{one_for_one, 3, 10}, []}}.
|
||||||
|
|
||||||
id({VHost, ShovelName} = Name)
|
id({VHost, ShovelName} = Name)
|
||||||
|
|
|
@ -25,6 +25,9 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
-include("logging.hrl").
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
-define(ETS_NAME, ?MODULE).
|
-define(ETS_NAME, ?MODULE).
|
||||||
-define(CHECK_FREQUENCY, 60000).
|
-define(CHECK_FREQUENCY, 60000).
|
||||||
|
@ -113,6 +116,7 @@ get_status_table() ->
|
||||||
gen_server:call(?SERVER, get_status_table).
|
gen_server:call(?SERVER, get_status_table).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
|
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_SHOVEL}),
|
||||||
?ETS_NAME = ets:new(?ETS_NAME,
|
?ETS_NAME = ets:new(?ETS_NAME,
|
||||||
[named_table, {keypos, #entry.name}, private]),
|
[named_table, {keypos, #entry.name}, private]),
|
||||||
{ok, ensure_timer(#state{})}.
|
{ok, ensure_timer(#state{})}.
|
||||||
|
@ -185,7 +189,7 @@ handle_info(check, State) ->
|
||||||
rabbit_shovel_dyn_worker_sup_sup:cleanup_specs()
|
rabbit_shovel_dyn_worker_sup_sup:cleanup_specs()
|
||||||
catch
|
catch
|
||||||
C:E ->
|
C:E ->
|
||||||
rabbit_log_shovel:warning("Recurring shovel spec clean up failed with ~p:~p", [C, E])
|
?LOG_WARNING("Recurring shovel spec clean up failed with ~p:~p", [C, E])
|
||||||
end,
|
end,
|
||||||
{noreply, ensure_timer(State)};
|
{noreply, ensure_timer(State)};
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
|
|
@ -89,7 +89,7 @@ restart_shovel(VHost, Name) ->
|
||||||
not_found ->
|
not_found ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
_Obj ->
|
_Obj ->
|
||||||
rabbit_log_shovel:info("Shovel '~ts' in virtual host '~ts' will be restarted", [Name, VHost]),
|
?LOG_INFO("Shovel '~ts' in virtual host '~ts' will be restarted", [Name, VHost]),
|
||||||
ok = rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}),
|
ok = rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}),
|
||||||
{ok, _} = rabbit_shovel_dyn_worker_sup_sup:start_link(),
|
{ok, _} = rabbit_shovel_dyn_worker_sup_sup:start_link(),
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
get_internal_config/1]).
|
get_internal_config/1]).
|
||||||
|
|
||||||
-include("rabbit_shovel.hrl").
|
-include("rabbit_shovel.hrl").
|
||||||
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
-include("logging.hrl").
|
||||||
|
|
||||||
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
|
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
|
||||||
type :: static | dynamic,
|
type :: static | dynamic,
|
||||||
|
@ -44,6 +46,7 @@ maybe_start_link(_, Type, Name, Config) ->
|
||||||
%%---------------------------
|
%%---------------------------
|
||||||
|
|
||||||
init([Type, Name, Config0]) ->
|
init([Type, Name, Config0]) ->
|
||||||
|
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_SHOVEL}),
|
||||||
Config = case Type of
|
Config = case Type of
|
||||||
static ->
|
static ->
|
||||||
Config0;
|
Config0;
|
||||||
|
@ -54,7 +57,7 @@ init([Type, Name, Config0]) ->
|
||||||
Config0),
|
Config0),
|
||||||
Conf
|
Conf
|
||||||
end,
|
end,
|
||||||
rabbit_log_shovel:debug("Initialising a Shovel ~ts of type '~ts'", [human_readable_name(Name), Type]),
|
?LOG_DEBUG("Initialising a Shovel ~ts of type '~ts'", [human_readable_name(Name), Type]),
|
||||||
gen_server2:cast(self(), init),
|
gen_server2:cast(self(), init),
|
||||||
{ok, #state{name = Name, type = Type, config = Config}}.
|
{ok, #state{name = Name, type = Type, config = Config}}.
|
||||||
|
|
||||||
|
@ -62,29 +65,29 @@ handle_call(_Msg, _From, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_cast(init, State = #state{config = Config0}) ->
|
handle_cast(init, State = #state{config = Config0}) ->
|
||||||
rabbit_log_shovel:debug("Shovel ~ts is reporting its status", [human_readable_name(State#state.name)]),
|
?LOG_DEBUG("Shovel ~ts is reporting its status", [human_readable_name(State#state.name)]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type, starting),
|
rabbit_shovel_status:report(State#state.name, State#state.type, starting),
|
||||||
rabbit_log_shovel:info("Shovel ~ts will now try to connect...", [human_readable_name(State#state.name)]),
|
?LOG_INFO("Shovel ~ts will now try to connect...", [human_readable_name(State#state.name)]),
|
||||||
try rabbit_shovel_behaviour:connect_source(Config0) of
|
try rabbit_shovel_behaviour:connect_source(Config0) of
|
||||||
Config ->
|
Config ->
|
||||||
rabbit_log_shovel:debug("Shovel ~ts connected to source", [human_readable_name(maps:get(name, Config))]),
|
?LOG_DEBUG("Shovel ~ts connected to source", [human_readable_name(maps:get(name, Config))]),
|
||||||
%% this makes sure that connection pid is updated in case
|
%% this makes sure that connection pid is updated in case
|
||||||
%% any of the subsequent connection/init steps fail. See
|
%% any of the subsequent connection/init steps fail. See
|
||||||
%% rabbitmq/rabbitmq-shovel#54 for context.
|
%% rabbitmq/rabbitmq-shovel#54 for context.
|
||||||
gen_server2:cast(self(), connect_dest),
|
gen_server2:cast(self(), connect_dest),
|
||||||
{noreply, State#state{config = Config}}
|
{noreply, State#state{config = Config}}
|
||||||
catch E:R ->
|
catch E:R ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts could not connect to source: ~p ~p", [human_readable_name(maps:get(name, Config0)), E, R]),
|
?LOG_ERROR("Shovel ~ts could not connect to source: ~p ~p", [human_readable_name(maps:get(name, Config0)), E, R]),
|
||||||
{stop, shutdown, State}
|
{stop, shutdown, State}
|
||||||
end;
|
end;
|
||||||
handle_cast(connect_dest, State = #state{config = Config0}) ->
|
handle_cast(connect_dest, State = #state{config = Config0}) ->
|
||||||
try rabbit_shovel_behaviour:connect_dest(Config0) of
|
try rabbit_shovel_behaviour:connect_dest(Config0) of
|
||||||
Config ->
|
Config ->
|
||||||
rabbit_log_shovel:debug("Shovel ~ts connected to destination", [human_readable_name(maps:get(name, Config))]),
|
?LOG_DEBUG("Shovel ~ts connected to destination", [human_readable_name(maps:get(name, Config))]),
|
||||||
gen_server2:cast(self(), init_shovel),
|
gen_server2:cast(self(), init_shovel),
|
||||||
{noreply, State#state{config = Config}}
|
{noreply, State#state{config = Config}}
|
||||||
catch E:R ->
|
catch E:R ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts could not connect to destination: ~p ~p", [human_readable_name(maps:get(name, Config0)), E, R]),
|
?LOG_ERROR("Shovel ~ts could not connect to destination: ~p ~p", [human_readable_name(maps:get(name, Config0)), E, R]),
|
||||||
{stop, shutdown, State}
|
{stop, shutdown, State}
|
||||||
end;
|
end;
|
||||||
handle_cast(init_shovel, State = #state{config = Config}) ->
|
handle_cast(init_shovel, State = #state{config = Config}) ->
|
||||||
|
@ -94,7 +97,7 @@ handle_cast(init_shovel, State = #state{config = Config}) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
Config1 = rabbit_shovel_behaviour:init_dest(Config),
|
Config1 = rabbit_shovel_behaviour:init_dest(Config),
|
||||||
Config2 = rabbit_shovel_behaviour:init_source(Config1),
|
Config2 = rabbit_shovel_behaviour:init_source(Config1),
|
||||||
rabbit_log_shovel:debug("Shovel ~ts has finished setting up its topology", [human_readable_name(maps:get(name, Config2))]),
|
?LOG_DEBUG("Shovel ~ts has finished setting up its topology", [human_readable_name(maps:get(name, Config2))]),
|
||||||
State1 = State#state{config = Config2},
|
State1 = State#state{config = Config2},
|
||||||
ok = report_running(State1),
|
ok = report_running(State1),
|
||||||
{noreply, State1}.
|
{noreply, State1}.
|
||||||
|
@ -105,19 +108,19 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
|
||||||
not_handled ->
|
not_handled ->
|
||||||
case rabbit_shovel_behaviour:handle_dest(Msg, Config) of
|
case rabbit_shovel_behaviour:handle_dest(Msg, Config) of
|
||||||
not_handled ->
|
not_handled ->
|
||||||
rabbit_log_shovel:warning("Shovel ~ts could not handle a destination message ~tp", [human_readable_name(Name), Msg]),
|
?LOG_WARNING("Shovel ~ts could not handle a destination message ~tp", [human_readable_name(Name), Msg]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
{stop, {outbound_conn_died, heartbeat_timeout}} ->
|
{stop, {outbound_conn_died, heartbeat_timeout}} ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts detected missed heartbeats on destination connection", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts detected missed heartbeats on destination connection", [human_readable_name(Name)]),
|
||||||
{stop, {shutdown, heartbeat_timeout}, State};
|
{stop, {shutdown, heartbeat_timeout}, State};
|
||||||
{stop, {outbound_conn_died, Reason}} ->
|
{stop, {outbound_conn_died, Reason}} ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts detected destination connection failure: ~tp", [human_readable_name(Name), Reason]),
|
?LOG_ERROR("Shovel ~ts detected destination connection failure: ~tp", [human_readable_name(Name), Reason]),
|
||||||
{stop, Reason, State};
|
{stop, Reason, State};
|
||||||
{stop, {outbound_link_or_channel_closure, Reason}} ->
|
{stop, {outbound_link_or_channel_closure, Reason}} ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts detected destination shovel failure: ~tp", [human_readable_name(Name), Reason]),
|
?LOG_ERROR("Shovel ~ts detected destination shovel failure: ~tp", [human_readable_name(Name), Reason]),
|
||||||
{stop, Reason, State};
|
{stop, Reason, State};
|
||||||
{stop, Reason} ->
|
{stop, Reason} ->
|
||||||
rabbit_log_shovel:debug("Shovel ~ts decided to stop due a message from destination: ~tp", [human_readable_name(Name), Reason]),
|
?LOG_DEBUG("Shovel ~ts decided to stop due a message from destination: ~tp", [human_readable_name(Name), Reason]),
|
||||||
{stop, Reason, State};
|
{stop, Reason, State};
|
||||||
Config1 ->
|
Config1 ->
|
||||||
State1 = State#state{config = Config1},
|
State1 = State#state{config = Config1},
|
||||||
|
@ -125,16 +128,16 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
|
||||||
{noreply, State2}
|
{noreply, State2}
|
||||||
end;
|
end;
|
||||||
{stop, {inbound_conn_died, heartbeat_timeout}} ->
|
{stop, {inbound_conn_died, heartbeat_timeout}} ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts detected missed heartbeats on source connection", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts detected missed heartbeats on source connection", [human_readable_name(Name)]),
|
||||||
{stop, {shutdown, heartbeat_timeout}, State};
|
{stop, {shutdown, heartbeat_timeout}, State};
|
||||||
{stop, {inbound_conn_died, Reason}} ->
|
{stop, {inbound_conn_died, Reason}} ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts detected source connection failure: ~tp", [human_readable_name(Name), Reason]),
|
?LOG_ERROR("Shovel ~ts detected source connection failure: ~tp", [human_readable_name(Name), Reason]),
|
||||||
{stop, Reason, State};
|
{stop, Reason, State};
|
||||||
{stop, {inbound_link_or_channel_closure, Reason}} ->
|
{stop, {inbound_link_or_channel_closure, Reason}} ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts detected source Shovel (or link, or channel) failure: ~tp", [human_readable_name(Name), Reason]),
|
?LOG_ERROR("Shovel ~ts detected source Shovel (or link, or channel) failure: ~tp", [human_readable_name(Name), Reason]),
|
||||||
{stop, Reason, State};
|
{stop, Reason, State};
|
||||||
{stop, Reason} ->
|
{stop, Reason} ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts decided to stop due a message from source: ~tp", [human_readable_name(Name), Reason]),
|
?LOG_ERROR("Shovel ~ts decided to stop due a message from source: ~tp", [human_readable_name(Name), Reason]),
|
||||||
{stop, Reason, State};
|
{stop, Reason, State};
|
||||||
Config1 ->
|
Config1 ->
|
||||||
State1 = State#state{config = Config1},
|
State1 = State#state{config = Config1},
|
||||||
|
@ -145,7 +148,7 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
|
||||||
terminate({shutdown, autodelete}, State = #state{name = Name,
|
terminate({shutdown, autodelete}, State = #state{name = Name,
|
||||||
type = dynamic}) ->
|
type = dynamic}) ->
|
||||||
{VHost, ShovelName} = Name,
|
{VHost, ShovelName} = Name,
|
||||||
rabbit_log_shovel:info("Shovel '~ts' is stopping (it was configured to autodelete and transfer is completed)",
|
?LOG_INFO("Shovel '~ts' is stopping (it was configured to autodelete and transfer is completed)",
|
||||||
[human_readable_name(Name)]),
|
[human_readable_name(Name)]),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
%% See rabbit_shovel_dyn_worker_sup_sup:stop_child/1
|
%% See rabbit_shovel_dyn_worker_sup_sup:stop_child/1
|
||||||
|
@ -158,43 +161,43 @@ terminate(shutdown, State = #state{name = Name}) ->
|
||||||
rabbit_shovel_status:remove(Name),
|
rabbit_shovel_status:remove(Name),
|
||||||
ok;
|
ok;
|
||||||
terminate(socket_closed_unexpectedly, State = #state{name = Name}) ->
|
terminate(socket_closed_unexpectedly, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping because of the socket closed unexpectedly", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts is stopping because of the socket closed unexpectedly", [human_readable_name(Name)]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
{terminated, "socket closed"}),
|
{terminated, "socket closed"}),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
ok;
|
ok;
|
||||||
terminate({'EXIT', heartbeat_timeout}, State = #state{name = Name}) ->
|
terminate({'EXIT', heartbeat_timeout}, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping because of a heartbeat timeout", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts is stopping because of a heartbeat timeout", [human_readable_name(Name)]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
{terminated, "heartbeat timeout"}),
|
{terminated, "heartbeat timeout"}),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
ok;
|
ok;
|
||||||
terminate({'EXIT', outbound_conn_died}, State = #state{name = Name}) ->
|
terminate({'EXIT', outbound_conn_died}, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping because destination connection failed", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts is stopping because destination connection failed", [human_readable_name(Name)]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
{terminated, "destination connection failed"}),
|
{terminated, "destination connection failed"}),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
ok;
|
ok;
|
||||||
terminate({'EXIT', inbound_conn_died}, State = #state{name = Name}) ->
|
terminate({'EXIT', inbound_conn_died}, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping because destination connection failed", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts is stopping because destination connection failed", [human_readable_name(Name)]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
{terminated, "source connection failed"}),
|
{terminated, "source connection failed"}),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
ok;
|
ok;
|
||||||
terminate({shutdown, heartbeat_timeout}, State = #state{name = Name}) ->
|
terminate({shutdown, heartbeat_timeout}, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping because of a heartbeat timeout", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts is stopping because of a heartbeat timeout", [human_readable_name(Name)]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
{terminated, "heartbeat timeout"}),
|
{terminated, "heartbeat timeout"}),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
ok;
|
ok;
|
||||||
terminate({shutdown, restart}, State = #state{name = Name}) ->
|
terminate({shutdown, restart}, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping to restart", [human_readable_name(Name)]),
|
?LOG_ERROR("Shovel ~ts is stopping to restart", [human_readable_name(Name)]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
{terminated, "needed a restart"}),
|
{terminated, "needed a restart"}),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
ok;
|
ok;
|
||||||
terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #state{name = Name}) ->
|
terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping: one of its connections closed "
|
?LOG_ERROR("Shovel ~ts is stopping: one of its connections closed "
|
||||||
"with code ~b, reason: ~ts",
|
"with code ~b, reason: ~ts",
|
||||||
[human_readable_name(Name), Code, Reason]),
|
[human_readable_name(Name), Code, Reason]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
|
@ -202,7 +205,7 @@ terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #stat
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
ok;
|
ok;
|
||||||
terminate(Reason, State = #state{name = Name}) ->
|
terminate(Reason, State = #state{name = Name}) ->
|
||||||
rabbit_log_shovel:error("Shovel ~ts is stopping, reason: ~tp", [human_readable_name(Name), Reason]),
|
?LOG_ERROR("Shovel ~ts is stopping, reason: ~tp", [human_readable_name(Name), Reason]),
|
||||||
rabbit_shovel_status:report(State#state.name, State#state.type,
|
rabbit_shovel_status:report(State#state.name, State#state.type,
|
||||||
{terminated, Reason}),
|
{terminated, Reason}),
|
||||||
close_connections(State),
|
close_connections(State),
|
||||||
|
|
Loading…
Reference in New Issue