Report flow/blocked shovel status in Mgmt UI

This commit is contained in:
Péter Gömöri 2022-10-16 01:10:34 +02:00
parent 39c78aae1a
commit 6a6f29cbea
13 changed files with 260 additions and 39 deletions

View File

@ -51,7 +51,7 @@
Val -> Val
end).
-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0, state_delayed/1]).
-export([peer_down/1]).
-export([block/1, unblock/1]).
@ -156,21 +156,26 @@ blocked() -> case get(credit_blocked) of
_ -> true
end.
-spec state() -> running | flow.
state() -> case blocked() of
true -> flow;
false -> case get(credit_blocked_at) of
undefined -> running;
B -> Now = erlang:monotonic_time(),
Diff = erlang:convert_time_unit(Now - B,
native,
micro_seconds),
case Diff < ?STATE_CHANGE_INTERVAL of
true -> flow;
false -> running
end
end
false -> state_delayed(get(credit_blocked_at))
end.
%% Derives the state to report from the moment credit flow last blocked.
%%
%% `BlockedAt' is either `undefined' (never blocked) or a value in native
%% time units that is compared against `erlang:monotonic_time()'.
%% Returns `flow' while less than ?STATE_CHANGE_INTERVAL microseconds have
%% elapsed since `BlockedAt', and `running' otherwise.
-spec state_delayed(integer() | undefined) -> running | flow.
state_delayed(undefined) ->
    running;
state_delayed(BlockedAt) ->
    %% `microsecond' is the current spelling of the time unit; the older
    %% `micro_seconds' atom is a deprecated alias for the same unit.
    Elapsed = erlang:convert_time_unit(erlang:monotonic_time() - BlockedAt,
                                       native,
                                       microsecond),
    case Elapsed < ?STATE_CHANGE_INTERVAL of
        true  -> flow;
        false -> running
    end.
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain

View File

