Local shovels: place behind rabbitmq_4.0.0 feature flag

This commit is contained in:
Diana Parra Corbacho 2025-08-19 11:52:08 +02:00
parent d475a0e95f
commit 212ae64c2d
1 changed files with 13 additions and 12 deletions

View File

@ -85,6 +85,12 @@ parse(_Name, {destination, Dest}) ->
connect_source(State = #{source := Src = #{resource_decl := {M, F, MFArgs},
queue := QName0,
uris := [Uri | _]}}) ->
case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
true ->
ok;
false ->
exit({shutdown, feature_flag_rabbitmq_4_0_0_is_disabled})
end,
QState = rabbit_queue_type:init(),
{User, VHost} = get_user_vhost_from_amqp_param(Uri),
%% We handle the most recently declared queue to use anonymous functions
@ -106,7 +112,12 @@ connect_source(State = #{source := Src = #{resource_decl := {M, F, MFArgs},
connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
uris := [Uri | _]},
ack_mode := AckMode}) ->
%% Shall we get the user from an URI or something else?
case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
true ->
ok;
false ->
exit({shutdown, feature_flag_rabbitmq_4_0_0_is_disabled})
end,
{User, VHost} = get_user_vhost_from_amqp_param(Uri),
apply(M, F, MFArgs ++ [VHost, User]),
@ -137,13 +148,7 @@ init_source(State = #{source := #{queue_r := QName,
vhost := VHost} = Current} = Src,
name := Name,
ack_mode := AckMode}) ->
%% TODO put this shovel behind the rabbitmq_4.0.0 feature flag
Mode = case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
true ->
{credited, ?INITIAL_DELIVERY_COUNT};
false ->
{credited, credit_api_v1}
end,
Mode = {credited, ?INITIAL_DELIVERY_COUNT},
MaxLinkCredit = max_link_credit(),
CTag = consumer_tag(Name),
case rabbit_amqqueue:with(
@ -501,10 +506,6 @@ expand_routing_key_shortcut(<<>>, <<>>, MRDQ) ->
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _) ->
RoutingKey.
%% TODO A missing queue stops the shovel but because the error reason
%% the failed status is not stored. Would not be it more useful to
%% report it??? This is a rabbit_shovel_worker issues, last terminate
%% clause
check_fun(QName, VHost, User) ->
Method = #'queue.declare'{queue = QName,
passive = true},