From 2743df2e5f04cc0026bf1de84a0420b77fba06bf Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 10 Oct 2022 13:23:03 +0100 Subject: [PATCH] Implement ra_machine:overview/1 for rabbit_stream_coordinator So that e.g. sys:get_status/1 would return a more compact state representation than just a full state dump. --- deps/rabbit/src/rabbit_stream_coordinator.erl | 33 ++++++++++++++++- .../src/rabbit_stream_sac_coordinator.erl | 16 ++++++++- .../test/rabbit_stream_coordinator_SUITE.erl | 35 +++++++++++++++++-- 3 files changed, 79 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 96550827b7..db6fb8adf3 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -18,7 +18,8 @@ handle_aux/6, tick/2, version/0, - which_module/1]). + which_module/1, + overview/1]). -export([recover/0, add_replica/2, @@ -740,6 +741,36 @@ handle_aux(leader, _, {down, Pid, Reason}, handle_aux(_, _, _, AuxState, LogState, _) -> {no_reply, AuxState, LogState}. +overview(#?MODULE{streams = Streams, + monitors = Monitors, + single_active_consumer = Sac}) -> + StreamsOverview = maps:map( + fun (_, #stream{epoch = Epoch, + members = Members, + listeners = StreamListeners, + target = Target}) -> + MembO = maps:map( + fun (_, #member{state = MS, + role = R, + current = C, + target = T}) -> + #{state => MS, + role => R, + current => C, + target => T} + end, Members), + #{epoch => Epoch, + members => MembO, + num_listeners => map_size(StreamListeners), + target => Target} + end, Streams), + #{ + num_streams => map_size(Streams), + num_monitors => map_size(Monitors), + single_active_consumer => rabbit_stream_sac_coordinator:overview(Sac), + streams => StreamsOverview + }. + run_action(Action, StreamId, #{node := _Node, epoch := _Epoch} = Args, ActionFun, #aux{actions = Actions0} = Aux, Log) -> diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 8ee12b0e2f..14ad6b980c 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -39,7 +39,8 @@ handle_connection_down/2, consumer_groups/3, group_consumers/5, - is_ff_enabled/0]). + is_ff_enabled/0, + overview/1]). %% Single Active Consumer API -spec register_consumer(binary(), @@ -194,6 +195,19 @@ maybe_sac_execute(Fun) -> {error, feature_flag_disabled} end. +-spec overview(state()) -> map(). +overview(undefined) -> + undefined; +overview(#?MODULE{groups = Groups}) -> + GroupsOverview = maps:map(fun (_, #group{consumers = Consumers, + partition_index = Idx}) -> + #{num_consumers => length(Consumers), + partition_index => Idx} + end, Groups), + #{num_groups => map_size(Groups), + groups => GroupsOverview}. + + -spec init_state() -> state(). init_state() -> #?MODULE{groups = #{}, pids_groups = #{}}. diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl index e5ee529a63..5ab50056ed 100644 --- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl @@ -6,7 +6,6 @@ -export([ ]). --include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit/src/rabbit_stream_coordinator.hrl"). @@ -38,7 +37,8 @@ all_tests() -> delete_replica, delete_two_replicas, delete_replica_2, - leader_start_failed + leader_start_failed, + overview ]. groups() -> @@ -661,7 +661,6 @@ leader_start_failed(_) -> ?assertMatch([{aux, {stop, StreamId, #{node := N3, epoch := E2}, _}} ], lists:sort(Actions8)), - ok. leader_down_scenario_1(_) -> @@ -1312,6 +1311,36 @@ delete_replica_leader(_) -> S4), ok. +overview(_Config) -> + S0 = rabbit_stream_coordinator:init(undefined), + O0 = rabbit_stream_coordinator:overview(S0), + ?assertMatch(#{num_monitors := 0, + num_streams := 0, + single_active_consumer := #{groups := _, + num_groups := 0}, + streams := #{}}, O0), + + StreamId = <<"bananas">>, + TypeState = #{name => StreamId, + retention => [], + nodes => [node()]}, + Q = new_q(<<"bananas">>, TypeState), + Cmd = {new_stream, StreamId, #{leader_node => node(), + retention => [], + queue => Q}}, + {S1, _, _} = apply_cmd(#{index => 1, + machine_version => 3, + system_time => 203984982374}, Cmd, S0), + + ?assertMatch(#{num_monitors := 0, + num_streams := 1, + single_active_consumer := #{groups := _, + num_groups := 0}, + streams := #{StreamId := _}}, + rabbit_stream_coordinator:overview(S1)), + + ok. + meta(N) when is_integer(N) -> #{index => N, machine_version => 1,