Merge pull request #14561 from rabbitmq/local-shovel-counters
Local shovels: Add global counters
This commit is contained in:
commit
668dbe2e2d
|
@ -15,7 +15,15 @@
|
||||||
-include_lib("rabbit/include/mc.hrl").
|
-include_lib("rabbit/include/mc.hrl").
|
||||||
-include("rabbit_shovel.hrl").
|
-include("rabbit_shovel.hrl").
|
||||||
|
|
||||||
|
-rabbit_boot_step({rabbit_global_local_shovel_counters,
|
||||||
|
[{description, "global local shovel counters"},
|
||||||
|
{mfa, {?MODULE, boot_step,
|
||||||
|
[]}},
|
||||||
|
{requires, rabbit_global_counters},
|
||||||
|
{enables, external_infrastructure}]}).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
boot_step/0,
|
||||||
parse/2,
|
parse/2,
|
||||||
connect_source/1,
|
connect_source/1,
|
||||||
connect_dest/1,
|
connect_dest/1,
|
||||||
|
@ -53,12 +61,20 @@
|
||||||
%% See rabbit_amqp_session.erl
|
%% See rabbit_amqp_session.erl
|
||||||
-define(INITIAL_DELIVERY_COUNT, 16#ff_ff_ff_ff - 4).
|
-define(INITIAL_DELIVERY_COUNT, 16#ff_ff_ff_ff - 4).
|
||||||
-define(DEFAULT_MAX_LINK_CREDIT, 1000).
|
-define(DEFAULT_MAX_LINK_CREDIT, 1000).
|
||||||
|
-define(PROTOCOL, 'local-shovel').
|
||||||
|
|
||||||
-record(pending_ack, {
|
-record(pending_ack, {
|
||||||
delivery_tag,
|
delivery_tag,
|
||||||
msg_id
|
msg_id
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
boot_step() ->
|
||||||
|
Labels = #{protocol => ?PROTOCOL},
|
||||||
|
rabbit_global_counters:init(Labels),
|
||||||
|
rabbit_global_counters:init(Labels#{queue_type => rabbit_classic_queue}),
|
||||||
|
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
|
||||||
|
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
|
||||||
|
|
||||||
parse(_Name, {source, Source}) ->
|
parse(_Name, {source, Source}) ->
|
||||||
Queue = parse_parameter(queue, fun parse_binary/1,
|
Queue = parse_parameter(queue, fun parse_binary/1,
|
||||||
proplists:get_value(queue, Source)),
|
proplists:get_value(queue, Source)),
|
||||||
|
@ -185,6 +201,7 @@ init_source(State = #{source := #{queue_r := QName,
|
||||||
end
|
end
|
||||||
end) of
|
end) of
|
||||||
{Remaining, {ok, QState1}} ->
|
{Remaining, {ok, QState1}} ->
|
||||||
|
rabbit_global_counters:consumer_created(?PROTOCOL),
|
||||||
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, ?INITIAL_DELIVERY_COUNT, MaxLinkCredit, false, QState1),
|
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, ?INITIAL_DELIVERY_COUNT, MaxLinkCredit, false, QState1),
|
||||||
State2 = State#{source => Src#{current => Current#{queue_states => QState,
|
State2 = State#{source => Src#{current => Current#{queue_states => QState,
|
||||||
consumer_tag => CTag},
|
consumer_tag => CTag},
|
||||||
|
@ -214,6 +231,7 @@ init_source(State = #{source := #{queue_r := QName,
|
||||||
init_dest(#{name := Name,
|
init_dest(#{name := Name,
|
||||||
shovel_type := Type,
|
shovel_type := Type,
|
||||||
dest := #{add_forward_headers := AFH} = Dst} = State) ->
|
dest := #{add_forward_headers := AFH} = Dst} = State) ->
|
||||||
|
rabbit_global_counters:publisher_created(?PROTOCOL),
|
||||||
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
|
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
|
||||||
case AFH of
|
case AFH of
|
||||||
true ->
|
true ->
|
||||||
|
@ -258,12 +276,14 @@ dest_endpoint(_Config) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
close_dest(_State) ->
|
close_dest(_State) ->
|
||||||
|
rabbit_global_counters:publisher_deleted(?PROTOCOL),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
close_source(#{source := #{current := #{queue_states := QStates0,
|
close_source(#{source := #{current := #{queue_states := QStates0,
|
||||||
consumer_tag := CTag,
|
consumer_tag := CTag,
|
||||||
user := User},
|
user := User},
|
||||||
queue_r := QName}}) ->
|
queue_r := QName}}) ->
|
||||||
|
rabbit_global_counters:consumer_deleted(?PROTOCOL),
|
||||||
case rabbit_amqqueue:with(
|
case rabbit_amqqueue:with(
|
||||||
QName,
|
QName,
|
||||||
fun(Q) ->
|
fun(Q) ->
|
||||||
|
@ -363,6 +383,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
|
||||||
Msg = set_annotations(Msg0, Dest),
|
Msg = set_annotations(Msg0, Dest),
|
||||||
RoutedQNames = route(Msg, Dest),
|
RoutedQNames = route(Msg, Dest),
|
||||||
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
|
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
|
||||||
|
messages_received(AckMode),
|
||||||
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
|
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
|
||||||
{ok, QState1, Actions} ->
|
{ok, QState1, Actions} ->
|
||||||
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
|
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
|
||||||
|
@ -451,13 +472,15 @@ handle_queue_actions(Actions, State) ->
|
||||||
end, State, Actions).
|
end, State, Actions).
|
||||||
|
|
||||||
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
|
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
|
||||||
|
NumMsgs = length(Msgs),
|
||||||
maybe_grant_credit(
|
maybe_grant_credit(
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
|
fun({QName, _QPid, MsgId, _Redelivered, Mc}, #{source := #{current := #{queue_states := QStates }}} = S0) ->
|
||||||
|
messages_delivered(QName, QStates),
|
||||||
DeliveryTag = next_tag(S0),
|
DeliveryTag = next_tag(S0),
|
||||||
S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)),
|
S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)),
|
||||||
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S)
|
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S)
|
||||||
end, sent_delivery(State, length(Msgs)), Msgs)).
|
end, sent_delivery(State, NumMsgs), Msgs)).
|
||||||
|
|
||||||
next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
|
next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
|
||||||
DeliveryTag.
|
DeliveryTag.
|
||||||
|
@ -616,6 +639,7 @@ settle(Op, DeliveryTag, Multiple,
|
||||||
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
|
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
|
||||||
case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of
|
case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of
|
||||||
{ok, QState1, Actions} ->
|
{ok, QState1, Actions} ->
|
||||||
|
messages_acknowledged(Op, QRef, QState1, MsgIds),
|
||||||
State = State0#{source => Src#{current => Current#{queue_states => QState1,
|
State = State0#{source => Src#{current => Current#{queue_states => QState1,
|
||||||
unacked_message_q => UAMQ}}},
|
unacked_message_q => UAMQ}}},
|
||||||
handle_queue_actions(Actions, State);
|
handle_queue_actions(Actions, State);
|
||||||
|
@ -739,12 +763,18 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
|
||||||
at_least_one_credit_req_in_flight => false}}
|
at_least_one_credit_req_in_flight => false}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
process_routing_confirm(undefined, _, _, State) ->
|
process_routing_confirm(undefined, _, [], State) ->
|
||||||
|
rabbit_global_counters:messages_unroutable_returned(?PROTOCOL, 1),
|
||||||
|
State;
|
||||||
|
process_routing_confirm(undefined, _, QRefs, State) ->
|
||||||
|
rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)),
|
||||||
State;
|
State;
|
||||||
process_routing_confirm(MsgSeqNo, Tag, [], State)
|
process_routing_confirm(MsgSeqNo, Tag, [], State)
|
||||||
when is_integer(MsgSeqNo) ->
|
when is_integer(MsgSeqNo) ->
|
||||||
|
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
|
||||||
record_confirms([{MsgSeqNo, Tag}], State);
|
record_confirms([{MsgSeqNo, Tag}], State);
|
||||||
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
|
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
|
||||||
|
rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)),
|
||||||
State#{dest => Dst#{unconfirmed =>
|
State#{dest => Dst#{unconfirmed =>
|
||||||
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.
|
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.
|
||||||
|
|
||||||
|
@ -781,8 +811,10 @@ send_nacks(Rs, Cs, State) ->
|
||||||
send_confirms([], _, State) ->
|
send_confirms([], _, State) ->
|
||||||
State;
|
State;
|
||||||
send_confirms([MsgSeqNo], _, State) ->
|
send_confirms([MsgSeqNo], _, State) ->
|
||||||
|
rabbit_global_counters:messages_confirmed(?PROTOCOL, 1),
|
||||||
rabbit_shovel_behaviour:ack(MsgSeqNo, false, State);
|
rabbit_shovel_behaviour:ack(MsgSeqNo, false, State);
|
||||||
send_confirms(Cs, Rs, State) ->
|
send_confirms(Cs, Rs, State) ->
|
||||||
|
rabbit_global_counters:messages_confirmed(?PROTOCOL, length(Cs)),
|
||||||
coalesce_and_send(Cs, Rs,
|
coalesce_and_send(Cs, Rs,
|
||||||
fun(MsgSeqNo, Multiple, StateX) ->
|
fun(MsgSeqNo, Multiple, StateX) ->
|
||||||
rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX)
|
rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX)
|
||||||
|
@ -833,3 +865,30 @@ decr_remaining(Num, State) ->
|
||||||
_ = send_confirms_and_nacks(State),
|
_ = send_confirms_and_nacks(State),
|
||||||
exit(R)
|
exit(R)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
messages_acknowledged(complete, QName, QS, MsgIds) ->
|
||||||
|
case rabbit_queue_type:module(QName, QS) of
|
||||||
|
{ok, QType} ->
|
||||||
|
rabbit_global_counters:messages_acknowledged(?PROTOCOL, QType, length(MsgIds));
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
messages_acknowledged(_, _, _, _) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
messages_received(AckMode) ->
|
||||||
|
rabbit_global_counters:messages_received(?PROTOCOL, 1),
|
||||||
|
case AckMode of
|
||||||
|
on_confirm ->
|
||||||
|
rabbit_global_counters:messages_received_confirm(?PROTOCOL, 1);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
messages_delivered(QName, S0) ->
|
||||||
|
case rabbit_queue_type:module(QName, S0) of
|
||||||
|
{ok, QType} ->
|
||||||
|
rabbit_global_counters:messages_delivered(?PROTOCOL, QType, 1);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
|
@ -77,7 +77,8 @@ groups() ->
|
||||||
local_to_local_stream_credit_flow_on_confirm,
|
local_to_local_stream_credit_flow_on_confirm,
|
||||||
local_to_local_stream_credit_flow_on_publish,
|
local_to_local_stream_credit_flow_on_publish,
|
||||||
local_to_local_stream_credit_flow_no_ack,
|
local_to_local_stream_credit_flow_no_ack,
|
||||||
local_to_local_simple_uri
|
local_to_local_simple_uri,
|
||||||
|
local_to_local_counters
|
||||||
]}
|
]}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -1050,6 +1051,36 @@ local_to_local_simple_uri(Config) ->
|
||||||
none]),
|
none]),
|
||||||
shovel_test_utils:await_shovel(Config, ?PARAM).
|
shovel_test_utils:await_shovel(Config, ?PARAM).
|
||||||
|
|
||||||
|
local_to_local_counters(Config) ->
|
||||||
|
Src = ?config(srcq, Config),
|
||||||
|
Dest = ?config(destq, Config),
|
||||||
|
%% Let's restart the node so the counters are reset
|
||||||
|
ok = rabbit_ct_broker_helpers:stop_node(Config, 0),
|
||||||
|
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
|
||||||
|
with_session(
|
||||||
|
Config,
|
||||||
|
fun (Sess) ->
|
||||||
|
?awaitMatch(#{publishers := 0, consumers := 0},
|
||||||
|
get_global_counters(Config), 30_000),
|
||||||
|
shovel_test_utils:set_param(Config, ?PARAM,
|
||||||
|
[{<<"src-protocol">>, <<"local">>},
|
||||||
|
{<<"src-queue">>, Src},
|
||||||
|
{<<"dest-protocol">>, <<"local">>},
|
||||||
|
{<<"dest-queue">>, Dest}
|
||||||
|
]),
|
||||||
|
?awaitMatch(#{publishers := 1, consumers := 1},
|
||||||
|
get_global_counters(Config), 30_000),
|
||||||
|
_ = publish_many(Sess, Src, Dest, <<"tag1">>, 150),
|
||||||
|
?awaitMatch(#{consumers := 1, publishers := 1,
|
||||||
|
messages_received_total := 150,
|
||||||
|
messages_received_confirm_total := 150,
|
||||||
|
messages_routed_total := 150,
|
||||||
|
messages_unroutable_dropped_total := 0,
|
||||||
|
messages_unroutable_returned_total := 0,
|
||||||
|
messages_confirmed_total := 150},
|
||||||
|
get_global_counters(Config), 30_000)
|
||||||
|
end).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
with_session(Config, Fun) ->
|
with_session(Config, Fun) ->
|
||||||
with_session(Config, <<"/">>, Fun).
|
with_session(Config, <<"/">>, Fun).
|
||||||
|
@ -1217,3 +1248,10 @@ delete_queue(Name, VHost) ->
|
||||||
_ ->
|
_ ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_global_counters(Config) ->
|
||||||
|
get_global_counters0(Config, #{protocol => 'local-shovel'}).
|
||||||
|
|
||||||
|
get_global_counters0(Config, Key) ->
|
||||||
|
Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []),
|
||||||
|
maps:get(Key, Overview).
|
||||||
|
|
Loading…
Reference in New Issue