Merge pull request #6073 from rabbitmq/stream-coordinator-overview

Implement ra_machine:overview/1 for rabbit_stream_coordinator
This commit is contained in:
Arnaud Cogoluègnes 2022-10-13 11:27:23 +02:00 committed by GitHub
commit 2460f1468b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 5 deletions

View File

@ -18,7 +18,8 @@
handle_aux/6,
tick/2,
version/0,
which_module/1]).
which_module/1,
overview/1]).
-export([recover/0,
add_replica/2,
@ -740,6 +741,36 @@ handle_aux(leader, _, {down, Pid, Reason},
handle_aux(_, _, _, AuxState, LogState, _) ->
{no_reply, AuxState, LogState}.
overview(#?MODULE{streams = Streams,
monitors = Monitors,
single_active_consumer = Sac}) ->
StreamsOverview = maps:map(
fun (_, #stream{epoch = Epoch,
members = Members,
listeners = StreamListeners,
target = Target}) ->
MembO = maps:map(
fun (_, #member{state = MS,
role = R,
current = C,
target = T}) ->
#{state => MS,
role => R,
current => C,
target => T}
end, Members),
#{epoch => Epoch,
members => MembO,
num_listeners => map_size(StreamListeners),
target => Target}
end, Streams),
#{
num_streams => map_size(Streams),
num_monitors => map_size(Monitors),
single_active_consumer => rabbit_stream_sac_coordinator:overview(Sac),
streams => StreamsOverview
}.
run_action(Action, StreamId, #{node := _Node,
epoch := _Epoch} = Args,
ActionFun, #aux{actions = Actions0} = Aux, Log) ->

View File

@ -39,7 +39,8 @@
handle_connection_down/2,
consumer_groups/3,
group_consumers/5,
is_ff_enabled/0]).
is_ff_enabled/0,
overview/1]).
%% Single Active Consumer API
-spec register_consumer(binary(),
@ -194,6 +195,19 @@ maybe_sac_execute(Fun) ->
{error, feature_flag_disabled}
end.
-spec overview(state()) -> map().
overview(undefined) ->
undefined;
overview(#?MODULE{groups = Groups}) ->
GroupsOverview = maps:map(fun (_, #group{consumers = Consumers,
partition_index = Idx}) ->
#{num_consumers => length(Consumers),
partition_index => Idx}
end, Groups),
#{num_groups => map_size(Groups),
groups => GroupsOverview}.
-spec init_state() -> state().
init_state() ->
#?MODULE{groups = #{}, pids_groups = #{}}.

View File

@ -6,7 +6,6 @@
-export([
]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit/src/rabbit_stream_coordinator.hrl").
@ -38,7 +37,8 @@ all_tests() ->
delete_replica,
delete_two_replicas,
delete_replica_2,
leader_start_failed
leader_start_failed,
overview
].
groups() ->
@ -661,7 +661,6 @@ leader_start_failed(_) ->
?assertMatch([{aux, {stop, StreamId, #{node := N3, epoch := E2}, _}}
], lists:sort(Actions8)),
ok.
leader_down_scenario_1(_) ->
@ -1312,6 +1311,36 @@ delete_replica_leader(_) ->
S4),
ok.
overview(_Config) ->
S0 = rabbit_stream_coordinator:init(undefined),
O0 = rabbit_stream_coordinator:overview(S0),
?assertMatch(#{num_monitors := 0,
num_streams := 0,
single_active_consumer := #{groups := _,
num_groups := 0},
streams := #{}}, O0),
StreamId = <<"bananas">>,
TypeState = #{name => StreamId,
retention => [],
nodes => [node()]},
Q = new_q(<<"bananas">>, TypeState),
Cmd = {new_stream, StreamId, #{leader_node => node(),
retention => [],
queue => Q}},
{S1, _, _} = apply_cmd(#{index => 1,
machine_version => 3,
system_time => 203984982374}, Cmd, S0),
?assertMatch(#{num_monitors := 0,
num_streams := 1,
single_active_consumer := #{groups := _,
num_groups := 0},
streams := #{StreamId := _}},
rabbit_stream_coordinator:overview(S1)),
ok.
meta(N) when is_integer(N) ->
#{index => N,
machine_version => 1,