Local shovel: handle destination queue events

This commit is contained in:
Diana Parra Corbacho 2025-07-24 11:11:32 +02:00
parent 30f67e0f7a
commit a855146b99
2 changed files with 37 additions and 22 deletions

View File

@ -300,16 +300,19 @@ handle_source({{'DOWN', #resource{name = Queue,
handle_source(_Msg, _State) ->
not_handled.
handle_dest({queue_event, _QRef, {confirm, MsgSeqNos, _QPid}},
#{ack_mode := on_confirm} = State) ->
confirm_to_inbound(fun(Tag, StateX) ->
rabbit_shovel_behaviour:ack(Tag, false, StateX)
end, MsgSeqNos, State);
handle_dest({queue_event, _QRef, {reject_publish, Seq, _QPid}},
#{ack_mode := on_confirm} = State) ->
confirm_to_inbound(fun(Tag, StateX) ->
rabbit_shovel_behaviour:nack(Tag, false, StateX)
end, Seq, State);
handle_dest({queue_event, QRef, Evt},
#{ack_mode := on_confirm,
dest := Dest = #{current := Current = #{queue_states := QueueStates0}}} = State0) ->
case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of
{ok, QState1, Actions} ->
State = State0#{dest => Dest#{current => Current#{queue_states => QState1}}},
handle_dest_queue_actions(Actions, State);
{eol, Actions} ->
_ = handle_dest_queue_actions(Actions, State0),
{stop, {outbound_link_or_channel_closure, queue_deleted}};
{protocol_error, _Type, Reason, ReasonArgs} ->
{stop, list_to_binary(io_lib:format(Reason, ReasonArgs))}
end;
handle_dest({{'DOWN', #resource{name = Queue,
kind = queue,
virtual_host = VHost}}, _, _, _, _} ,
@ -424,12 +427,6 @@ handle_queue_actions(Actions, State) ->
handle_credit_reply(Action, S0);
(_Action, S0) ->
S0
%% ({queue_down, QRef}, S0) ->
%% State;
%% ({block, QName}, S0) ->
%% State;
%% ({unblock, QName}, S0) ->
%% State
end, State, Actions).
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
@ -445,6 +442,22 @@ next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
increase_next_tag(#{source := Source = #{current := Current = #{next_tag := DeliveryTag}}} = State) ->
State#{source => Source#{current => Current#{next_tag => DeliveryTag + 1}}}.
handle_dest_queue_actions(Actions, State) ->
lists:foldl(
fun({settled, _QName, MsgSeqNos}, S0) ->
maybe_grant_or_stash_credit(
confirm_to_inbound(fun(Tag, StateX) ->
rabbit_shovel_behaviour:ack(Tag, false, StateX)
end, MsgSeqNos, S0));
({rejected, _QName, MsgSeqNos}, S0) ->
maybe_grant_or_stash_credit(
confirm_to_inbound(fun(Tag, StateX) ->
rabbit_shovel_behaviour:nack(Tag, false, StateX)
end, MsgSeqNos, S0));
(_Action, S0) ->
S0
end, State, Actions).
record_pending(false, _DeliveryTag, _MsgId, State) ->
State;
record_pending(true, DeliveryTag, MsgId, #{unacked_message_q := UAMQ0} = State) ->
@ -628,10 +641,9 @@ route(Msg, #{current := #{vhost := VHost}}) ->
confirm_to_inbound(ConfirmFun, SeqNos, State)
when is_list(SeqNos) ->
State1 = lists:foldl(fun(Seq, State0) ->
confirm_to_inbound(ConfirmFun, Seq, State0)
end, State, SeqNos),
maybe_grant_or_stash_credit(State1);
lists:foldl(fun(Seq, State0) ->
confirm_to_inbound(ConfirmFun, Seq, State0)
end, State, SeqNos);
confirm_to_inbound(ConfirmFun, Seq,
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
#{Seq := InTag} = Unacked,

View File

@ -546,12 +546,15 @@ local_to_local_delete_after_queue_length(Config) ->
{<<"dest-protocol">>, <<"local">>},
{<<"dest-queue">>, Dest}
]),
shovel_test_utils:await_no_shovel(Config, ?PARAM),
%% The shovel parameter is only deleted when 'delete-after'
%% is used. In any other failure, the shovel should
%% remain and try to restart
expect_many(Sess, Dest, 18),
?assertMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM])),
?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
?awaitMatch([],
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_shovel_status, status, []),
30_000),
publish_many(Sess, Src, Dest, <<"tag1">>, 5),
expect_none(Sess, Dest)
end).