David Ansari 2025-03-27 10:20:45 +01:00 committed by David Ansari
parent ef1a595a13
commit c151806f7c
3 changed files with 29 additions and 30 deletions

View File

@ -1354,10 +1354,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
CurrentConsumers = maps:size(ConsumerMapping), CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(ConsumerTag, ConsumerMapping) of case maps:find(ConsumerTag, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error( rabbit_misc:protocol_error(not_allowed,
not_allowed, "reached maximum (~B) of consumers per channel",
"reached maximum (~B) of consumers per channel", [MaxConsumers]);
[MaxConsumers]);
error -> error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath), QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User, AuthzContext), check_read_permitted(QueueName, User, AuthzContext),
@ -1368,13 +1367,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
_ -> _ ->
ConsumerTag ConsumerTag
end, end,
basic_consume( basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualTag,
QueueName, NoAck, ConsumerPrefetch, ActualTag, ExclusiveConsume, Args, NoWait, State);
ExclusiveConsume, Args, NoWait, State);
{ok, _} -> {ok, _} ->
%% Attempted reuse of consumer tag. %% Attempted reuse of consumer tag.
rabbit_misc:protocol_error( rabbit_misc:protocol_error(not_allowed,
not_allowed, "attempt to reuse consumer tag '~ts'", [ConsumerTag]) "attempt to reuse consumer tag '~ts'",
[ConsumerTag])
end; end;
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},

View File

@ -522,7 +522,7 @@ consume(Q, Spec, State) ->
case Mod:consume(Q, Spec, CtxState0) of case Mod:consume(Q, Spec, CtxState0) of
{ok, CtxState} -> {ok, CtxState} ->
{ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)}; {ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)};
Err = {error, _Type, _Fmt, _FmtArgs} -> {error, _Type, _Fmt, _FmtArgs} = Err->
Err Err
end. end.

View File

@ -1010,8 +1010,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
args => Args, args => Args,
username => ActingUser, username => ActingUser,
priority => Priority}, priority => Priority},
case rabbit_fifo_client:checkout( case rabbit_fifo_client:checkout(ConsumerTag, Mode, ConsumerMeta, QState0) of
ConsumerTag, Mode, ConsumerMeta, QState0) of
{ok, _Infos, QState} -> {ok, _Infos, QState} ->
case single_active_consumer_on(Q) of case single_active_consumer_on(Q) of
true -> true ->
@ -1024,29 +1023,30 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
_ -> _ ->
waiting waiting
end, end,
rabbit_core_metrics:consumer_created( rabbit_core_metrics:consumer_created(ChPid, ConsumerTag,
ChPid, ConsumerTag, ExclusiveConsume, ExclusiveConsume,
AckRequired, QName, AckRequired, QName,
Prefetch, ActivityStatus == single_active, %% Active Prefetch,
ActivityStatus, Args), ActivityStatus == single_active,
emit_consumer_created( ActivityStatus, Args),
ChPid, ConsumerTag, ExclusiveConsume, emit_consumer_created(ChPid, ConsumerTag,
AckRequired, QName, Prefetch, ExclusiveConsume,
Args, none, ActingUser), AckRequired, QName,
Prefetch, Args, none,
ActingUser),
{ok, QState}; {ok, QState};
Err -> Err ->
consume_error(Err, QName) consume_error(Err, QName)
end; end;
false -> false ->
rabbit_core_metrics:consumer_created( rabbit_core_metrics:consumer_created(ChPid, ConsumerTag,
ChPid, ConsumerTag, ExclusiveConsume, ExclusiveConsume,
AckRequired, QName, AckRequired, QName,
Prefetch, true, %% Active Prefetch, true,
up, Args), up, Args),
emit_consumer_created( emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, Prefetch,
AckRequired, QName, Prefetch, Args, none, ActingUser),
Args, none, ActingUser),
{ok, QState} {ok, QState}
end; end;
Err -> Err ->