This includes a new ra:key_metrics/1 API that is more available
than parsing the output of sys:get_status/1.

the rabbit_quorum_queue:status/1 function has been ported to use
this API instead as well as now inludes a few new fields.
This commit is contained in:
Karl Nilsson 2023-09-06 17:09:12 +01:00 committed by Michael Klishin
parent f6dd6a4a66
commit 882e0c1749
6 changed files with 145 additions and 43 deletions

View File

@ -49,7 +49,7 @@ bazel_dep(
bazel_dep(
name = "rabbitmq_ra",
version = "2.6.3",
version = "2.7.0",
repo_name = "ra",
)

View File

@ -151,6 +151,7 @@ PLT_APPS += mnesia
dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.7
dep_systemd = hex 0.6.1
dep_seshat = git https://github.com/rabbitmq/seshat v0.6.1
define usage_xml_to_erl

View File

@ -112,7 +112,7 @@ sheet_header() ->
].
sheet_body(PrevState) ->
RaStates = ets:tab2list(ra_state),
{_, RaStates} = rabbit_quorum_queue:all_replica_states(),
Body = [begin
#resource{name = Name, virtual_host = Vhost} = R = amqqueue:get_name(Q),
case rabbit_amqqueue:pid_of(Q) of
@ -134,7 +134,7 @@ sheet_body(PrevState) ->
[
Pid,
QName,
case proplists:get_value(InternalName, RaStates) of
case maps:get(InternalName, RaStates, undefined) of
leader -> "L";
follower -> "F";
_ -> "?"
@ -142,7 +142,7 @@ sheet_body(PrevState) ->
format_int(proplists:get_value(memory, ProcInfo)),
format_int(proplists:get_value(message_queue_len, ProcInfo)),
format_int(maps:get(commands, QQCounters)),
case proplists:get_value(InternalName, RaStates) of
case maps:get(InternalName, RaStates, undefined) of
leader -> format_int(maps:get(snapshots_written, QQCounters));
follower -> format_int(maps:get(snapshot_installed, QQCounters));
_ -> "?"

View File

@ -33,11 +33,13 @@
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
-export([become_leader/2, handle_tick/3, spawn_deleter/1]).
-export([rpc_delete_metrics/1]).
-export([rpc_delete_metrics/1,
key_metrics_rpc/1]).
-export([format/1]).
-export([open_files/1]).
-export([peek/2, peek/3]).
-export([add_member/4, add_member/2]).
-export([add_member/2,
add_member/4]).
-export([delete_member/3, delete_member/2]).
-export([requeue/3]).
-export([policy_changed/1]).
@ -380,7 +382,15 @@ become_leader(QName, Name) ->
-spec all_replica_states() -> {node(), #{atom() => atom()}}.
all_replica_states() ->
Rows = ets:tab2list(ra_state),
Rows0 = ets:tab2list(ra_state),
Rows = lists:map(fun
%% TODO: support other membership types
({K, S, voter}) ->
{K, S};
(T) ->
T
end, Rows0),
{node(), maps:from_list(Rows)}.
-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
@ -420,7 +430,9 @@ filter_quorum_critical(Queues, ReplicaStates) ->
AllUp = lists:filter(fun (N) ->
{Name, _} = amqqueue:get_pid(Q),
case maps:get(N, ReplicaStates, undefined) of
#{Name := State} when State =:= follower orelse State =:= leader ->
#{Name := State}
when State =:= follower orelse
State =:= leader ->
true;
_ -> false
end
@ -1037,6 +1049,10 @@ cluster_state(Name) ->
end
end.
key_metrics_rpc(ServerId) ->
Metrics = ra:key_metrics(ServerId),
Metrics#{machine_version := rabbit_fifo:version()}.
-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
[[{binary(), term()}]] | {error, term()}.
status(Vhost, QueueName) ->
@ -1047,34 +1063,67 @@ status(Vhost, QueueName) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
{RName, _} = amqqueue:get_pid(Q),
Nodes = get_nodes(Q),
Nodes = lists:sort(get_nodes(Q)),
[begin
case get_sys_status({RName, N}) of
{ok, Sys} ->
{_, M} = lists:keyfind(ra_server_state, 1, Sys),
{_, RaftState} = lists:keyfind(raft_state, 1, Sys),
#{commit_index := Commit,
machine_version := MacVer,
current_term := Term,
log := #{last_index := Last,
snapshot_index := SnapIdx}} = M,
ServerId = {RName, N},
case erpc_call(N, ?MODULE, key_metrics_rpc, [ServerId], ?RPC_TIMEOUT) of
#{state := RaftState,
membership := Membership,
commit_index := Commit,
term := Term,
last_index := Last,
last_applied := LastApplied,
last_written_index := LastWritten,
snapshot_index := SnapIdx,
machine_version := MacVer} ->
[{<<"Node Name">>, N},
{<<"Raft State">>, RaftState},
{<<"Log Index">>, Last},
{<<"Membership">>, Membership},
{<<"Last Log Index">>, Last},
{<<"Last Written">>, LastWritten},
{<<"Last Applied">>, LastApplied},
{<<"Commit Index">>, Commit},
{<<"Snapshot Index">>, SnapIdx},
{<<"Term">>, Term},
{<<"Machine Version">>, MacVer}
];
{error, Err} ->
[{<<"Node Name">>, N},
{<<"Raft State">>, Err},
{<<"Log Index">>, <<>>},
{<<"Commit Index">>, <<>>},
{<<"Snapshot Index">>, <<>>},
{<<"Term">>, <<>>},
{<<"Machine Version">>, <<>>}
]
%% try the old method
case get_sys_status(ServerId) of
{ok, Sys} ->
{_, M} = lists:keyfind(ra_server_state, 1, Sys),
{_, RaftState} = lists:keyfind(raft_state, 1, Sys),
#{commit_index := Commit,
machine_version := MacVer,
current_term := Term,
last_applied := LastApplied,
log := #{last_index := Last,
last_written_index_term := {LastWritten, _},
snapshot_index := SnapIdx}} = M,
[{<<"Node Name">>, N},
{<<"Raft State">>, RaftState},
{<<"Membership">>, voter},
{<<"Last Log Index">>, Last},
{<<"Last Written">>, LastWritten},
{<<"Last Applied">>, LastApplied},
{<<"Commit Index">>, Commit},
{<<"Snapshot Index">>, SnapIdx},
{<<"Term">>, Term},
{<<"Machine Version">>, MacVer}
];
{error, Err} ->
[{<<"Node Name">>, N},
{<<"Raft State">>, Err},
{<<"Membership">>, <<>>},
{<<"LastLog Index">>, <<>>},
{<<"Last Written">>, <<>>},
{<<"Last Applied">>, <<>>},
{<<"Commit Index">>, <<>>},
{<<"Snapshot Index">>, <<>>},
{<<"Term">>, <<>>},
{<<"Machine Version">>, <<>>}
]
end
end
end || N <- Nodes];
{ok, _Q} ->
@ -1094,7 +1143,6 @@ get_sys_status(Proc) ->
end.
add_member(VHost, Name, Node, Timeout) ->
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
rabbit_log:debug("Asked to add a replica for queue ~ts on node ~ts", [rabbit_misc:rs(QName), Node]),
@ -1124,6 +1172,7 @@ add_member(VHost, Name, Node, Timeout) ->
add_member(Q, Node) ->
add_member(Q, Node, ?ADD_MEMBER_TIMEOUT).
add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
{RaName, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
@ -1134,10 +1183,11 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
?TICK_TIMEOUT),
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
?SNAPSHOT_INTERVAL),
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval),
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, voter),
case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
case ra:add_member(Members, ServerId, Timeout) of
ServerIdSpec = maps:with([id, uid, membership], Conf),
case ra:add_member(Members, ServerIdSpec, Timeout) of
{ok, _, Leader} ->
Fun = fun(Q1) ->
Q2 = update_type_state(
@ -1638,23 +1688,27 @@ format_ra_event(ServerId, Evt, QRef) ->
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, voter).
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership) ->
QName = amqqueue:get_name(Q),
RaMachine = ra_machine(Q),
[{ClusterName, _} | _] = Members = members(Q),
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
FName = rabbit_misc:rs(QName),
Formatter = {?MODULE, format_ra_event, [QName]},
#{cluster_name => ClusterName,
id => ServerId,
uid => UId,
friendly_name => FName,
metrics_key => QName,
initial_members => Members,
log_init_args => #{uid => UId,
snapshot_interval => SnapshotInterval},
tick_timeout => TickTimeout,
machine => RaMachine,
ra_event_formatter => Formatter}.
rabbit_misc:maps_put_truthy(membership, Membership,
#{cluster_name => ClusterName,
id => ServerId,
uid => UId,
friendly_name => FName,
metrics_key => QName,
initial_members => Members,
log_init_args => #{uid => UId,
snapshot_interval => SnapshotInterval},
tick_timeout => TickTimeout,
machine => RaMachine,
ra_event_formatter => Formatter}).
get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),