@ -101,8 +101,8 @@ fmt_ts({{YY, MM, DD}, {Hour, Min, Sec}}) ->
io_lib:format("~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w",
[YY, MM, DD, Hour, Min, Sec])).
fmt_status({'running' = St, Proplist}, Map) ->
maps:merge(Map#{state => St,
fmt_status({'running', Proplist}, Map) ->
maps:merge(Map#{state => proplists:get_value(blocked_status, Proplist, running),
source_protocol => proplists:get_value(src_protocol, Proplist,
undefined),
source => proplists:get_value(src_uri, Proplist),

View File

@ -30,6 +30,7 @@
close_dest/1,
ack/3,
nack/3,
status/1,
forward/4
]).
@ -173,16 +174,16 @@ forward_pending(State) ->
end.
forward(IncomingTag, Props, Payload, State) ->
State1 = control_throttle(State),
case is_blocked(State1) of
case is_blocked(State) of
true ->
%% We are blocked by client-side flow-control and/or
%% `connection.blocked` message from the destination
%% broker. Simply cache the forward.
PendingEntry = {IncomingTag, Props, Payload},
add_pending(PendingEntry, State1);
add_pending(PendingEntry, State);
false ->
do_forward(IncomingTag, Props, Payload, State1)
State1 = do_forward(IncomingTag, Props, Payload, State),
control_throttle(State1)
end.
do_forward(IncomingTag, Props, Payload,
@ -382,6 +383,13 @@ is_blocked(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
is_blocked(_) ->
false.
%% Maps the destination's list of block reasons onto a reported status:
%% exactly `[flow]' is `flow', an empty list (or no `blocked_by' key at
%% all) is `running', and anything else is `blocked'.
status(#{dest := #{blocked_by := Reasons}}) ->
    case Reasons of
        [flow] -> flow;
        []     -> running;
        _      -> blocked
    end;
status(_State) ->
    running.
%% Appends `Elem' to the destination's `pending' queue, creating the
%% queue when the destination map does not have one yet, and returns the
%% updated state.
add_pending(Elem, #{dest := Dest0} = State) ->
    Queue0 = case maps:find(pending, Dest0) of
                 {ok, Existing} -> Existing;
                 error          -> queue:new()
             end,
    Dest1 = Dest0#{pending => queue:in(Elem, Queue0)},
    State#{dest => Dest1}.

View File

@ -30,6 +30,7 @@
close_dest/1,
ack/3,
nack/3,
status/1,
forward/4
]).
@ -302,6 +303,14 @@ nack(Tag, true, State = #{source := #{current := #{session := Session},
Tag, true, accepted),
State#{source => Src#{last_nacked_tag => Tag}}.
%% Translates the destination link state into a reported status:
%% `attached' is reported as `flow' and `credited' as `running'.
%% Every other shape yields `ignore'.
status(#{dest := #{current := #{link_state := LinkState}}}) ->
    case LinkState of
        attached -> flow;
        credited -> running;
        _        -> ignore
    end;
status(_State) ->
    %% Destination not yet connected
    ignore.
-spec forward(Tag :: tag(), Props :: #{atom() => any()},
Payload :: binary(), state()) -> state().
forward(_Tag, _Props, _Payload,

View File

@ -27,6 +27,7 @@
forward/4,
ack/3,
nack/3,
status/1,
% common functions
decr_remaining_unacked/1,
decr_remaining/2
@ -80,7 +81,7 @@
-callback nack(Tag :: tag(), Multi :: boolean(), state()) -> state().
-callback forward(Tag :: tag(), Props :: #{atom() => any()},
Payload :: binary(), state()) -> state().
-callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore.
-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
source_config() | dest_config().
@ -151,6 +152,9 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) ->
nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
Mod:nack(Tag, Multi, State).
%% Delegates to the `status/1' callback of the module stored under the
%% destination's `module' key, passing the whole state through.
status(State = #{dest := #{module := DestMod}}) ->
    DestMod:status(State).
%% Common functions
%% Count down until we stop publishing in on-confirm mode

View File

@ -10,7 +10,13 @@
-export([start_link/0]).
-export([report/3, remove/1, status/0, lookup/1, cluster_status/0, cluster_status_with_nodes/0]).
-export([report/3,
report_blocked_status/2,
remove/1,
status/0,
lookup/1,
cluster_status/0,
cluster_status_with_nodes/0]).
-export([inject_node_info/2, find_matching_shovel/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -20,25 +26,56 @@
-define(ETS_NAME, ?MODULE).
-define(CHECK_FREQUENCY, 60000).
%% rabbit_shovel_mgmt_util:format_info as well as CLI shovel commands
%% rely on this strict type as of 3.11.2
%% (would be good to allow any atom as status name)
-type info() :: starting
| {running, proplists:proplist()}
| {terminated, term()}.
-type blocked_status() :: running | flow | blocked.
-type name() :: binary() | {rabbit_types:vhost(), binary()}.
-type type() :: static | dynamic.
-type status_tuple() :: {name(), type(), info(), calendar:datetime()}.
-export_type([info/0, blocked_status/0]).
-record(state, {timer}).
-record(entry, {name, type, info, timestamp}).
-record(entry, {name :: name(),
type :: type(),
info :: info(),
blocked_status = running :: blocked_status(),
blocked_at :: integer() | undefined,
timestamp :: calendar:datetime()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% Casts the latest `Info' for shovel `Name' to the status server,
%% stamped with the caller's local wall-clock time.
-spec report(name(), type(), info()) -> ok.
report(Name, Type, Info) ->
    ReportedAt = calendar:local_time(),
    gen_server:cast(?SERVER, {report, Name, Type, Info, ReportedAt}).
%% Casts a blocked-status change for shovel `Name' to the status server.
%% The monotonic timestamp is taken in the calling process, before the
%% cast is sent.
-spec report_blocked_status(name(), blocked_status()) -> ok.
report_blocked_status(Name, Status) ->
    ReportedAt = erlang:monotonic_time(),
    gen_server:cast(?SERVER, {report_blocked_status, Name, Status, ReportedAt}).
-spec remove(name()) -> ok.
remove(Name) ->
gen_server:cast(?SERVER, {remove, Name}).
%% Warning: this function may be called from other nodes in the
%% cluster that run a different RabbitMQ version. Do not change the
%% returned format without a feature flag.
-spec status() -> [status_tuple()].
status() ->
gen_server:call(?SERVER, status, infinity).
%% Collects `status/0' results from every running node and returns them
%% sorted with duplicates removed.
-spec cluster_status() -> [status_tuple()].
cluster_status() ->
    RunningNodes = rabbit_nodes:all_running(),
    %% NOTE(review): presumably concatenates the per-node result lists;
    %% confirm against rabbit_misc:append_rpc_all_nodes/4.
    Statuses = rabbit_misc:append_rpc_all_nodes(RunningNodes, ?MODULE, status, []),
    lists:usort(Statuses).
-spec cluster_status_with_nodes() -> [status_tuple()].
cluster_status_with_nodes() ->
Nodes = rabbit_nodes:all_running(),
lists:foldl(
@ -52,6 +89,7 @@ cluster_status_with_nodes() ->
end
end, [], Nodes).
-spec lookup(name()) -> proplists:proplist() | not_found.
lookup(Name) ->
gen_server:call(?SERVER, {lookup, Name}, infinity).
@ -62,7 +100,9 @@ init([]) ->
handle_call(status, _From, State) ->
Entries = ets:tab2list(?ETS_NAME),
{reply, [{Entry#entry.name, Entry#entry.type, Entry#entry.info,
{reply, [{Entry#entry.name,
Entry#entry.type,
blocked_status_to_info(Entry),
Entry#entry.timestamp}
|| Entry <- Entries], State};
@ -70,7 +110,7 @@ handle_call({lookup, Name}, _From, State) ->
Link = case ets:lookup(?ETS_NAME, Name) of
[Entry] -> [{name, Name},
{type, Entry#entry.type},
{info, Entry#entry.info},
{info, blocked_status_to_info(Entry)},
{timestamp, Entry#entry.timestamp}];
[] -> not_found
end,
@ -83,6 +123,16 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) ->
split_name(Name) ++ split_status(Info)),
{noreply, State};
handle_cast({report_blocked_status, Name, Status, Timestamp}, State) ->
case Status of
flow ->
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, flow},
{#entry.blocked_at, Timestamp}]);
_ ->
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status}])
end,
{noreply, State};
handle_cast({remove, Name}, State) ->
true = ets:delete(?ETS_NAME, Name),
rabbit_event:notify(shovel_worker_removed, split_name(Name)),
@ -106,6 +156,7 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-spec inject_node_info(node(), [status_tuple()]) -> [status_tuple()].
inject_node_info(Node, Shovels) ->
lists:map(
fun({Name, Type, {State, Opts}, Timestamp}) ->
@ -113,6 +164,7 @@ inject_node_info(Node, Shovels) ->
{Name, Type, {State, Opts1}, Timestamp}
end, Shovels).
-spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined.
find_matching_shovel(VHost, Name, Shovels) ->
case lists:filter(
fun ({{V, S}, _Kind, _Status, _}) ->
@ -126,6 +178,7 @@ find_matching_shovel(VHost, Name, Shovels) ->
%% Implementation
%%
-spec split_status(info()) -> proplists:proplist().
split_status({running, MoreInfo}) -> [{status, running} | MoreInfo];
split_status({terminated, Reason}) -> [{status, terminated},
{reason, Reason}];
@ -137,4 +190,20 @@ split_name(Name) when is_atom(Name) -> [{name, Name}].
ensure_timer(State0) ->
State1 = rabbit_misc:stop_timer(State0, #state.timer),
rabbit_misc:ensure_timer(State1, #state.timer, ?CHECK_FREQUENCY, check).
rabbit_misc:ensure_timer(State1, #state.timer, ?CHECK_FREQUENCY, check).
%% Builds the externally visible info term for an entry.
%%
%% For a running shovel the stored property list is returned with a
%% `blocked_status' property appended at the end. Any other info term
%% (for example `starting' or `{terminated, _}') is returned unchanged.
-spec blocked_status_to_info(#entry{}) -> info().
blocked_status_to_info(#entry{info = {running, Props}} = Entry) ->
    {running, Props ++ [{blocked_status, effective_blocked_status(Entry)}]};
blocked_status_to_info(#entry{info = Other}) ->
    Other.

%% A stored `running' status is not reported as is: credit_flow decides,
%% based on when the shovel was last blocked, whether to report `flow'
%% or `running'. Every other stored status is reported directly.
effective_blocked_status(#entry{blocked_status = running,
                                blocked_at = BlockedAt}) ->
    credit_flow:state_delayed(BlockedAt);
effective_blocked_status(#entry{blocked_status = Stored}) ->
    Stored.

View File

@ -20,7 +20,8 @@
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
type :: static | dynamic,
config :: rabbit_shovel_behaviour:state()}).
config :: rabbit_shovel_behaviour:state(),
last_reported_status = running :: rabbit_shovel_status:blocked_status()}).
start_link(Type, Name, Config) ->
ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
@ -116,7 +117,9 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
rabbit_log_shovel:debug("Shovel ~ts decided to stop due a message from destination: ~tp", [human_readable_name(Name), Reason]),
{stop, Reason, State};
Config1 ->
{noreply, State#state{config = Config1}}
State1 = State#state{config = Config1},
State2 = maybe_report_blocked_status(State1),
{noreply, State2}
end;
{stop, {inbound_conn_died, heartbeat_timeout}} ->
rabbit_log_shovel:error("Shovel ~ts detected missed heartbeats on source connection", [human_readable_name(Name)]),
@ -131,7 +134,9 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
rabbit_log_shovel:error("Shovel ~ts decided to stop due a message from source: ~tp", [human_readable_name(Name), Reason]),
{stop, Reason, State};
Config1 ->
{noreply, State#state{config = Config1}}
State1 = State#state{config = Config1},
State2 = maybe_report_blocked_status(State1),
{noreply, State2}
end.
terminate({shutdown, autodelete}, State = #state{name = Name,
@ -209,6 +214,18 @@ human_readable_name(Name) ->
ShovelName -> rabbit_misc:format("'~ts'", [ShovelName])
end.
%% Reports the shovel's blocked status to rabbit_shovel_status, but only
%% when the behaviour module returns a status other than `ignore' that
%% differs from the one reported last. Returns the state, with
%% `last_reported_status' updated whenever a report was sent.
maybe_report_blocked_status(State = #state{name = Name,
                                           config = Config,
                                           last_reported_status = Previous}) ->
    Current = rabbit_shovel_behaviour:status(Config),
    if
        Current =:= ignore; Current =:= Previous ->
            %% Nothing new to tell the status server.
            State;
        true ->
            rabbit_shovel_status:report_blocked_status(Name, Current),
            State#state{last_reported_status = Current}
    end.
report_running(#state{config = Config} = State) ->
InUri = rabbit_shovel_behaviour:source_uri(Config),
OutUri = rabbit_shovel_behaviour:dest_uri(Config),

View File

@ -459,6 +459,7 @@ credit_flow(Config) ->
{<<"ack-mode">>, <<"on-publish">>},
{<<"src-delete-after">>, <<"never">>}]),
shovel_test_utils:await_shovel(Config, <<"test">>),
running = shovel_test_utils:get_shovel_status(Config, <<"test">>),
ShovelPid = find_shovel_pid(Config),
#{dest :=
@ -482,9 +483,9 @@ credit_flow(Config) ->
%% Wait until the shovel is blocked
shovel_test_utils:await(
fun() ->
case get_shovel_state(ShovelPid) of
#{dest := #{blocked_by := [flow]}} -> true;
Conf -> Conf
case shovel_test_utils:get_shovel_status(Config, <<"test">>) of
flow -> true;
Status -> Status
end
end,
5000),
@ -521,8 +522,12 @@ credit_flow(Config) ->
#{messages := 1000} = message_count(Config, <<"dest">>),
[{_, 0, _}] =
rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 1])
Config, 0, recon, proc_count, [message_queue_len, 1]),
%% Status only transitions from flow to running
%% after a 1 second state-change-interval
timer:sleep(1000),
running = shovel_test_utils:get_shovel_status(Config, <<"test">>)
after
resume_process(Config),
set_default_credit(Config, OrigCredit)
@ -566,9 +571,7 @@ dest_resource_alarm(AckMode, Config) ->
{<<"src-delete-after">>, <<"never">>}]),
%% The shovel is blocked
ShovelPid = find_shovel_pid(Config),
Conf = get_shovel_state(ShovelPid),
#{dest := #{blocked_by := [connection_blocked]}} = Conf,
blocked = shovel_test_utils:get_shovel_status(Config, <<"test">>),
%% The shoveled message triggered a
%% connection.blocked notification, but hasn't
@ -645,7 +648,8 @@ dest_resource_alarm(AckMode, Config) ->
Cnt =:= 1001
end,
5000),
#{messages := 0} = message_count(Config, <<"src">>)
#{messages := 0} = message_count(Config, <<"src">>),
running = shovel_test_utils:get_shovel_status(Config, <<"test">>)
after
set_vm_memory_high_watermark(Config, OrigLimit)
end

View File

@ -9,7 +9,8 @@
-include_lib("common_test/include/ct.hrl").
-export([set_param/3, set_param_nowait/3, await_shovel/2, await_shovel1/2,
shovels_from_status/0, await/1, await/2, clear_param/2]).
shovels_from_status/0, get_shovel_status/2,
await/1, await/2, clear_param/2]).
make_uri(Config) ->
Hostname = ?config(rmq_hostname, Config),
@ -39,6 +40,17 @@ shovels_from_status() ->
S = rabbit_shovel_status:status(),
[N || {{<<"/">>, N}, dynamic, {running, _}, _} <- S].
%% Looks up the shovel `Name' (in vhost "/") on node 0 and returns the
%% status to assert on in tests:
%%   * `not_found' when the shovel has no status entry;
%%   * for a running shovel, its `blocked_status' property, or `running'
%%     when that property is absent;
%%   * otherwise the state itself (`starting', `terminated', ...).
get_shovel_status(Config, Name) ->
    Lookup = rabbit_ct_broker_helpers:rpc(
               Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
    case Lookup of
        not_found ->
            not_found;
        _ ->
            status_from_info(proplists:get_value(info, Lookup))
    end.

%% Only the `running' info carries a property list. `starting' is a bare
%% atom and the second element of `{terminated, Reason}' is an arbitrary
%% term, so neither may be handed to proplists:get_value/3.
status_from_info({running, Props}) ->
    proplists:get_value(blocked_status, Props, running);
status_from_info({State, _Details}) when is_atom(State) ->
    State;
status_from_info(State) when is_atom(State) ->
    State.
await(Pred) ->
case Pred() of
true -> ok;

View File

@ -83,6 +83,16 @@ suites = [
"//deps/rabbitmq_management_agent:erlang_app",
],
),
rabbitmq_suite(
name = "rabbit_shovel_mgmt_util_SUITE",
runtime_deps = [
"@meck//:erlang_app",
],
deps = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_shovel:erlang_app",
],
),
]
assert_suites(

View File

@ -46,7 +46,7 @@
</td>
<td><%= shovel.timestamp %></td>
<% } else { %>
<td><%= fmt_state('green', shovel.state) %></td>
<td><%= fmt_object_state(shovel) %></td>
<td><%= fmt_string(shovel.src_protocol) %></td>
<td><%= shovel.src_uri == undefined ? fmt_string(shovel.src_uri) : fmt_string(fmt_uri_with_credentials(shovel.src_uri)) %></td>
<td><%= fmt_shovel_endpoint('src_', shovel) %></td>

View File

@ -9,6 +9,10 @@
-export([status/2]).
-ifdef(TEST).
-export([status/1]).
-endif.
-import(rabbit_misc, [pget/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
@ -53,7 +57,8 @@ format_info(starting) ->
[{state, starting}];
format_info({running, Props}) ->
[{state, running}] ++ Props;
BlockedStatus = proplists:get_value(blocked_status, Props, running),
[{state, BlockedStatus}] ++ Props;
format_info({terminated, Reason}) ->
[{state, terminated},
@ -63,4 +68,4 @@ format_ts({{Y, M, D}, {H, Min, S}}) ->
print("~w-~2.2.0w-~2.2.0w ~w:~2.2.0w:~2.2.0w", [Y, M, D, H, Min, S]).
print(Fmt, Val) ->
list_to_binary(io_lib:format(Fmt, Val)).
list_to_binary(io_lib:format(Fmt, Val)).

View File

@ -0,0 +1,78 @@
%%% @doc Unit tests of rabbit_shovel_mgmt_util
-module(rabbit_shovel_mgmt_util_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
all() ->
[blocked_status].
init_per_testcase(_, Config) ->
meck:expect(rabbit_shovel_dyn_worker_sup_sup, cleanup_specs, 0, ok),
rabbit_shovel_status:start_link(),
Config.
end_per_testcase(_, Config) ->
meck:unload(rabbit_shovel_dyn_worker_sup_sup),
Config.
%% Drives a single shovel entry through a sequence of status reports and
%% checks, after every step, the state returned by get_shovel_states/0.
%% The steps are order-dependent: each assertion relies on the reports
%% issued before it.
%% NOTE(review): the reports are sent as casts; the assertions presumably
%% rely on the following status query being handled by the same server
%% after those casts — confirm against rabbit_shovel_mgmt_util:status/1.
blocked_status(_Config) ->
%% The status server must have been started by init_per_testcase/2.
?assertNotEqual(undefined, whereis(rabbit_shovel_status)),
Name = {<<"/">>, <<"test">>},
Type = dynamic,
Props = [{src_uri,<<"amqp://">>},
{src_protocol,<<"amqp091">>},
{dest_protocol,<<"amqp091">>},
{dest_uri,<<"amqp://">>},
{src_queue,<<"q1">>},
{dest_queue,<<"q2">>}],
%% A freshly running shovel with no blocked status reported is "running".
ok = rabbit_shovel_status:report(Name, Type, starting),
ok = rabbit_shovel_status:report(Name, Type, {running, Props}),
?assertEqual([{Name, running}], get_shovel_states()),
%% Entering flow is visible immediately.
ok = rabbit_shovel_status:report_blocked_status(Name, flow),
?assertEqual([{Name, flow}], get_shovel_states()),
%% If the shovel was blocked by credit flow in the last
%% STATE_CHANGE_INTERVAL, its state will be reported as "in flow".
ok = rabbit_shovel_status:report_blocked_status(Name, running),
?assertEqual([{Name, flow}], get_shovel_states()),
%% Wait for the interval to elapse (assumed to be at most 1 second —
%% TODO confirm against credit_flow's STATE_CHANGE_INTERVAL).
timer:sleep(1000),
?assertEqual([{Name, running}], get_shovel_states()),
%% flow followed by blocked: "blocked" is reported as soon as it is set.
ok = rabbit_shovel_status:report_blocked_status(Name, flow),
?assertEqual([{Name, flow}], get_shovel_states()),
ok = rabbit_shovel_status:report_blocked_status(Name, blocked),
?assertEqual([{Name, blocked}], get_shovel_states()),
%% If the shovel was blocked by credit flow in the last
%% STATE_CHANGE_INTERVAL, its state will be reported as "in flow",
%% even if there was a blocked state in-between
ok = rabbit_shovel_status:report_blocked_status(Name, running),
?assertEqual([{Name, flow}], get_shovel_states()),
timer:sleep(1000),
?assertEqual([{Name, running}], get_shovel_states()),
ok = rabbit_shovel_status:report_blocked_status(Name, blocked),
?assertEqual([{Name, blocked}], get_shovel_states()),
%% Switching back from blocked to running happens immediately
ok = rabbit_shovel_status:report_blocked_status(Name, running),
?assertEqual([{Name, running}], get_shovel_states()),
%% Switching from flow to terminated happens immediately
ok = rabbit_shovel_status:report_blocked_status(Name, flow),
rabbit_shovel_status:report(Name, Type, {terminated, reason}),
?assertEqual([{Name, terminated}], get_shovel_states()),
ok.
%% Returns `{{VHost, Name}, State}' for every shovel that
%% rabbit_shovel_mgmt_util:status/1 reports for the local node.
get_shovel_states() ->
    Shovels = rabbit_shovel_mgmt_util:status(node()),
    [begin
         VHost = proplists:get_value(vhost, Shovel),
         ShovelName = proplists:get_value(name, Shovel),
         {{VHost, ShovelName}, proplists:get_value(state, Shovel)}
     end || Shovel <- Shovels].