[skip ci] Simplify logging around peer discovery

This commit is contained in:
Michal Kuratczyk 2025-07-14 13:52:07 +02:00
parent 42c162ca3c
commit 8e4492a2a2
No known key found for this signature in database
6 changed files with 62 additions and 73 deletions

View File

@ -7,8 +7,6 @@
-include_lib("rabbit_common/include/logging.hrl"). -include_lib("rabbit_common/include/logging.hrl").
-define(RMQLOG_DOMAIN_PEER_DIS, ?DEFINE_RMQLOG_DOMAIN(peer_discovery)).
% rabbitmq/rabbitmq-peer-discovery-aws#25 % rabbitmq/rabbitmq-peer-discovery-aws#25
% Note: this timeout must not be greater than the default % Note: this timeout must not be greater than the default
% gen_server:call timeout of 5000ms. This `timeout`, % gen_server:call timeout of 5000ms. This `timeout`,

View File

@ -84,12 +84,12 @@ check_cluster() ->
{ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([]) -> init([]) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Map = ?CONFIG_MODULE:config_map(?CONFIG_KEY), Map = ?CONFIG_MODULE:config_map(?CONFIG_KEY),
case map_size(Map) of case map_size(Map) of
0 -> 0 ->
?LOG_INFO( ?LOG_INFO(
"Peer discovery: node cleanup is disabled", "Peer discovery: node cleanup is disabled"),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
{ok, #state{}}; {ok, #state{}};
_ -> _ ->
Interval = ?CONFIG_MODULE:get(cleanup_interval, ?CONFIG_MAPPING, Map), Interval = ?CONFIG_MODULE:get(cleanup_interval, ?CONFIG_MAPPING, Map),
@ -103,8 +103,7 @@ init([]) ->
end, end,
?LOG_INFO( ?LOG_INFO(
"Peer discovery: enabling node cleanup (~ts). Check interval: ~tp seconds.", "Peer discovery: enabling node cleanup (~ts). Check interval: ~tp seconds.",
[WarnMsg, State#state.interval], [WarnMsg, State#state.interval]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
{ok, State} {ok, State}
end. end.
@ -126,8 +125,7 @@ init([]) ->
handle_call(check_cluster, _From, State) -> handle_call(check_cluster, _From, State) ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: checking for partitioned nodes to clean up.", "Peer discovery: checking for partitioned nodes to clean up."),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
maybe_cleanup(State), maybe_cleanup(State),
{reply, ok, State}; {reply, ok, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
@ -236,26 +234,24 @@ maybe_cleanup(State) ->
UnreachableNodes :: [node()]) -> ok. UnreachableNodes :: [node()]) -> ok.
maybe_cleanup(_, []) -> maybe_cleanup(_, []) ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: all known cluster nodes are up.", "Peer discovery: all known cluster nodes are up.");
#{domain => ?RMQLOG_DOMAIN_PEER_DIS});
maybe_cleanup(State, UnreachableNodes) -> maybe_cleanup(State, UnreachableNodes) ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: cleanup discovered unreachable nodes: ~tp", "Peer discovery: cleanup discovered unreachable nodes: ~tp",
[UnreachableNodes], [UnreachableNodes]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
case lists:subtract(as_list(UnreachableNodes), as_list(service_discovery_nodes())) of case lists:subtract(as_list(UnreachableNodes), as_list(service_discovery_nodes())) of
[] -> [] ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: all unreachable nodes are still " "Peer discovery: all unreachable nodes are still "
"registered with the discovery backend ~tp", "registered with the discovery backend ~tp",
[rabbit_peer_discovery:backend()], [rabbit_peer_discovery:backend()],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok; ok;
Nodes -> Nodes ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: unreachable nodes are not registered " "Peer discovery: unreachable nodes are not registered "
"with the discovery backend ~tp", [Nodes], "with the discovery backend ~tp", [Nodes]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
maybe_remove_nodes(Nodes, State#state.warn_only) maybe_remove_nodes(Nodes, State#state.warn_only)
end. end.
@ -272,17 +268,14 @@ maybe_cleanup(State, UnreachableNodes) ->
maybe_remove_nodes([], _) -> ok; maybe_remove_nodes([], _) -> ok;
maybe_remove_nodes([Node | Nodes], true) -> maybe_remove_nodes([Node | Nodes], true) ->
?LOG_WARNING( ?LOG_WARNING(
"Peer discovery: node ~ts is unreachable", [Node], "Peer discovery: node ~ts is unreachable", [Node]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
maybe_remove_nodes(Nodes, true); maybe_remove_nodes(Nodes, true);
maybe_remove_nodes([Node | Nodes], false) -> maybe_remove_nodes([Node | Nodes], false) ->
?LOG_WARNING( ?LOG_WARNING(
"Peer discovery: removing unknown node ~ts from the cluster", [Node], "Peer discovery: removing unknown node ~ts from the cluster", [Node]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
_ = rabbit_db_cluster:forget_member(Node, false), _ = rabbit_db_cluster:forget_member(Node, false),
?LOG_WARNING( ?LOG_WARNING(
"Peer discovery: removing all quorum queue replicas on node ~ts", [Node], "Peer discovery: removing all quorum queue replicas on node ~ts", [Node]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
_ = rabbit_quorum_queue:shrink_all(Node), _ = rabbit_quorum_queue:shrink_all(Node),
maybe_remove_nodes(Nodes, false). maybe_remove_nodes(Nodes, false).
@ -310,13 +303,11 @@ service_discovery_nodes() ->
Nodes = as_list(OneOrMultipleNodes), Nodes = as_list(OneOrMultipleNodes),
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery cleanup: ~tp returned ~tp", "Peer discovery cleanup: ~tp returned ~tp",
[Module, Nodes], [Module, Nodes]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
Nodes; Nodes;
{error, Reason} -> {error, Reason} ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery cleanup: ~tp returned error ~tp", "Peer discovery cleanup: ~tp returned error ~tp",
[Module, Reason], [Module, Reason]),
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
[] []
end. end.

View File

@ -28,7 +28,7 @@ get(Key, Mapping, Config) ->
?LOG_ERROR( ?LOG_ERROR(
"Key ~ts is not found in peer discovery config mapping ~tp!", "Key ~ts is not found in peer discovery config mapping ~tp!",
[Key, Mapping], [Key, Mapping],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
throw({badkey, Key}); throw({badkey, Key});
true -> true ->
get_with_entry_meta(Key, maps:get(Key, Mapping), Config) get_with_entry_meta(Key, maps:get(Key, Mapping), Config)
@ -44,7 +44,7 @@ get_integer(Key, Mapping, Config) ->
?LOG_ERROR( ?LOG_ERROR(
"Key ~ts is not found in peer discovery config mapping ~tp!", "Key ~ts is not found in peer discovery config mapping ~tp!",
[Key, Mapping], [Key, Mapping],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
throw({badkey, Key}); throw({badkey, Key});
true -> true ->
get_integer_with_entry_meta(Key, maps:get(Key, Mapping), Config) get_integer_with_entry_meta(Key, maps:get(Key, Mapping), Config)

View File

@ -141,10 +141,10 @@ get(Scheme, Host, Port, Path, Args) ->
%% %%
get(Scheme, Host, Port, Path, Args, Headers, HttpOpts) -> get(Scheme, Host, Port, Path, Args, Headers, HttpOpts) ->
URL = build_uri(Scheme, Host, Port, Path, Args), URL = build_uri(Scheme, Host, Port, Path, Args),
?LOG_DEBUG("GET ~ts", [URL], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("GET ~ts", [URL], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
HttpOpts1 = ensure_timeout(HttpOpts), HttpOpts1 = ensure_timeout(HttpOpts),
Response = httpc:request(get, {URL, Headers}, HttpOpts1, []), Response = httpc:request(get, {URL, Headers}, HttpOpts1, []),
?LOG_DEBUG("Response: ~tp", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("Response: ~tp", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
parse_response(Response). parse_response(Response).
@ -179,10 +179,10 @@ post(Scheme, Host, Port, Path, Args, Body) ->
%% %%
post(Scheme, Host, Port, Path, Args, Headers, HttpOpts, Body) -> post(Scheme, Host, Port, Path, Args, Headers, HttpOpts, Body) ->
URL = build_uri(Scheme, Host, Port, Path, Args), URL = build_uri(Scheme, Host, Port, Path, Args),
?LOG_DEBUG("POST ~ts [~tp]", [URL, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("POST ~ts [~tp]", [URL, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
HttpOpts1 = ensure_timeout(HttpOpts), HttpOpts1 = ensure_timeout(HttpOpts),
Response = httpc:request(post, {URL, Headers, ?CONTENT_JSON, Body}, HttpOpts1, []), Response = httpc:request(post, {URL, Headers, ?CONTENT_JSON, Body}, HttpOpts1, []),
?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
parse_response(Response). parse_response(Response).
@ -208,10 +208,10 @@ post(Scheme, Host, Port, Path, Args, Headers, HttpOpts, Body) ->
Body :: string() | binary() | tuple(). Body :: string() | binary() | tuple().
put(Scheme, Host, Port, Path, Args, Body) -> put(Scheme, Host, Port, Path, Args, Body) ->
URL = build_uri(Scheme, Host, Port, Path, Args), URL = build_uri(Scheme, Host, Port, Path, Args),
?LOG_DEBUG("PUT ~ts [~tp]", [URL, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("PUT ~ts [~tp]", [URL, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
HttpOpts = ensure_timeout(), HttpOpts = ensure_timeout(),
Response = httpc:request(put, {URL, [], ?CONTENT_URLENCODED, Body}, HttpOpts, []), Response = httpc:request(put, {URL, [], ?CONTENT_URLENCODED, Body}, HttpOpts, []),
?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
parse_response(Response). parse_response(Response).
@ -262,10 +262,10 @@ put(Scheme, Host, Port, Path, Args, Headers, Body) ->
Body :: string() | binary() | tuple(). Body :: string() | binary() | tuple().
put(Scheme, Host, Port, Path, Args, Headers, HttpOpts, Body) -> put(Scheme, Host, Port, Path, Args, Headers, HttpOpts, Body) ->
URL = build_uri(Scheme, Host, Port, Path, Args), URL = build_uri(Scheme, Host, Port, Path, Args),
?LOG_DEBUG("PUT ~ts [~tp] [~tp]", [URL, Headers, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("PUT ~ts [~tp] [~tp]", [URL, Headers, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
HttpOpts1 = ensure_timeout(HttpOpts), HttpOpts1 = ensure_timeout(HttpOpts),
Response = httpc:request(put, {URL, Headers, ?CONTENT_URLENCODED, Body}, HttpOpts1, []), Response = httpc:request(put, {URL, Headers, ?CONTENT_URLENCODED, Body}, HttpOpts1, []),
?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
parse_response(Response). parse_response(Response).
%% @public %% @public
@ -304,10 +304,10 @@ delete(Scheme, Host, Port, PathSegments, Args, HttpOpts, Body) when is_list(Path
delete(Scheme, Host, Port, Path, Args, HttpOpts, Body); delete(Scheme, Host, Port, Path, Args, HttpOpts, Body);
delete(Scheme, Host, Port, Path, Args, HttpOpts, Body) -> delete(Scheme, Host, Port, Path, Args, HttpOpts, Body) ->
URL = build_uri(Scheme, Host, Port, Path, Args), URL = build_uri(Scheme, Host, Port, Path, Args),
?LOG_DEBUG("DELETE ~ts [~tp]", [URL, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("DELETE ~ts [~tp]", [URL, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
HttpOpts1 = ensure_timeout(HttpOpts), HttpOpts1 = ensure_timeout(HttpOpts),
Response = httpc:request(delete, {URL, [], ?CONTENT_URLENCODED, Body}, HttpOpts1, []), Response = httpc:request(delete, {URL, [], ?CONTENT_URLENCODED, Body}, HttpOpts1, []),
?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("Response: [~tp]", [Response], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
parse_response(Response). parse_response(Response).
@ -323,7 +323,7 @@ maybe_configure_proxy() ->
0 -> 0 ->
?LOG_DEBUG( ?LOG_DEBUG(
"HTTP client proxy is not configured", "HTTP client proxy is not configured",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok; ok;
_ -> _ ->
HttpProxy = ?CONFIG_MODULE:get(http_proxy, ?CONFIG_MAPPING, Map), HttpProxy = ?CONFIG_MODULE:get(http_proxy, ?CONFIG_MAPPING, Map),
@ -332,7 +332,7 @@ maybe_configure_proxy() ->
?LOG_DEBUG( ?LOG_DEBUG(
"Configured HTTP proxy: ~tp, HTTPS proxy: ~tp, exclusions: ~tp", "Configured HTTP proxy: ~tp, HTTPS proxy: ~tp, exclusions: ~tp",
[HttpProxy, HttpsProxy, ProxyExclusions], [HttpProxy, HttpsProxy, ProxyExclusions],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
_ = maybe_set_proxy(proxy, HttpProxy, ProxyExclusions), _ = maybe_set_proxy(proxy, HttpProxy, ProxyExclusions),
_ = maybe_set_proxy(https_proxy, HttpsProxy, ProxyExclusions), _ = maybe_set_proxy(https_proxy, HttpsProxy, ProxyExclusions),
ok ok
@ -368,7 +368,7 @@ maybe_set_proxy(Option, ProxyUrl, ProxyExclusions) ->
?LOG_DEBUG( ?LOG_DEBUG(
"Configuring HTTP client's ~ts setting: ~tp, exclusions: ~tp", "Configuring HTTP client's ~ts setting: ~tp, exclusions: ~tp",
[Option, {Host, Port}, ProxyExclusions], [Option, {Host, Port}, ProxyExclusions],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
httpc:set_option(Option, {{Host, Port}, ProxyExclusions}) httpc:set_option(Option, {{Host, Port}, ProxyExclusions})
end. end.
@ -415,7 +415,7 @@ decode_body(?CONTENT_JSON, Body) ->
"HTTP client could not decode a JSON payload " "HTTP client could not decode a JSON payload "
"(JSON parser returned an error): ~tp.", "(JSON parser returned an error): ~tp.",
[Err], [Err],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
[] []
end. end.
@ -428,7 +428,7 @@ decode_body(?CONTENT_JSON, Body) ->
-spec parse_response({ok, integer(), string()} | {error, any()}) -> {ok, term()} | {error, any()}. -spec parse_response({ok, integer(), string()} | {error, any()}) -> {ok, term()} | {error, any()}.
parse_response({error, Reason}) -> parse_response({error, Reason}) ->
?LOG_DEBUG("HTTP error ~tp", [Reason], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("HTTP error ~tp", [Reason], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
{error, lists:flatten(io_lib:format("~tp", [Reason]))}; {error, lists:flatten(io_lib:format("~tp", [Reason]))};
parse_response({ok, {{_,200,_}, Headers, Body}}) -> parse_response({ok, {{_,200,_}, Headers, Body}}) ->
{ok, decode_body(proplists:get_value("content-type", Headers, ?CONTENT_JSON), Body)}; {ok, decode_body(proplists:get_value("content-type", Headers, ?CONTENT_JSON), Body)};
@ -436,7 +436,7 @@ parse_response({ok,{{_,201,_}, Headers, Body}}) ->
{ok, decode_body(proplists:get_value("content-type", Headers, ?CONTENT_JSON), Body)}; {ok, decode_body(proplists:get_value("content-type", Headers, ?CONTENT_JSON), Body)};
parse_response({ok,{{_,204,_}, _, _}}) -> {ok, []}; parse_response({ok,{{_,204,_}, _, _}}) -> {ok, []};
parse_response({ok,{{_Vsn,Code,_Reason},_,Body}}) -> parse_response({ok,{{_Vsn,Code,_Reason},_,Body}}) ->
?LOG_DEBUG("HTTP Response (~tp) ~ts", [Code, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), ?LOG_DEBUG("HTTP Response (~tp) ~ts", [Code, Body], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
{error, integer_to_list(Code)}. {error, integer_to_list(Code)}.
%% @private %% @private

View File

@ -95,7 +95,7 @@ as_atom(Value) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for atom value: ~tp", "Unexpected data type for atom value: ~tp",
[Value], [Value],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Value. Value.
@ -116,7 +116,7 @@ as_integer(Value) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for integer value: ~tp", "Unexpected data type for integer value: ~tp",
[Value], [Value],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Value. Value.
@ -140,7 +140,7 @@ as_string(Value) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for list value: ~tp", "Unexpected data type for list value: ~tp",
[Value], [Value],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Value. Value.
@ -321,14 +321,14 @@ as_proplist(List) when is_list(List) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for proplist value: ~tp. JSON parser returned an error: ~tp!", "Unexpected data type for proplist value: ~tp. JSON parser returned an error: ~tp!",
[Value, Error], [Value, Error],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
[] []
end; end;
as_proplist(Value) -> as_proplist(Value) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for proplist value: ~tp.", "Unexpected data type for proplist value: ~tp.",
[Value], [Value],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
[]. [].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -352,7 +352,7 @@ as_map(List) when is_list(List) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for map value: ~tp. JSON parser returned an error: ~tp!", "Unexpected data type for map value: ~tp. JSON parser returned an error: ~tp!",
[Value, Error], [Value, Error],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
[] []
end; end;
as_map(Map) when is_map(Map) -> as_map(Map) when is_map(Map) ->
@ -361,7 +361,7 @@ as_map(Value) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for map value: ~tp.", "Unexpected data type for map value: ~tp.",
[Value], [Value],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
[]. [].
-spec stringify_error({ok, term()} | {error, term()}) -> {ok, term()} | {error, string()}. -spec stringify_error({ok, term()} | {error, term()}) -> {ok, term()} | {error, string()}.
@ -387,7 +387,7 @@ maybe_backend_configured(BackendConfigKey,
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: translated cluster formation configuration: ~tp", "Peer discovery: translated cluster formation configuration: ~tp",
[ClusterFormation], [ClusterFormation],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case proplists:get_value(BackendConfigKey, ClusterFormation) of case proplists:get_value(BackendConfigKey, ClusterFormation) of
undefined -> undefined ->
BackendUndefinedFun(); BackendUndefinedFun();
@ -395,7 +395,7 @@ maybe_backend_configured(BackendConfigKey,
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: cluster formation backend configuration: ~tp", "Peer discovery: cluster formation backend configuration: ~tp",
[Proplist], [Proplist],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ConfiguredFun(Proplist) ConfiguredFun(Proplist)
end end
end. end.
@ -428,5 +428,5 @@ as_list(Value) ->
?LOG_ERROR( ?LOG_ERROR(
"Unexpected data type for list value: ~tp", "Unexpected data type for list value: ~tp",
[Value], [Value],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Value. Value.

View File

@ -43,7 +43,7 @@
init() -> init() ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery Consul: initialising...", "Peer discovery Consul: initialising...",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok = application:ensure_started(inets), ok = application:ensure_started(inets),
%% we cannot start this plugin yet since it depends on the rabbit app, %% we cannot start this plugin yet since it depends on the rabbit app,
%% which is in the process of being started by the time this function is called %% which is in the process of being started by the time this function is called
@ -63,7 +63,7 @@ list_nodes() ->
"Cannot discover any nodes because Consul cluster " "Cannot discover any nodes because Consul cluster "
"details are not configured!", "details are not configured!",
[?MODULE], [?MODULE],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
{ok, {[], disc}} {ok, {[], disc}}
end, end,
Fun2 = fun(Proplist) -> Fun2 = fun(Proplist) ->
@ -112,7 +112,7 @@ register() ->
{ok, Body} -> {ok, Body} ->
?LOG_DEBUG( ?LOG_DEBUG(
"Consul registration body: ~ts", [Body], "Consul registration body: ~ts", [Body],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Path = rabbit_peer_discovery_httpc:build_path([v1, agent, service, register]), Path = rabbit_peer_discovery_httpc:build_path([v1, agent, service, register]),
Headers = maybe_add_acl([]), Headers = maybe_add_acl([]),
HttpOpts = http_options(M), HttpOpts = http_options(M),
@ -137,7 +137,7 @@ unregister() ->
ID = service_id(), ID = service_id(),
?LOG_DEBUG( ?LOG_DEBUG(
"Unregistering with Consul using service ID '~ts'", [ID], "Unregistering with Consul using service ID '~ts'", [ID],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Path = rabbit_peer_discovery_httpc:build_path([v1, agent, service, deregister, ID]), Path = rabbit_peer_discovery_httpc:build_path([v1, agent, service, deregister, ID]),
Headers = maybe_add_acl([]), Headers = maybe_add_acl([]),
HttpOpts = http_options(M), HttpOpts = http_options(M),
@ -153,13 +153,13 @@ unregister() ->
?LOG_INFO( ?LOG_INFO(
"Consul's response to the unregistration attempt: ~tp", "Consul's response to the unregistration attempt: ~tp",
[Response], [Response],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok; ok;
Error -> Error ->
?LOG_INFO( ?LOG_INFO(
"Failed to unregister service with ID '~ts` with Consul: ~tp", "Failed to unregister service with ID '~ts` with Consul: ~tp",
[ID, Error], [ID, Error],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Error Error
end. end.
@ -190,7 +190,7 @@ internal_lock() ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
?LOG_DEBUG( ?LOG_DEBUG(
"Effective Consul peer discovery configuration: ~tp", [M], "Effective Consul peer discovery configuration: ~tp", [M],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Node = node(), Node = node(),
case create_session(Node, get_config_key(consul_svc_ttl, M)) of case create_session(Node, get_config_key(consul_svc_ttl, M)) of
{ok, SessionId} -> {ok, SessionId} ->
@ -209,7 +209,7 @@ internal_unlock({SessionId, TRef}) ->
_ = timer:cancel(TRef), _ = timer:cancel(TRef),
?LOG_DEBUG( ?LOG_DEBUG(
"Stopped session renewal", "Stopped session renewal",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case release_lock(SessionId) of case release_lock(SessionId) of
{ok, true} -> {ok, true} ->
ok; ok;
@ -355,7 +355,7 @@ registration_body({error, Reason}) ->
?LOG_ERROR( ?LOG_ERROR(
"Error serializing the request body: ~tp", "Error serializing the request body: ~tp",
[Reason], [Reason],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
{error, Reason}. {error, Reason}.
@ -403,7 +403,7 @@ registration_body_maybe_add_check(Payload, undefined) ->
?LOG_WARNING( ?LOG_WARNING(
"Can't use Consul's service deregistration feature without " "Can't use Consul's service deregistration feature without "
"using TTL. The parameter will be ignored", "using TTL. The parameter will be ignored",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Payload; Payload;
_ -> Payload _ -> Payload
@ -477,7 +477,7 @@ validate_addr_parameters(false, true) ->
"The parameter CONSUL_SVC_ADDR_NODENAME" "The parameter CONSUL_SVC_ADDR_NODENAME"
" can be used only if CONSUL_SVC_ADDR_AUTO is true." " can be used only if CONSUL_SVC_ADDR_AUTO is true."
" CONSUL_SVC_ADDR_NODENAME value will be ignored.", " CONSUL_SVC_ADDR_NODENAME value will be ignored.",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
false; false;
validate_addr_parameters(_, _) -> validate_addr_parameters(_, _) ->
true. true.
@ -565,7 +565,7 @@ send_health_check_pass() ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
?LOG_DEBUG( ?LOG_DEBUG(
"Running Consul health check", "Running Consul health check",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Path = rabbit_peer_discovery_httpc:build_path([v1, agent, check, pass, Service]), Path = rabbit_peer_discovery_httpc:build_path([v1, agent, check, pass, Service]),
Headers = maybe_add_acl([]), Headers = maybe_add_acl([]),
HttpOpts = http_options(M), HttpOpts = http_options(M),
@ -582,14 +582,14 @@ send_health_check_pass() ->
%% Too Many Requests, see https://www.consul.io/docs/agent/checks.html %% Too Many Requests, see https://www.consul.io/docs/agent/checks.html
?LOG_WARNING( ?LOG_WARNING(
"Consul responded to a health check with 429 Too Many Requests", "Consul responded to a health check with 429 Too Many Requests",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok; ok;
%% starting with Consul 1.11, see https://github.com/hashicorp/consul/pull/11950 %% starting with Consul 1.11, see https://github.com/hashicorp/consul/pull/11950
{error, "404"} -> {error, "404"} ->
?LOG_WARNING( ?LOG_WARNING(
"Consul responded to a health check with a 404 status, will " "Consul responded to a health check with a 404 status, will "
"wait and try re-registering", "wait and try re-registering",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
maybe_re_register(wait_for_list_nodes()), maybe_re_register(wait_for_list_nodes()),
ok; ok;
%% prior to Consul 1.11, see https://github.com/hashicorp/consul/pull/11950 %% prior to Consul 1.11, see https://github.com/hashicorp/consul/pull/11950
@ -597,14 +597,14 @@ send_health_check_pass() ->
?LOG_WARNING( ?LOG_WARNING(
"Consul responded to a health check with a 500 status, will " "Consul responded to a health check with a 500 status, will "
"wait and try re-registering", "wait and try re-registering",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
maybe_re_register(wait_for_list_nodes()), maybe_re_register(wait_for_list_nodes()),
ok; ok;
{error, Reason} -> {error, Reason} ->
?LOG_ERROR( ?LOG_ERROR(
"Error running Consul health check: ~tp", "Error running Consul health check: ~tp",
[Reason], [Reason],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok ok
end. end.
@ -613,7 +613,7 @@ maybe_re_register({error, Reason}) ->
"Internal error in Consul while updating health check. " "Internal error in Consul while updating health check. "
"Cannot obtain list of nodes registered in Consul either: ~tp", "Cannot obtain list of nodes registered in Consul either: ~tp",
[Reason], [Reason],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}); #{domain => ?RMQLOG_DOMAIN_PEER_DISC});
maybe_re_register({ok, {Members, _NodeType}}) -> maybe_re_register({ok, {Members, _NodeType}}) ->
maybe_re_register(Members); maybe_re_register(Members);
maybe_re_register(Members) -> maybe_re_register(Members) ->
@ -621,12 +621,12 @@ maybe_re_register(Members) ->
true -> true ->
?LOG_ERROR( ?LOG_ERROR(
"Internal error in Consul while updating health check", "Internal error in Consul while updating health check",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}); #{domain => ?RMQLOG_DOMAIN_PEER_DISC});
false -> false ->
?LOG_ERROR( ?LOG_ERROR(
"Internal error in Consul while updating health check, " "Internal error in Consul while updating health check, "
"node is not registered. Re-registering", "node is not registered. Re-registering",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
register() register()
end. end.
@ -726,7 +726,7 @@ start_session_ttl_updater(SessionId) ->
Interval = get_config_key(consul_svc_ttl, M), Interval = get_config_key(consul_svc_ttl, M),
?LOG_DEBUG( ?LOG_DEBUG(
"Starting session renewal", "Starting session renewal",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
{ok, TRef} = timer:apply_interval(Interval * 500, ?MODULE, {ok, TRef} = timer:apply_interval(Interval * 500, ?MODULE,
session_ttl_update_callback, [SessionId]), session_ttl_update_callback, [SessionId]),
TRef. TRef.