Add a classic queue observer_cli plugin

This specifically targets CQv2 and aims to identify where messages
are located (Erlang/GS2 mailbox, memory/disk/buffer, pending acks
and confirms).

This commit also makes it easy to add more plugins in the future.
This commit is contained in:
Loïc Hoguin 2021-08-17 16:28:06 +02:00
parent ab269cca1a
commit c7e7179fd0
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
7 changed files with 149 additions and 5 deletions

View File

@ -251,6 +251,12 @@
{requires, [core_initialized, recovery]},
{enables, routing_ready}]}).
-rabbit_boot_step({rabbit_observer_cli,
[{description, "Observer CLI configuration"},
{mfa, {rabbit_observer_cli, init, []}},
{requires, [core_initialized, recovery]},
{enables, routing_ready}]}).
-rabbit_boot_step({pre_flight,
[{description, "ready to communicate with peers and clients"},
{requires, [core_initialized, recovery, routing_ready]}]}).

View File

@ -9,7 +9,7 @@
-export([erase/1, init/3, reset_state/1, recover/7,
terminate/3, delete_and_terminate/1,
publish/7, publish/8, ack/2, read/3]).
info/1, publish/7, publish/8, ack/2, read/3]).
%% Recovery. Unlike other functions in this module, these
%% apply to all queues all at once.
@ -559,6 +559,14 @@ delete_and_terminate(State = #qi { dir = Dir,
State#qi{ segments = #{},
fds = #{} }.
-spec info(state()) -> [{atom(), integer()}].
info(#qi{ write_buffer = WriteBuffer, write_buffer_updates = NumUpdates }) ->
[
{qi_buffer_size, map_size(WriteBuffer)},
{qi_buffer_num_up, NumUpdates}
].
-spec publish(rabbit_types:msg_id(), rabbit_variable_queue:seq_id(),
rabbit_variable_queue:msg_location(),
rabbit_types:message_properties(), boolean(),

View File

@ -49,7 +49,7 @@
-module(rabbit_classic_queue_store_v2).
-export([init/1, terminate/1,
-export([init/1, terminate/1, info/1,
write/4, sync/1, read/3, read_many/2, check_msg_on_disk/3,
remove/2, delete_segments/2]).
@ -136,6 +136,11 @@ maybe_close_fd(undefined) ->
maybe_close_fd(Fd) ->
ok = file:close(Fd).
-spec info(state()) -> [{atom(), non_neg_integer()}].
info(#qs{ write_buffer = WriteBuffer }) ->
[{qs_buffer_size, map_size(WriteBuffer)}].
-spec write(rabbit_variable_queue:seq_id(), rabbit_types:basic_message(),
rabbit_types:message_properties(), State)
-> {msg_location(), State} when State::state().

15
deps/rabbit/src/rabbit_observer_cli.erl vendored Normal file
View File

@ -0,0 +1,15 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_observer_cli).
-export([init/0]).
init() ->
application:set_env(observer_cli, plugins, [
rabbit_observer_cli_classic_queues:plugin_info()
]).

View File

@ -0,0 +1,95 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_observer_cli_classic_queues).
-export([plugin_info/0]).
-export([attributes/1, sheet_header/0, sheet_body/1]).
-include_lib("rabbit_common/include/rabbit.hrl").
plugin_info() ->
#{
module => rabbit_observer_cli_classic_queues,
title => "Classic",
shortcut => "CQ",
sort_column => 4
}.
attributes(State) ->
Content1 = "Location of messages in classic queues.",
Content2 = "Q, MQ and GQ are Erlang messages (total, mailbox and GS2 queue).",
Content3 = "mem/disk are AMQP messages in memory or on disk.",
Content4 = "pa/cf are messages pending acks or confirms.",
Content5 = "qib/qibu/qsb are index/store buffer sizes, with qib = AMQP messages + qibu (acks).",
{[
[#{content => Content1, width => 133}],
[#{content => Content2, width => 133}],
[#{content => Content3, width => 133}],
[#{content => Content4, width => 133}],
[#{content => Content5, width => 133}]
], State}.
sheet_header() ->
[
#{title => "Pid", width => 12, shortcut => "P"},
#{title => "Vhost", width => 10, shortcut => "V"},
#{title => "Name", width => 26, shortcut => "N"},
#{title => "", width => 5, shortcut => "Q"},
#{title => "", width => 5, shortcut => "MQ"},
#{title => "", width => 5, shortcut => "GQ"},
#{title => "", width => 10, shortcut => "mem"},
#{title => "", width => 10, shortcut => "disk"},
#{title => "", width => 8, shortcut => "pa"},
#{title => "", width => 8, shortcut => "cf"},
#{title => "", width => 6, shortcut => "qib"},
#{title => "", width => 6, shortcut => "qibu"},
#{title => "", width => 6, shortcut => "qsb"}
].
sheet_body(State) ->
Body = [begin
#resource{name = Name, virtual_host = Vhost} = amqqueue:get_name(Q),
case rabbit_amqqueue:pid_of(Q) of
{error, not_found} ->
["dead", Vhost, unicode:characters_to_binary([Name, " (dead)"]),
0,0,0,0,0,0,0,0,0,0];
Pid ->
case process_info(Pid, message_queue_len) of
undefined ->
[Pid, Vhost, unicode:characters_to_binary([Name, " (dead)"]),
0,0,0,0,0,0,0,0,0,0];
{_, MsgQ} ->
GS2Q = rabbit_core_metrics:get_gen_server2_stats(Pid),
Info = rabbit_amqqueue:info(Q),
BQInfo = proplists:get_value(backing_queue_status, Info),
[
Pid, Vhost, Name,
MsgQ + GS2Q, MsgQ, GS2Q,
proplists:get_value(q3, BQInfo),
element(3, proplists:get_value(delta, BQInfo)),
proplists:get_value(num_pending_acks, BQInfo),
proplists:get_value(num_unconfirmed, BQInfo),
proplists:get_value(qi_buffer_size, BQInfo, 0),
proplists:get_value(qi_buffer_num_up, BQInfo, 0),
proplists:get_value(qs_buffer_size, BQInfo)
]
end
end
end || Q <- list_classic_queues()],
{Body, State}.
%% This function gets all classic queues regardless of durable/exclusive status.
list_classic_queues() ->
{atomic, Qs} =
mnesia:sync_transaction(
fun () ->
mnesia:match_object(rabbit_queue,
amqqueue:pattern_match_on_type(rabbit_classic_queue),
read)
end),
Qs.

View File

@ -10,7 +10,7 @@
-compile({inline, [segment_entry_count/0]}).
-export([erase/1, init/3, reset_state/1, recover/7,
terminate/3, delete_and_terminate/1,
terminate/3, delete_and_terminate/1, info/1,
pre_publish/7, flush_pre_publish_cache/2,
publish/7, publish/8, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
@ -358,6 +358,11 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-spec info(qistate()) -> [].
%% No info is implemented for v1 at this time.
info(_) -> [].
pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
State = #qistate{pre_publish_cache = PPC,
delivered_cache = DC}) ->

View File

@ -869,11 +869,17 @@ info(backing_queue_status, #vqstate {
target_ram_count = TargetRamCount,
next_seq_id = NextSeqId,
next_deliver_seq_id = NextDeliverSeqId,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
unconfirmed = UC,
unconfirmed_simple = UCS,
index_mod = IndexMod,
index_state = IndexState,
store_state = StoreState,
rates = #rates { in = AvgIngressRate,
out = AvgEgressRate,
ack_in = AvgAckIngressRate,
ack_out = AvgAckEgressRate }}) ->
[ {mode , Mode},
{version , Version},
{q1 , 0},
@ -885,10 +891,14 @@ info(backing_queue_status, #vqstate {
{target_ram_count , TargetRamCount},
{next_seq_id , NextSeqId},
{next_deliver_seq_id , NextDeliverSeqId},
{num_pending_acks , map_size(RPA) + map_size(DPA)},
{num_unconfirmed , sets:size(UC) + sets:size(UCS)},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ];
{avg_ack_egress_rate , AvgAckEgressRate} ]
++ IndexMod:info(IndexState)
++ rabbit_classic_queue_store_v2:info(StoreState);
info(_, _) ->
''.