diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index f513b27a97..ac491cb5fb 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -15,7 +15,15 @@ -include_lib("rabbit/include/mc.hrl"). -include("rabbit_shovel.hrl"). +-rabbit_boot_step({rabbit_global_local_shovel_counters, + [{description, "global local shovel counters"}, + {mfa, {?MODULE, boot_step, + []}}, + {requires, rabbit_global_counters}, + {enables, external_infrastructure}]}). + -export([ + boot_step/0, parse/2, connect_source/1, connect_dest/1, @@ -53,12 +61,20 @@ %% See rabbit_amqp_session.erl -define(INITIAL_DELIVERY_COUNT, 16#ff_ff_ff_ff - 4). -define(DEFAULT_MAX_LINK_CREDIT, 1000). +-define(PROTOCOL, 'local-shovel'). -record(pending_ack, { delivery_tag, msg_id }). +boot_step() -> + Labels = #{protocol => ?PROTOCOL}, + rabbit_global_counters:init(Labels), + rabbit_global_counters:init(Labels#{queue_type => rabbit_classic_queue}), + rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}), + rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}). + parse(_Name, {source, Source}) -> Queue = parse_parameter(queue, fun parse_binary/1, proplists:get_value(queue, Source)), @@ -185,6 +201,7 @@ init_source(State = #{source := #{queue_r := QName, end end) of {Remaining, {ok, QState1}} -> + rabbit_global_counters:consumer_created(?PROTOCOL), {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, ?INITIAL_DELIVERY_COUNT, MaxLinkCredit, false, QState1), State2 = State#{source => Src#{current => Current#{queue_states => QState, consumer_tag => CTag}, @@ -214,6 +231,7 @@ init_source(State = #{source := #{queue_r := QName, init_dest(#{name := Name, shovel_type := Type, dest := #{add_forward_headers := AFH} = Dst} = State) -> + rabbit_global_counters:publisher_created(?PROTOCOL), _TRef = erlang:send_after(1000, self(), send_confirms_and_nacks), case AFH of true -> @@ -258,12 +276,14 @@ dest_endpoint(_Config) -> []. close_dest(_State) -> + rabbit_global_counters:publisher_deleted(?PROTOCOL), ok. close_source(#{source := #{current := #{queue_states := QStates0, consumer_tag := CTag, user := User}, queue_r := QName}}) -> + rabbit_global_counters:consumer_deleted(?PROTOCOL), case rabbit_amqqueue:with( QName, fun(Q) -> @@ -363,6 +383,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Msg = set_annotations(Msg0, Dest), RoutedQNames = route(Msg, Dest), Queues = rabbit_amqqueue:lookup_many(RoutedQNames), + messages_received(AckMode), case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of {ok, QState1, Actions} -> State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}}, @@ -451,13 +472,15 @@ handle_queue_actions(Actions, State) -> end, State, Actions). handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) -> + NumMsgs = length(Msgs), maybe_grant_credit( lists:foldl( - fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) -> + fun({QName, _QPid, MsgId, _Redelivered, Mc}, #{source := #{current := #{queue_states := QStates }}} = S0) -> + messages_delivered(QName, QStates), DeliveryTag = next_tag(S0), S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)), rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S) - end, sent_delivery(State, length(Msgs)), Msgs)). + end, sent_delivery(State, NumMsgs), Msgs)). next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) -> DeliveryTag. @@ -616,6 +639,7 @@ settle(Op, DeliveryTag, Multiple, {MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of {ok, QState1, Actions} -> + messages_acknowledged(Op, QRef, QState1, MsgIds), State = State0#{source => Src#{current => Current#{queue_states => QState1, unacked_message_q => UAMQ}}}, handle_queue_actions(Actions, State); @@ -739,12 +763,18 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra at_least_one_credit_req_in_flight => false}} end. -process_routing_confirm(undefined, _, _, State) -> +process_routing_confirm(undefined, _, [], State) -> + rabbit_global_counters:messages_unroutable_returned(?PROTOCOL, 1), + State; +process_routing_confirm(undefined, _, QRefs, State) -> + rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)), State; process_routing_confirm(MsgSeqNo, Tag, [], State) when is_integer(MsgSeqNo) -> + rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1), record_confirms([{MsgSeqNo, Tag}], State); process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) -> + rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)), State#{dest => Dst#{unconfirmed => rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}. @@ -781,8 +811,10 @@ send_nacks(Rs, Cs, State) -> send_confirms([], _, State) -> State; send_confirms([MsgSeqNo], _, State) -> + rabbit_global_counters:messages_confirmed(?PROTOCOL, 1), rabbit_shovel_behaviour:ack(MsgSeqNo, false, State); send_confirms(Cs, Rs, State) -> + rabbit_global_counters:messages_confirmed(?PROTOCOL, length(Cs)), coalesce_and_send(Cs, Rs, fun(MsgSeqNo, Multiple, StateX) -> rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX) @@ -833,3 +865,30 @@ decr_remaining(Num, State) -> _ = send_confirms_and_nacks(State), exit(R) end. + +messages_acknowledged(complete, QName, QS, MsgIds) -> + case rabbit_queue_type:module(QName, QS) of + {ok, QType} -> + rabbit_global_counters:messages_acknowledged(?PROTOCOL, QType, length(MsgIds)); + _ -> + ok + end; +messages_acknowledged(_, _, _, _) -> + ok. + +messages_received(AckMode) -> + rabbit_global_counters:messages_received(?PROTOCOL, 1), + case AckMode of + on_confirm -> + rabbit_global_counters:messages_received_confirm(?PROTOCOL, 1); + _ -> + ok + end. + +messages_delivered(QName, S0) -> + case rabbit_queue_type:module(QName, S0) of + {ok, QType} -> + rabbit_global_counters:messages_delivered(?PROTOCOL, QType, 1); + _ -> + ok + end. diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 2e0466d805..94e8acac49 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -77,7 +77,8 @@ groups() -> local_to_local_stream_credit_flow_on_confirm, local_to_local_stream_credit_flow_on_publish, local_to_local_stream_credit_flow_no_ack, - local_to_local_simple_uri + local_to_local_simple_uri, + local_to_local_counters ]} ]. @@ -1050,6 +1051,36 @@ local_to_local_simple_uri(Config) -> none]), shovel_test_utils:await_shovel(Config, ?PARAM). +local_to_local_counters(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + %% Let's restart the node so the counters are reset + ok = rabbit_ct_broker_helpers:stop_node(Config, 0), + ok = rabbit_ct_broker_helpers:start_node(Config, 0), + with_session( + Config, + fun (Sess) -> + ?awaitMatch(#{publishers := 0, consumers := 0}, + get_global_counters(Config), 30_000), + shovel_test_utils:set_param(Config, ?PARAM, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-queue">>, Dest} + ]), + ?awaitMatch(#{publishers := 1, consumers := 1}, + get_global_counters(Config), 30_000), + _ = publish_many(Sess, Src, Dest, <<"tag1">>, 150), + ?awaitMatch(#{consumers := 1, publishers := 1, + messages_received_total := 150, + messages_received_confirm_total := 150, + messages_routed_total := 150, + messages_unroutable_dropped_total := 0, + messages_unroutable_returned_total := 0, + messages_confirmed_total := 150}, + get_global_counters(Config), 30_000) + end). + %%---------------------------------------------------------------------------- with_session(Config, Fun) -> with_session(Config, <<"/">>, Fun). @@ -1217,3 +1248,10 @@ delete_queue(Name, VHost) -> _ -> ok end. + +get_global_counters(Config) -> + get_global_counters0(Config, #{protocol => 'local-shovel'}). + +get_global_counters0(Config, Key) -> + Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []), + maps:get(Key, Overview).