Local shovels: move unacked_message_q inside source config

This commit is contained in:
Diana Parra Corbacho 2025-08-12 14:59:44 +02:00
parent 8c9f79fb36
commit 0b1aefd4ae
1 changed files with 14 additions and 12 deletions

View File

@ -103,9 +103,9 @@ connect_source(State = #{source := Src = #{resource_decl := {M, F, MFArgs},
State#{source => Src#{current => #{queue_states => QState, State#{source => Src#{current => #{queue_states => QState,
next_tag => 1, next_tag => 1,
user => User, user => User,
vhost => VHost}, vhost => VHost,
queue => QName}, unacked_message_q => ?QUEUE:new()},
unacked_message_q => ?QUEUE:new()}. queue => QName}}.
connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs}, connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
uris := [Uri | _] uris := [Uri | _]
@ -452,10 +452,11 @@ handle_dest_queue_actions(Actions, State) ->
record_pending(false, _DeliveryTag, _MsgId, State) -> record_pending(false, _DeliveryTag, _MsgId, State) ->
State; State;
record_pending(true, DeliveryTag, MsgId, #{unacked_message_q := UAMQ0} = State) -> record_pending(true, DeliveryTag, MsgId,
#{source := Src = #{current := Current = #{unacked_message_q := UAMQ0}}} = State) ->
UAMQ = ?QUEUE:in(#pending_ack{delivery_tag = DeliveryTag, UAMQ = ?QUEUE:in(#pending_ack{delivery_tag = DeliveryTag,
msg_id = MsgId}, UAMQ0), msg_id = MsgId}, UAMQ0),
State#{unacked_message_q => UAMQ}. State#{source => Src#{current => Current#{unacked_message_q => UAMQ}}}.
remaining(_Q, #{source := #{delete_after := never}}) -> remaining(_Q, #{source := #{delete_after := never}}) ->
unlimited; unlimited;
@ -567,18 +568,19 @@ get_user_vhost_from_amqp_param(Uri) ->
exit({shutdown, {access_refused, Username}}) exit({shutdown, {access_refused, Username}})
end. end.
settle(Op, DeliveryTag, Multiple, #{unacked_message_q := UAMQ0, settle(Op, DeliveryTag, Multiple,
source := #{queue := Queue, #{source := #{queue := Queue,
current := Current = #{queue_states := QState0, current := Current = #{queue_states := QState0,
consumer_tag := CTag, consumer_tag := CTag,
unacked_message_q := UAMQ0,
vhost := VHost}} = Src} = State0) -> vhost := VHost}} = Src} = State0) ->
{Acked, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), {Acked, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
QRef = rabbit_misc:r(VHost, queue, Queue), QRef = rabbit_misc:r(VHost, queue, Queue),
MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked], MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked],
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of
{ok, QState1, Actions} -> {ok, QState1, Actions} ->
State = State0#{source => Src#{current => Current#{queue_states => QState1}}, State = State0#{source => Src#{current => Current#{queue_states => QState1,
unacked_message_q => UAMQ}, unacked_message_q => UAMQ}}},
handle_queue_actions(Actions, State); handle_queue_actions(Actions, State);
{'protocol_error', Type, Reason, Args} -> {'protocol_error', Type, Reason, Args} ->
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",