Merge pull request #8260 from rabbitmq/ik-consumer-timeout-followup

Consumer Timeout Follow-up
This commit is contained in:
Michael Klishin 2023-05-23 00:28:26 +04:00 committed by GitHub
commit e569a2b4f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 42 deletions

View File

@ -2818,17 +2818,8 @@ get_queue_consumer_timeout(_PA = #pending_ack{queue = QName},
GCT
end.
get_consumer_timeout(PA = #pending_ack{tag = CTag},
State = #ch{consumer_mapping = CMap}) ->
case maps:find(CTag, CMap) of
{ok, {_, {_, _, _, Args}}} ->
case rabbit_misc:table_lookup(Args, <<"x-consumer-timeout">>) of
{long, Timeout} -> Timeout;
_ -> get_queue_consumer_timeout(PA, State)
end;
_ ->
get_queue_consumer_timeout(PA, State)
end.
get_consumer_timeout(PA, State) ->
get_queue_consumer_timeout(PA, State).
evaluate_consumer_timeout(State = #ch{unacked_message_q = UAMQ}) ->
case ?QUEUE:get(UAMQ, empty) of

View File

@ -19,20 +19,13 @@
-define(GROUP_CONFIG,
#{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
{queue_policy, []},
{queue_arguments, []},
{consumer_arguments, []}],
{queue_arguments, []}],
queue_policy_consumer_timeout => [{rabbit, []},
{queue_policy, [{<<"consumer-timeout">>, ?CONSUMER_TIMEOUT}]},
{queue_arguments, []},
{consumer_arguments, []}],
{queue_arguments, []}],
queue_argument_consumer_timeout => [{rabbit, []},
{queue_policy, []},
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]},
{consumer_arguments, []}],
consumer_argument_consumer_timeout => [{rabbit, []},
{queue_policy, []},
{queue_arguments, []},
{consumer_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).
-import(quorum_queue_utils, [wait_for_messages/2]).
@ -40,20 +33,13 @@ all() ->
[
{group, global_consumer_timeout},
{group, queue_policy_consumer_timeout},
{group, queue_argument_consumer_timeout},
{group, consumer_argument_consumer_timeout}
{group, queue_argument_consumer_timeout}
].
groups() ->
ConsumerTests = [consumer_timeout,
consumer_timeout_no_basic_cancel_capability],
AllTests = ConsumerTests ++ [consumer_timeout_basic_get],
ConsumerTestsParallel = [
{classic_queue, [parallel], ConsumerTests},
{mirrored_queue, [parallel], ConsumerTests},
{quorum_queue, [parallel], ConsumerTests}
],
AllTests = [consumer_timeout,
consumer_timeout_no_basic_cancel_capability,
consumer_timeout_basic_get],
AllTestsParallel = [
{classic_queue, [parallel], AllTests},
@ -63,8 +49,7 @@ groups() ->
[
{global_consumer_timeout, [], AllTestsParallel},
{queue_policy_consumer_timeout, [], AllTestsParallel},
{queue_argument_consumer_timeout, [], AllTestsParallel},
{consumer_argument_consumer_timeout, [], ConsumerTestsParallel}
{queue_argument_consumer_timeout, [], AllTestsParallel}
].
suite() ->
@ -158,7 +143,7 @@ consumer_timeout(Config) ->
declare_queue(Ch, Config, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
subscribe(Ch, QName, false),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
receive
@ -226,7 +211,7 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
subscribe(Ch, QName, false),
receive
{#'basic.deliver'{delivery_tag = _,
redelivered = false}, _} ->
@ -280,14 +265,13 @@ consume(Ch, QName, NoAck, Payloads) ->
DTag
end || Payload <- Payloads].
subscribe(Ch, Queue, NoAck, Args) ->
subscribe(Ch, Queue, NoAck, <<"ctag">>, Args).
subscribe(Ch, Queue, NoAck) ->
subscribe(Ch, Queue, NoAck, <<"ctag">>).
subscribe(Ch, Queue, NoAck, Ctag, Args) ->
subscribe(Ch, Queue, NoAck, Ctag) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
consumer_tag = Ctag,
arguments = Args
consumer_tag = Ctag
},
self()),
receive

View File

@ -191,6 +191,9 @@ var HELP = {
'queue-message-ttl':
'How long a message published to a queue can live before it is discarded (milliseconds).<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/ttl.html#per-queue-message-ttl">x-message-ttl</a>" argument.)',
'queue-consumer-timeout':
'If a consumer does not ack its delivery for more than the <a href="https://www.rabbitmq.com/consumers.html#acknowledgement-timeout">timeout value</a> (30 minutes by default), its channel will be closed with a <code>PRECONDITION_FAILED</code> channel exception.',
'queue-expires':
'How long a queue can be unused for before it is automatically deleted (milliseconds).<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/ttl.html#queue-ttl">x-expires</a>" argument.)',

View File

@ -14,6 +14,7 @@
<th>Prefetch count</th>
<th>Active <span class="help" id="consumer-active"></span></th>
<th>Activity status</th>
<th>Consumer Timeout</th>
<th>Arguments</th>
</tr>
</thead>
@ -34,6 +35,7 @@
<td class="c"><%= consumer.prefetch_count %></td>
<td class="c"><%= fmt_boolean(consumer.active) %></td>
<td class="c"><%= fmt_activity_status(consumer.activity_status) %></td>
<td class="c"><%= consumer.consumer_timeout %></td>
<td class="c"><%= fmt_table_short(consumer.arguments) %></td>
</tr>
<% } %>

View File

@ -108,6 +108,7 @@
<span class="argument-link" field="definition" key="dead-letter-exchange" type="string">Dead letter exchange</span> |
<span class="argument-link" field="definition" key="dead-letter-routing-key" type="string">Dead letter routing key</span><br/>
<span class="argument-link" field="definition" key="message-ttl" type="number">Message TTL</span><span class="help" id="queue-message-ttl"></span></br>
<span class="argument-link" field="definition" key="consumer-timeout" type="number">Consumer Timeout</span><span class="help" id="queue-consumer-timeout"></span></br>
</td>
<tr>
<td>Queues [Classic]</td>

View File

@ -252,7 +252,31 @@ augment_consumer({{Q, Ch, CTag}, Props}) ->
[{queue, format_resource(Q)},
{channel_details, augment_channel_pid(Ch)},
{channel_pid, Ch},
{consumer_tag, CTag} | Props].
{consumer_tag, CTag},
{consumer_timeout, consumer_timeout(Props, Q)} | Props].
consumer_timeout(_Props, Q) ->
get_queue_consumer_timeout(Q, get_global_consumer_timeout()).
get_queue_consumer_timeout(QName, GCT) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} -> %% should we account for different queue states here?
case rabbit_queue_type_util:args_policy_lookup(<<"consumer-timeout">>,
fun (X, Y) -> erlang:min(X, Y) end, Q) of
undefined -> GCT;
Val -> Val
end;
_ ->
GCT
end.
get_global_consumer_timeout() ->
case application:get_env(rabbit, consumer_timeout) of
{ok, MS} when is_integer(MS) ->
MS;
_ ->
undefined
end.
consumers_by_vhost(VHost) ->
ets:select(consumer_stats,