Reject basic.get operations on quorum queues with single active consumer enabled

rabbitmq-server/2164

[#169810347]
This commit is contained in:
dcorbacho 2019-11-20 10:24:12 +00:00
parent e5f2dfafce
commit cb4989f31d
5 changed files with 38 additions and 1 deletions

View File

@ -263,6 +263,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
apply(_, #checkout{spec = {dequeue, _}},
#?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, unsupported}};
apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},

View File

@ -199,6 +199,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, empty, State0#state{leader = Leader}};
{ok, {dequeue, Msg, NumReady}, Leader} ->
{ok, {Msg, NumReady}, State0#state{leader = Leader}};
{ok, {error, _} = Err, _Leader} ->
Err;
Err ->
Err
end.

View File

@ -482,6 +482,11 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
IsDelivered = Count > 0,
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
{error, unsupported} ->
rabbit_misc:protocol_error(
resource_locked,
"cannot obtain access to locked ~s. basic.get operations are not supported by quorum queues with single active consumer",
[rabbit_misc:rs(QName)]);
{error, _} = Err ->
Err;
{timeout, _} ->

View File

@ -640,6 +640,21 @@ down_noconnection_returns_checked_out_test(_) ->
?assertEqual(lists:sort(Returns), Returns),
ok.
single_active_consumer_basic_get_test(_) ->
Cid = {?FUNCTION_NAME, self()},
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
release_cursor_interval => 0,
single_active_consumer_on => true}),
?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy),
?assertEqual(0, map_size(State0#rabbit_fifo.consumers)),
{State1, _} = enq(1, 1, first, State0),
{_State, {error, unsupported}} =
apply(meta(2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
State1),
ok.
single_active_consumer_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,

View File

@ -40,7 +40,8 @@ groups() ->
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks
fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks,
basic_get_is_unsupported
%% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
]}
].
@ -267,6 +268,17 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
[amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]],
ok.
basic_get_is_unsupported(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
?assertExit(
{{shutdown, {server_initiated_close, 405, _}}, _},
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false})),
amqp_connection:close(C),
ok.
amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),