Queue federation: improve logging

around federated queue link state transitions (pause/unpause).

References #8297.
This commit is contained in:
Michael Klishin 2023-05-24 14:27:09 +04:00
parent 1c727a4ee4
commit 958043caaa
3 changed files with 17 additions and 5 deletions

View File

@ -140,9 +140,13 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
Reason = case E of
{error, Value} -> Value;
Other -> Other
end,
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
[rabbit_federation_upstream:params_to_string(UParams),
E]),
Reason]),
{stop, {shutdown, restart}, State};
connection_error(remote, E, Upstream, UParams, XorQName, State) ->

View File

@ -104,7 +104,11 @@ handle_cast(pause, State = #state{run = false}) ->
handle_cast(pause, State = #not_started{}) ->
{noreply, State#not_started{run = false}};
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream = #upstream{
name = UpName, queue_name = QName
}}) ->
rabbit_log_federation:debug("Federation link of ~s (upstream: '~s'): asked to pause",
[QName, UpName]),
cancel(Ch, Upstream),
{noreply, State#state{run = false}};
@ -305,9 +309,11 @@ visit_match(_ ,_) ->
consumer_tag(#upstream{consumer_tag = ConsumerTag}) ->
ConsumerTag.
consume(Ch, Upstream, UQueue) ->
consume(Ch, Upstream = #upstream{name = UpName}, UQueue) ->
ConsumerTag = consumer_tag(Upstream),
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
rabbit_log_federation:debug("Federation link of ~ts: will consume from the upstream '~ts'",
[rabbit_misc:rs(amqqueue:get_name(UQueue)), UpName]),
amqp_channel:cast(
Ch, #'basic.consume'{queue = name(UQueue),
no_ack = NoAck,
@ -315,8 +321,10 @@ consume(Ch, Upstream, UQueue) ->
consumer_tag = ConsumerTag,
arguments = [{<<"x-priority">>, long, -1}]}).
cancel(Ch, Upstream) ->
cancel(Ch, Upstream = #upstream{name = UpName, queue_name = QName}) ->
ConsumerTag = consumer_tag(Upstream),
rabbit_log_federation:debug("Federation queue '~ts' link: will cancel consumer '~ts' on upstream '~ts'",
[QName, ConsumerTag, UpName]),
amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
consumer_tag = ConsumerTag}).

View File

@ -61,7 +61,7 @@ params_table(SafeURI, XorQ) ->
params_to_string(#upstream_params{safe_uri = SafeURI,
x_or_q = XorQ}) ->
print("~ts on ~ts", [rabbit_misc:rs(r(XorQ)), SafeURI]).
print("~ts on '~ts'", [rabbit_misc:rs(r(XorQ)), SafeURI]).
remove_credentials(URI) ->
list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).