streams: improve stream_status command

To show all reachable replicas and their state
This commit is contained in:
kjnilsson 2021-04-27 12:24:43 +01:00
parent 3dea868441
commit 37275c4115
1 changed files with 48 additions and 11 deletions

View File

@ -516,6 +516,7 @@ i(type, _) ->
stream;
i(_, _) ->
''.
-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
[[{binary(), term()}]] | {error, term()}.
status(Vhost, QueueName) ->
@ -527,20 +528,56 @@ status(Vhost, QueueName) ->
{ok, Q} when ?amqqueue_is_quorum(Q) ->
{error, quorum_queue_not_supported};
{ok, Q} when ?amqqueue_is_stream(Q) ->
Data = osiris_counters:overview(),
case maps:get({osiris_writer, QName}, Data, undefined) of
undefined ->
[];
#{} = Cnt0 ->
Cnt = maps:without([chunks], Cnt0),
Conf = amqqueue:get_type_state(Q),
Max = maps:get(max_segment_size, Conf, osiris_log:get_default_max_segment_size()),
[maps:to_list(Cnt#{max_segment_size => Max})]
end;
{error, not_found} = E->
_Pid = amqqueue:get_pid(Q),
% Max = maps:get(max_segment_size, Conf, osiris_log:get_default_max_segment_size()),
[begin
[{role, Role},
get_key(node, C),
get_key(offset, C),
get_key(committed_offset, C),
get_key(first_offset, C),
get_key(readers, C),
get_key(segments, C)]
end || {Role, C} <- get_counters(Q)];
{error, not_found} = E ->
E
end.
get_key(Key, Cnt) ->
{Key, maps:get(Key, Cnt, undefined)}.
get_counters(Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q),
{ok, Members} = rabbit_stream_coordinator:members(StreamId),
QName = amqqueue:get_name(Q),
Counters = [begin
Data = safe_get_overview(Node),
get_counter(QName, Data, #{node => Node})
end || Node <- maps:keys(Members)],
lists:filter(fun (X) -> X =/= undefined end, Counters).
safe_get_overview(Node) ->
case rpc:call(Node, osiris_counters, overview, []) of
{badrpc, _} ->
#{node => Node};
Data ->
Data
end.
get_counter(QName, Data, Add) ->
case maps:get({osiris_writer, QName}, Data, undefined) of
undefined ->
case maps:get({osiris_replica, QName}, Data, undefined) of
undefined ->
{undefined, Add};
M ->
{replica, maps:merge(Add, M)}
end;
M ->
{writer, maps:merge(Add, M)}
end.
-spec tracking_status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
[[{atom(), term()}]] | {error, term()}.
tracking_status(Vhost, QueueName) ->