View File

@ -84,7 +84,8 @@ groups() ->
leader_locator_balanced,
leader_locator_balanced_maintenance,
leader_locator_balanced_random_maintenance,
leader_locator_policy
leader_locator_policy,
status
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
@ -2708,6 +2709,52 @@ peek(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
ok.
-define(STATUS_MATCH(N, T),
[{<<"Node Name">>, N},
{<<"Raft State">>, _},
{<<"Membership">>, _},
{<<"Last Log Index">>, _},
{<<"Last Written">>, _},
{<<"Last Applied">>, _},
{<<"Commit Index">>, _},
{<<"Snapshot Index">>, _},
{<<"Term">>, T},
{<<"Machine Version">>, _}
]).
status(Config) ->
[Server | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-max-in-memory-length">>, long, 2}])),
Msg1 = <<"msg1">>,
Msg2 = <<"msg11">>,
publish(Ch, QQ, Msg1),
publish(Ch, QQ, Msg2),
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
ct:pal("Server ~p", [Server]),
[N1, N2, N3] = lists:sort(Nodes),
%% check that nodes are returned and that at least the term isn't
%% defaulted (i.e. there was an error)
?assertMatch([?STATUS_MATCH(N1, T1),
?STATUS_MATCH(N2, T2),
?STATUS_MATCH(N3, T3)
] when T1 /= <<>> andalso
T2 /= <<>> andalso
T3 /= <<>>,
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
status, [<<"/">>, QQ])),
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
ok.
peek_with_wrong_queue_type(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

View File

@ -116,7 +116,7 @@ dep_cowlib = hex 2.12.1
dep_credentials_obfuscation = hex 3.4.0
dep_looking_glass = git https://github.com/rabbitmq/looking_glass.git main
dep_prometheus = hex 4.10.0
dep_ra = hex 2.6.3
dep_ra = hex 2.7.0
dep_ranch = hex 2.1.0
dep_recon = hex 2.5.3
dep_redbug = hex 2.0.7