Fall back to at-most-once if feature flag stream_queue is disabled

because dead-letter-strategy at-least-once requires the
rabbit_queue_type refactor that comes with the stream_queue feature
flag.

It is reasonable to assume that feature flag stream_queue introduced in
3.9 will be enabled in 3.10.
This commit is contained in:
David Ansari 2022-02-14 09:30:04 +01:00
parent 1e812f9982
commit b934a42df3
2 changed files with 17 additions and 6 deletions

View File

@ -1283,8 +1283,18 @@ dlh(undefined, _, Strategy, _, QName) ->
"because dead-letter-exchange is not configured.",
[rabbit_misc:rs(QName), Strategy]),
undefined;
dlh(_, _, <<"at-least-once">>, reject_publish, _) ->
at_least_once;
dlh(Exchange, RoutingKey, <<"at-least-once">>, reject_publish, QName) ->
%% Feature flag stream_queue includes the rabbit_queue_type refactor
%% which is required by rabbit_fifo_dlx_worker.
case rabbit_feature_flags:is_enabled(stream_queue) of
true ->
at_least_once;
false ->
rabbit_log:warning("Falling back to dead-letter-strategy at-most-once for ~s "
"because feature flag stream_queue is disabled.",
[rabbit_misc:rs(QName)]),
dlh_at_most_once(Exchange, RoutingKey, QName)
end;
dlh(Exchange, RoutingKey, <<"at-least-once">>, drop_head, QName) ->
rabbit_log:warning("Falling back to dead-letter-strategy at-most-once for ~s "
"because configured dead-letter-strategy at-least-once is incompatible with "

View File

@ -596,16 +596,17 @@ reject_publish_target_quorum_queue(Config) ->
{<<"x-dead-letter-routing-key">>, longstr, TargetQ},
{<<"x-dead-letter-strategy">>, longstr, <<"at-least-once">>},
{<<"x-overflow">>, longstr, <<"reject-publish">>},
{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 1}
{<<"x-queue-type">>, longstr, <<"quorum">>}
]),
declare_queue(Ch, TargetQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-overflow">>, longstr, <<"reject-publish">>},
{<<"x-max-length">>, long, 1}
]),
Msg = <<"m">>,
[ok,ok,ok,ok] = [amqp_channel:cast(Ch, #'basic.publish'{routing_key = SourceQ}, #amqp_msg{payload = Msg})
|| _N <- lists:seq(1,4)],
[ok,ok,ok,ok] = [amqp_channel:cast(Ch, #'basic.publish'{routing_key = SourceQ},
#amqp_msg{props = #'P_basic'{expiration = integer_to_binary(N)},
payload = Msg})
|| N <- lists:seq(1,4)],
%% Quorum queues reject publishes once the limit is already exceeded.
%% Therefore, although max-length of target queue is configured to be 1,
%% it will contain 2 messages before rejecting publishes.