Stream queue: treat discard and return like settle

Currently these are not allowed for use with stream queues
which is a bit too strict. Some client impl will automatically
nack or reject messages that are pending when an application
requests to stop consuming. Treating all message outcomes the same
makes as much sense as not to.
This commit is contained in:
Karl Nilsson 2023-10-05 16:19:28 +01:00
parent c8d4c2334a
commit d6900be8a0
2 changed files with 10 additions and 74 deletions

View File

@ -555,9 +555,10 @@ recover(_VHost, Queues) ->
{[Q | R0], F0}
end, {[], []}, Queues).
settle(QName, complete, CTag, MsgIds, #stream_client{readers = Readers0,
settle(QName, _, CTag, MsgIds, #stream_client{readers = Readers0,
local_pid = LocalPid,
name = Name} = State) ->
%% all settle reasons will "give credit" to the stream queue
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
@ -567,11 +568,7 @@ settle(QName, complete, CTag, MsgIds, #stream_client{readers = Readers0,
_ ->
{Readers0, []}
end,
{State#stream_client{readers = Readers}, [{deliver, CTag, true, Msgs}]};
settle(_, _, _, _, #stream_client{name = Name}) ->
{protocol_error, not_implemented,
"basic.nack and basic.reject not supported by stream queues ~ts",
[rabbit_misc:rs(Name)]}.
{State#stream_client{readers = Readers}, [{deliver, CTag, true, Msgs}]}.
info(Q, all_keys) ->
info(Q, ?INFO_KEYS);

View File

@ -1135,37 +1135,6 @@ consume_with_autoack(Config) ->
subscribe(Ch1, Q, true, 0)),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
consume_and_nack(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
publish_confirm(Ch, Q, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
subscribe(Ch1, Q, false, 0),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
%% Nack will throw a not implemented exception. As it is a cast operation,
%% we'll detect the conneciton/channel closure on the next call.
%% Let's try to redeclare and see what happens
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
amqp_channel:call(Ch1, #'queue.declare'{queue = Q,
durable = true,
auto_delete = false,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}))
after 10000 ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
basic_cancel(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1370,42 +1339,13 @@ filter_consumers(Config, Server, CTag) ->
end, [], CInfo).
consume_and_reject(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
publish_confirm(Ch, Q, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
subscribe(Ch1, Q, false, 0),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
MRef = erlang:monitor(process, Ch1),
ok = amqp_channel:cast(Ch1, #'basic.reject'{delivery_tag = DeliveryTag,
requeue = true}),
%% Reject will throw a not implemented exception. As it is a cast
%% operation, we detect the connection error from the channel
%% process exit reason.
receive
{'DOWN', MRef, _, _, Reason} ->
?assertMatch(
{shutdown,
{connection_closing,
{server_initiated_close, 540, _}}},
Reason)
after 10000 ->
exit(timeout)
end
after 10000 ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
consume_and_(Config, fun (DT) -> #'basic.reject'{delivery_tag = DT} end).
consume_and_nack(Config) ->
consume_and_(Config, fun (DT) -> #'basic.nack'{delivery_tag = DT} end).
consume_and_ack(Config) ->
consume_and_(Config, fun (DT) -> #'basic.ack'{delivery_tag = DT} end).
consume_and_(Config, AckFun) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@ -1420,8 +1360,7 @@ consume_and_ack(Config) ->
subscribe(Ch1, Q, false, 0),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false}),
ok = amqp_channel:cast(Ch1, AckFun(DeliveryTag)),
%% It will succeed as ack is now a credit operation. We should be
%% able to redeclare a queue (gen_server call op) as the channel
%% should still be open and declare is an idempotent operation