diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index d6eec4a76a..d017ad91e9 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -955,7 +955,7 @@ update_link(Link = #link{output_handle = OutHandle}, State#state{links = Links#{OutHandle => Link}}. incr_link_counters(#link{link_credit = LC, delivery_count = DC} = Link) -> - Link#link{delivery_count = DC+1, link_credit = LC+1}. + Link#link{delivery_count = DC+1, link_credit = LC-1}. append_partial_transfer(Transfer, Payload, #link{partial_transfers = undefined} = Link) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index de76cf9513..29707fc8db 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -38,7 +38,7 @@ -import(rabbit_data_coercion, [to_binary/1]). -define(INFO(Text, Args), rabbit_log_shovel:info(Text, Args)). --define(LINK_CREDIT_TIMEOUT, 5000). +-define(LINK_CREDIT_TIMEOUT, 20_000). -type state() :: rabbit_shovel_behaviour:state(). -type uri() :: rabbit_shovel_behaviour:uri(). @@ -173,7 +173,8 @@ dest_endpoint(#{shovel_type := dynamic, dest := #{target_address := Addr}}) -> [{dest_address, Addr}]. --spec handle_source(Msg :: any(), state()) -> not_handled | state(). +-spec handle_source(Msg :: any(), state()) -> + not_handled | state() | {stop, any()}. handle_source({amqp10_msg, _LinkRef, Msg}, State) -> Tag = amqp10_msg:delivery_id(Msg), Payload = amqp10_msg:body_bin(Msg), @@ -312,7 +313,8 @@ status(_) -> ignore. -spec forward(Tag :: tag(), Props :: #{atom() => any()}, - Payload :: binary(), state()) -> state(). + Payload :: binary(), state()) -> + state() | {stop, any()}. forward(_Tag, _Props, _Payload, #{source := #{remaining_unacked := 0}} = State) -> State; @@ -331,17 +333,33 @@ forward(Tag, Props, Payload, Msg = add_timestamp_header( State, set_message_properties( Props, add_forward_headers(State, Msg0))), - ok = amqp10_client:send_msg(Link, Msg), - rabbit_shovel_behaviour:decr_remaining_unacked( - case AckMode of - no_ack -> - rabbit_shovel_behaviour:decr_remaining(1, State); - on_confirm -> - State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}}; - on_publish -> - State1 = rabbit_shovel_behaviour:ack(Tag, false, State), - rabbit_shovel_behaviour:decr_remaining(1, State1) - end). + case send_msg(Link, Msg) of + ok -> + rabbit_shovel_behaviour:decr_remaining_unacked( + case AckMode of + no_ack -> + rabbit_shovel_behaviour:decr_remaining(1, State); + on_confirm -> + State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}}; + on_publish -> + State1 = rabbit_shovel_behaviour:ack(Tag, false, State), + rabbit_shovel_behaviour:decr_remaining(1, State1) + end); + Stop -> + Stop + end. + +send_msg(Link, Msg) -> + case amqp10_client:send_msg(Link, Msg) of + ok -> + ok; + {error, insufficient_credit} -> + receive {amqp10_event, {link, Link, credited}} -> + ok = amqp10_client:send_msg(Link, Msg) + after ?LINK_CREDIT_TIMEOUT -> + {stop, credited_timeout} + end + end. new_message(Tag, Payload, #{ack_mode := AckMode, dest := #{properties := Props, diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index af4f84f78f..09bc0a749a 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -140,7 +140,8 @@ source_endpoint(#{source := #{module := Mod}} = State) -> dest_endpoint(#{dest := #{module := Mod}} = State) -> Mod:dest_endpoint(State). --spec forward(tag(), #{atom() => any()}, binary(), state()) -> state(). +-spec forward(tag(), #{atom() => any()}, binary(), state()) -> + state() | {stop, any()}. forward(Tag, Props, Payload, #{dest := #{module := Mod}} = State) -> Mod:forward(Tag, Props, Payload, State).