Shovel bugfix: increase forwarded counter for amqp10 protocol
(cherry picked from commit 646eda0802
)
This commit is contained in:
parent
c4ec86b7c9
commit
ca6e38c923
|
@ -341,15 +341,16 @@ forward(Tag, Msg0,
|
|||
Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)),
|
||||
case send_msg(Link, Msg3) of
|
||||
ok ->
|
||||
#{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State),
|
||||
rabbit_shovel_behaviour:decr_remaining_unacked(
|
||||
case AckMode of
|
||||
no_ack ->
|
||||
rabbit_shovel_behaviour:decr_remaining(1, State);
|
||||
rabbit_shovel_behaviour:decr_remaining(1, State1);
|
||||
on_confirm ->
|
||||
State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}};
|
||||
State1#{dest => Dst1#{unacked => Unacked#{OutTag => Tag}}};
|
||||
on_publish ->
|
||||
State1 = rabbit_shovel_behaviour:ack(Tag, false, State),
|
||||
rabbit_shovel_behaviour:decr_remaining(1, State1)
|
||||
State2 = rabbit_shovel_behaviour:ack(Tag, false, State1),
|
||||
rabbit_shovel_behaviour:decr_remaining(1, State2)
|
||||
end);
|
||||
Stop ->
|
||||
Stop
|
||||
|
|
Loading…
Reference in New Issue