distributed queue query
introduce channel consumer stats to track consumers from a channel node
This commit is contained in:
parent
346a6b91ca
commit
1484443847
|
|
@ -24,42 +24,43 @@
|
|||
|
||||
-type(table_name() :: atom()).
|
||||
|
||||
-define(TABLES, [connection_stats_coarse_conn_stats,
|
||||
vhost_stats_coarse_conn_stats,
|
||||
connection_created_stats,
|
||||
connection_stats,
|
||||
channel_created_stats,
|
||||
channel_stats,
|
||||
channel_stats_fine_stats,
|
||||
channel_exchange_stats_fine_stats,
|
||||
channel_queue_stats_deliver_stats,
|
||||
vhost_stats_fine_stats,
|
||||
queue_stats_deliver_stats,
|
||||
vhost_stats_deliver_stats,
|
||||
channel_stats_deliver_stats,
|
||||
channel_process_stats,
|
||||
queue_stats_publish,
|
||||
queue_exchange_stats_publish,
|
||||
exchange_stats_publish_out,
|
||||
exchange_stats_publish_in,
|
||||
consumer_stats,
|
||||
queue_stats,
|
||||
queue_msg_stats,
|
||||
vhost_msg_stats,
|
||||
queue_process_stats,
|
||||
node_stats,
|
||||
node_coarse_stats,
|
||||
node_persister_stats,
|
||||
node_node_stats,
|
||||
node_node_coarse_stats,
|
||||
queue_msg_rates,
|
||||
vhost_msg_rates,
|
||||
old_aggr_stats
|
||||
]).
|
||||
-define(TABLES, [{connection_stats_coarse_conn_stats, set},
|
||||
{vhost_stats_coarse_conn_stats, set},
|
||||
{connection_created_stats, set},
|
||||
{connection_stats, set},
|
||||
{channel_created_stats, set},
|
||||
{channel_consumer_created_stats, bag},
|
||||
{channel_stats, set},
|
||||
{channel_stats_fine_stats, set},
|
||||
{channel_exchange_stats_fine_stats, set},
|
||||
{channel_queue_stats_deliver_stats, set},
|
||||
{vhost_stats_fine_stats, set},
|
||||
{queue_stats_deliver_stats, set},
|
||||
{vhost_stats_deliver_stats, set},
|
||||
{channel_stats_deliver_stats, set},
|
||||
{channel_process_stats, set},
|
||||
{queue_stats_publish, set},
|
||||
{queue_exchange_stats_publish, set},
|
||||
{exchange_stats_publish_out, set},
|
||||
{exchange_stats_publish_in, set},
|
||||
{consumer_stats, set},
|
||||
{queue_stats, set},
|
||||
{queue_msg_stats, set},
|
||||
{vhost_msg_stats, set},
|
||||
{queue_process_stats, set},
|
||||
{node_stats, set},
|
||||
{node_coarse_stats, set},
|
||||
{node_persister_stats, set},
|
||||
{node_node_stats, set},
|
||||
{node_node_coarse_stats, set},
|
||||
{queue_msg_rates, set},
|
||||
{vhost_msg_rates, set},
|
||||
{old_aggr_stats, set}
|
||||
]).
|
||||
|
||||
-define(GC_EVENTS, [connection_closed, channel_closed, consumer_deleted,
|
||||
exchange_deleted, queue_deleted, vhost_deleted,
|
||||
node_node_deleted]).
|
||||
exchange_deleted, queue_deleted, vhost_deleted,
|
||||
node_node_deleted, channel_consumer_deleted]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Only for documentation and testing purposes, so we keep track of the number and
|
||||
|
|
@ -70,6 +71,8 @@
|
|||
-define(connection_created_stats(Id, Name, Props), {Id, Name, Props}).
|
||||
-define(connection_stats(Id, Props), {Id, Props}).
|
||||
-define(channel_created_stats(Id, Name, Props), {Id, Name, Props}).
|
||||
-define(channel_consumer_created_stats(Queue, ChPid, ConsumerTag),
|
||||
{Queue, {ChPid, ConsumerTag}}).
|
||||
-define(channel_stats(Id, Props), {Id, Props}).
|
||||
-define(channel_stats_fine_stats(Publish, Confirm, Return_unroutable),
|
||||
{Publish, Confirm, Return_unroutable}).
|
||||
|
|
|
|||
|
|
@ -171,9 +171,9 @@ safe_call(Term, Default, Retries) ->
|
|||
init([]) ->
|
||||
%% When Rabbit is overloaded, it's usually especially important
|
||||
%% that the management plugin work.
|
||||
process_flag(priority, high),
|
||||
pg2:create(management_db),
|
||||
pg2:join(management_db, self()),
|
||||
process_flag(priority, high),
|
||||
{ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
|
||||
rabbit_log:info("Statistics database started.~n"),
|
||||
{ok, #state{interval = Interval}, hibernate,
|
||||
|
|
@ -373,28 +373,35 @@ detail_queue_stats(Ranges, Objs, Interval) ->
|
|||
Pid = pget(pid, Obj),
|
||||
Props = lookup_element(queue_stats, Id),
|
||||
Stats = message_stats(
|
||||
rabbit_mgmt_stats:format(pick_range(fine_stats, Ranges),
|
||||
queue_stats_publish,
|
||||
Id, Interval) ++
|
||||
rabbit_mgmt_stats:format(pick_range(deliver_get, Ranges),
|
||||
queue_stats_deliver_stats,
|
||||
Id, Interval)) ++
|
||||
rabbit_mgmt_stats:format(pick_range(process_stats, Ranges),
|
||||
queue_process_stats,
|
||||
Id, Interval) ++
|
||||
rabbit_mgmt_stats:format(pick_range(queue_msg_counts, Ranges),
|
||||
queue_msg_stats,
|
||||
Id, Interval),
|
||||
Consumers = [{consumer_details,
|
||||
[augment_consumer(C)
|
||||
|| C <- ets:select(consumer_stats, match_queue_consumer_spec(Id))]}],
|
||||
rabbit_mgmt_stats:format(pick_range(fine_stats, Ranges),
|
||||
queue_stats_publish,
|
||||
Id, Interval) ++
|
||||
rabbit_mgmt_stats:format(pick_range(deliver_get, Ranges),
|
||||
queue_stats_deliver_stats,
|
||||
Id, Interval)) ++
|
||||
rabbit_mgmt_stats:format(pick_range(process_stats, Ranges),
|
||||
queue_process_stats,
|
||||
Id, Interval) ++
|
||||
rabbit_mgmt_stats:format(pick_range(queue_msg_counts, Ranges),
|
||||
queue_msg_stats,
|
||||
Id, Interval),
|
||||
|
||||
ChannelConsumers = [{{Q, C, T}, []} || {Q, {C, T}} <-
|
||||
ets:lookup(channel_consumer_created_stats,
|
||||
Id)],
|
||||
Consumers = ets:select(consumer_stats, match_queue_consumer_spec(Id)),
|
||||
% de-dupe consumers
|
||||
Consumers0 = lists:usort(fun({{_,_,T1}, _},{{_,_,T2}, _}) -> T1 =:= T2 end,
|
||||
Consumers ++ ChannelConsumers),
|
||||
|
||||
Consumers1 = [{consumer_details, [augment_consumer(C) || C <- Consumers0 ]}],
|
||||
StatsD = [{deliveries, detail_stats(channel_queue_stats_deliver_stats,
|
||||
deliver_get, second(Id), Ranges,
|
||||
Interval)},
|
||||
{incoming, detail_stats(queue_exchange_stats_publish,
|
||||
fine_stats, first(Id), Ranges,
|
||||
Interval)}],
|
||||
{Pid, augment_msg_stats(combine(Props, Obj)) ++ Stats ++ StatsD ++ Consumers}
|
||||
{Pid, augment_msg_stats(combine(Props, Obj)) ++ Stats ++ StatsD ++ Consumers1}
|
||||
end || Obj <- Objs]).
|
||||
|
||||
list_exchange_stats(Ranges, Objs, Interval) ->
|
||||
|
|
@ -476,7 +483,7 @@ detail_channel_stats(Ranges, Objs, Interval) ->
|
|||
Id, Interval),
|
||||
Consumers = [{consumer_details,
|
||||
[augment_consumer(C)
|
||||
|| C <- ets:select(consumer_stats, match_consumer_spec(Id))]}],
|
||||
|| C <- ets:select(consumer_stats, match_consumer_spec(Id))]}],
|
||||
StatsD = [{publishes, detail_stats(channel_exchange_stats_fine_stats,
|
||||
fine_stats, first(Id), Ranges,
|
||||
Interval)},
|
||||
|
|
|
|||
|
|
@ -56,14 +56,14 @@ init([Table]) ->
|
|||
Policy = retention_policy(Table),
|
||||
Interval = take_smaller(proplists:get_value(Policy, Policies)),
|
||||
{ok, Agent} = rabbit_mgmt_agent_collector_sup:start_child(self(), Table,
|
||||
Interval * 1000),
|
||||
Interval * 1000),
|
||||
{ok, #state{table = Table, agent = Agent,
|
||||
policies = {proplists:get_value(basic, Policies),
|
||||
proplists:get_value(detailed, Policies),
|
||||
proplists:get_value(global, Policies)},
|
||||
rates_mode = RatesMode,
|
||||
lookup_queue = fun queue_exists/1,
|
||||
lookup_exchange = fun exchange_exists/1}}.
|
||||
policies = {proplists:get_value(basic, Policies),
|
||||
proplists:get_value(detailed, Policies),
|
||||
proplists:get_value(global, Policies)},
|
||||
rates_mode = RatesMode,
|
||||
lookup_queue = fun queue_exists/1,
|
||||
lookup_exchange = fun exchange_exists/1}}.
|
||||
|
||||
handle_call(reset_lookups, _From, State) ->
|
||||
{reply, ok, State#state{lookup_queue = fun queue_exists/1,
|
||||
|
|
@ -105,8 +105,9 @@ retention_policy(channel_queue_exchange_metrics) -> detailed;
|
|||
retention_policy(channel_exchange_metrics) -> detailed;
|
||||
retention_policy(channel_queue_metrics) -> detailed;
|
||||
retention_policy(channel_process_metrics) -> basic;
|
||||
retention_policy(channel_consumer_created) -> basic;
|
||||
retention_policy(consumer_created) -> basic;
|
||||
retention_policy(queue_metrics) -> basic;
|
||||
retention_policy(queue_metrics) -> basic;
|
||||
retention_policy(queue_coarse_metrics) -> basic;
|
||||
retention_policy(node_persister_metrics) -> global;
|
||||
retention_policy(node_coarse_metrics) -> global;
|
||||
|
|
@ -143,6 +144,9 @@ aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions},
|
|||
aggregate_entry(_TS, {Id, Metrics}, #state{table = channel_created}) ->
|
||||
Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
|
||||
ets:insert(channel_created_stats, ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd));
|
||||
aggregate_entry(_TS, {Queue, {ChPid, ConsumerTag}}, #state{table = channel_consumer_created}) ->
|
||||
ets:insert(channel_consumer_created_stats,
|
||||
?channel_consumer_created_stats(Queue, ChPid, ConsumerTag));
|
||||
aggregate_entry(_TS, {Id, Metrics}, #state{table = channel_metrics}) ->
|
||||
Ftd = rabbit_mgmt_format:format(Metrics,
|
||||
{fun rabbit_mgmt_format:format_channel_stats/1, true}),
|
||||
|
|
@ -245,16 +249,16 @@ aggregate_entry(TS, {Id, Reductions}, #state{table = channel_process_metrics,
|
|||
Size, Interval, false)
|
||||
end || {Size, Interval} <- BPolicies];
|
||||
aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args},
|
||||
#state{table = consumer_created}) ->
|
||||
#state{table = consumer_created}) ->
|
||||
Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
|
||||
{ack_required, AckRequired},
|
||||
{prefetch_count, PrefetchCount},
|
||||
{arguments, Args}], {[], false}),
|
||||
{ack_required, AckRequired},
|
||||
{prefetch_count, PrefetchCount},
|
||||
{arguments, Args}], {[], false}),
|
||||
ets:insert(consumer_stats, ?consumer_stats(Id, Fmt)),
|
||||
ok;
|
||||
aggregate_entry(TS, {Id, Metrics}, #state{table = queue_metrics,
|
||||
policies = {BPolicies, _, GPolicies},
|
||||
lookup_queue = QueueFun}) ->
|
||||
policies = {BPolicies, _, GPolicies},
|
||||
lookup_queue = QueueFun}) ->
|
||||
Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), pget(disk_writes, Metrics, 0)),
|
||||
Diff = get_difference({Id, rates}, Stats),
|
||||
ets:insert(old_aggr_stats, ?old_aggr_stats({Id, rates}, Stats)),
|
||||
|
|
|
|||
|
|
@ -106,11 +106,15 @@ remove_channel(Id, Intervals) ->
|
|||
ets:select_delete(old_aggr_stats, match_spec(Id)),
|
||||
ets:select_delete(channel_exchange_stats_fine_stats, match_interval_spec(Id)),
|
||||
ets:select_delete(channel_queue_stats_deliver_stats, match_interval_spec(Id)),
|
||||
ets:select_delete(channel_consumer_created_stats, match_channel_consumer_spec(Id)),
|
||||
% ets:match_delete(channel_consumer_created_stats, {'_', {Id, '_'}}),
|
||||
ok.
|
||||
|
||||
remove_consumer(Props) ->
|
||||
Id = {pget(queue, Props), pget(channel, Props), pget(consumer_tag, Props)},
|
||||
ets:delete(consumer_stats, Id).
|
||||
ets:delete(consumer_stats, Id),
|
||||
Obj = {pget(queue, Props), {pget(channel, Props), pget(consumer_tag, Props)}},
|
||||
ets:delete_object(channel_consumer_created_stats, Obj).
|
||||
|
||||
remove_exchange(Name, Intervals) ->
|
||||
delete_samples(exchange_stats_publish_out, Name, Intervals),
|
||||
|
|
@ -168,5 +172,8 @@ match_second_interval_spec(Id) ->
|
|||
match_consumer_spec(Id) ->
|
||||
[{{{'_', '$1', '_'}, '_'}, [{'==', Id, '$1'}], [true]}].
|
||||
|
||||
match_channel_consumer_spec(Id) ->
|
||||
[{{'_', {'$1', '_'}}, [{'==', Id, '$1'}], [true]}].
|
||||
|
||||
match_queue_consumer_spec(Id) ->
|
||||
[{{{'$1', '_', '_'}, '_'}, [{'==', Id, '$1'}], [true]}].
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ start_link() ->
|
|||
gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init(_) ->
|
||||
[ets:new(Key, [public, set, named_table]) || Key <- ?TABLES],
|
||||
[ets:new(Key, [public, Type, named_table]) || {Key, Type} <- ?TABLES],
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
|
|
|
|||
|
|
@ -33,11 +33,11 @@ init([]) ->
|
|||
MC = [{rabbit_mgmt_metrics_collector:name(Table),
|
||||
{rabbit_mgmt_metrics_collector, start_link, [Table]},
|
||||
permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_collector]}
|
||||
|| Table <- ?CORE_TABLES],
|
||||
|| {Table, _} <- ?CORE_TABLES],
|
||||
MGC = [{rabbit_mgmt_metrics_gc:name(Table),
|
||||
{rabbit_mgmt_metrics_gc, start_link, [Table]},
|
||||
permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_gc]}
|
||||
|| Table <- ?GC_EVENTS],
|
||||
{rabbit_mgmt_metrics_gc, start_link, [Table]},
|
||||
permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_gc]}
|
||||
|| Table <- ?GC_EVENTS],
|
||||
MD = {delegate_management_sup, {delegate_sup, start_link, [5, "delegate_management_"]},
|
||||
permanent, ?SUPERVISOR_WAIT, supervisor, [delegate_sup]},
|
||||
{ok, {{one_for_one, 10, 10}, [ST, DB, MD] ++ MC ++ MGC}}.
|
||||
|
|
|
|||
|
|
@ -60,14 +60,29 @@ to_json(ReqData, Context) ->
|
|||
rabbit_mgmt_util:range_ceil(ReqData), full}),
|
||||
|
||||
Q = merge(lists:append([R || {_, R} <- PidResults])),
|
||||
Q1 = rabbit_misc:pupdate(consumer_details,
|
||||
fun(C) ->
|
||||
lists:map(fun merge/1,
|
||||
group_by(consumer_tag, C)) end,
|
||||
Q),
|
||||
|
||||
rabbit_mgmt_util:reply(rabbit_mgmt_format:strip_pids(Q),
|
||||
rabbit_mgmt_util:reply(rabbit_mgmt_format:strip_pids(Q1),
|
||||
ReqData, Context)
|
||||
catch
|
||||
{error, invalid_range_parameters, Reason} ->
|
||||
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)
|
||||
end.
|
||||
|
||||
group_by(Key, ListOfPropLists) ->
|
||||
Res = lists:foldl(fun(X, S) ->
|
||||
V = rabbit_misc:pget(Key, X),
|
||||
dict:update(V, fun (Old) -> [X|Old] end, [X], S) end,
|
||||
dict:new(),
|
||||
ListOfPropLists),
|
||||
[ X || {_, X} <- dict:to_list(Res)].
|
||||
|
||||
|
||||
|
||||
merge(Results) ->
|
||||
X = lists:foldl(fun(Q, S) ->
|
||||
QD = dict:from_list(Q),
|
||||
|
|
@ -75,6 +90,8 @@ merge(Results) ->
|
|||
end, dict:new(), Results),
|
||||
dict:to_list(X).
|
||||
|
||||
merge_fun(channel_details, V1, []) -> V1;
|
||||
merge_fun(channel_details, [], V2) -> V2;
|
||||
merge_fun(consumer_details, V1, V2) ->
|
||||
V1 ++ V2;
|
||||
merge_fun(_K, _V1, V2) -> V2.
|
||||
|
|
|
|||
|
|
@ -36,7 +36,12 @@ groups() ->
|
|||
{non_parallel_tests, [], [
|
||||
list_cluster_nodes_test,
|
||||
multi_node_case1_test,
|
||||
queue
|
||||
ha_queue_hosted_on_other_node,
|
||||
queue_on_other_node,
|
||||
ha_queue_with_multiple_consumers,
|
||||
queue_with_multiple_consumers,
|
||||
queue_consumer_cancelled,
|
||||
queue_consumer_channel_closed
|
||||
]}
|
||||
].
|
||||
|
||||
|
|
@ -44,6 +49,20 @@ groups() ->
|
|||
%% Testsuite setup/teardown.
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
merge_app_env(Config) ->
|
||||
Config1 = rabbit_ct_helpers:merge_app_env(Config,
|
||||
{rabbit, [
|
||||
{collect_statistics, fine},
|
||||
{collect_statistics_interval, 500}
|
||||
]}),
|
||||
rabbit_ct_helpers:merge_app_env(Config1,
|
||||
{rabbitmq_management, [
|
||||
{sample_retention_policies,
|
||||
%% List of {MaxAgeInSeconds, SampleEveryNSeconds}
|
||||
[{global, [{605, 5}, {3660, 60}, {29400, 600}, {86400, 1800}]},
|
||||
{basic, [{605, 1}, {3600, 60}]},
|
||||
{detailed, [{10, 5}]}] }]}).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
rabbit_ct_helpers:log_environment(),
|
||||
inets:start(),
|
||||
|
|
@ -51,7 +70,8 @@ init_per_suite(Config) ->
|
|||
{rmq_nodename_suffix, ?MODULE},
|
||||
{rmq_nodes_count, 2}
|
||||
]),
|
||||
rabbit_ct_helpers:run_setup_steps(Config1,
|
||||
Config2 = merge_app_env(Config1),
|
||||
rabbit_ct_helpers:run_setup_steps(Config2,
|
||||
rabbit_ct_broker_helpers:setup_steps()).
|
||||
|
||||
end_per_suite(Config) ->
|
||||
|
|
@ -64,10 +84,17 @@ init_per_group(_, Config) ->
|
|||
end_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
init_per_testcase(multi_node_case1_test = Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_started(Config, Testcase);
|
||||
init_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, {conn, Conn}),
|
||||
rabbit_ct_helpers:testcase_started(Config1, Testcase).
|
||||
|
||||
end_per_testcase(multi_node_case1_test = Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase);
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_client_helpers:close_connection(?config(conn, Config)),
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
|
|
@ -103,16 +130,136 @@ multi_node_case1_test(Config) ->
|
|||
|
||||
passed.
|
||||
|
||||
queue(Config) ->
|
||||
ha_queue_hosted_on_other_node(Config) ->
|
||||
% create ha queue on node 2
|
||||
Nodename2 = get_node_config(Config, 1, nodename),
|
||||
Policy = [{pattern, <<".*">>},
|
||||
{definition, [{'ha-mode', <<"all">>}]}],
|
||||
http_put(Config, "/policies/%2f/HA", Policy, ?NO_CONTENT),
|
||||
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
|
||||
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?NO_CONTENT),
|
||||
timer:sleep(100),
|
||||
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
||||
#'basic.consume_ok'{consumer_tag = _Tag} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
|
||||
|
||||
timer:sleep(2000), % wait for metrics to be pushed :(
|
||||
Res = http_get(Config, "/queues/%2f/ha-queue"),
|
||||
ct:pal("~p", [Res]),
|
||||
amqp_channel:close(Chan),
|
||||
% assert some basic data is there
|
||||
[Cons] = pget(consumer_details, Res),
|
||||
[_|_] = pget(channel_details, Cons), % channel details proplist must not be empty
|
||||
0 = pget(prefetch_count, Cons), % check one of the augmented properties
|
||||
<<"ha-queue">> = pget(name, Res),
|
||||
ok.
|
||||
|
||||
ha_queue_with_multiple_consumers(Config) ->
|
||||
Nodename2 = get_node_config(Config, 1, nodename),
|
||||
Policy = [{pattern, <<".*">>},
|
||||
{definition, [{'ha-mode', <<"all">>}]}],
|
||||
http_put(Config, "/policies/%2f/HA", Policy, ?NO_CONTENT),
|
||||
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
|
||||
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?NO_CONTENT),
|
||||
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
||||
trace_fun(Config, rabbit_core_metrics, channel_consumer_created),
|
||||
#'basic.consume_ok'{consumer_tag = _Tag} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
|
||||
|
||||
timer:sleep(3000), % wait for metrics
|
||||
#'basic.consume_ok'{consumer_tag = _Tag2} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
|
||||
timer:sleep(3000), % wait for metrics to be pushed
|
||||
Res = http_get(Config, "/queues/%2f/ha-queue"),
|
||||
amqp_channel:close(Chan),
|
||||
% assert some basic data is there
|
||||
[C1, C2] = pget(consumer_details, Res),
|
||||
% channel details proplist must not be empty
|
||||
[_|_] = pget(channel_details, C1),
|
||||
[_|_] = pget(channel_details, C2),
|
||||
% check one of the augmented properties
|
||||
0 = pget(prefetch_count, C1),
|
||||
0 = pget(prefetch_count, C2),
|
||||
<<"ha-queue">> = pget(name, Res),
|
||||
ok.
|
||||
|
||||
queue_on_other_node(Config) ->
|
||||
Nodename2 = get_node_config(Config, 1, nodename),
|
||||
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
|
||||
http_put(Config, "/queues/%2f/some-queue", QArgs, ?NO_CONTENT),
|
||||
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
||||
#'basic.consume_ok'{consumer_tag = _Tag} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"some-queue">>}),
|
||||
|
||||
timer:sleep(2000), % wait for metrics to be pushed :(
|
||||
Res = http_get(Config, "/queues/%2f/some-queue"),
|
||||
amqp_channel:close(Chan),
|
||||
% assert some basic data is present
|
||||
[Cons] = pget(consumer_details, Res),
|
||||
[_|_] = pget(channel_details, Cons), % channel details proplist must not be empty
|
||||
0 = pget(prefetch_count, Cons), % check one of the augmented properties
|
||||
<<"some-queue">> = pget(name, Res),
|
||||
ok.
|
||||
|
||||
queue_with_multiple_consumers(Config) ->
|
||||
Nodename1 = get_node_config(Config, 0, nodename),
|
||||
QArgs = [{node, list_to_binary(atom_to_list(Nodename1))}],
|
||||
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?NO_CONTENT),
|
||||
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
||||
trace_fun(Config, rabbit_core_metrics, channel_consumer_created),
|
||||
#'basic.consume_ok'{consumer_tag = _Tag} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
|
||||
|
||||
#'basic.consume_ok'{consumer_tag = _Tag2} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
|
||||
timer:sleep(3000), % wait for metrics to be pushed
|
||||
Res = http_get(Config, "/queues/%2f/ha-queue"),
|
||||
amqp_channel:close(Chan),
|
||||
% assert some basic data is there
|
||||
[C1, C2] = pget(consumer_details, Res),
|
||||
% channel details proplist must not be empty
|
||||
[_|_] = pget(channel_details, C1),
|
||||
[_|_] = pget(channel_details, C2),
|
||||
% check one of the augmented properties
|
||||
0 = pget(prefetch_count, C1),
|
||||
0 = pget(prefetch_count, C2),
|
||||
<<"ha-queue">> = pget(name, Res),
|
||||
ok.
|
||||
|
||||
queue_consumer_cancelled(Config) ->
|
||||
% create queue on node 2
|
||||
Nodename2 = get_node_config(Config, 1, nodename),
|
||||
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
|
||||
http_put(Config, "/queues/%2f/some-queue", QArgs, ?NO_CONTENT),
|
||||
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
||||
#'basic.consume_ok'{consumer_tag = Tag} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"some-queue">>}),
|
||||
|
||||
timer:sleep(2000), % wait for metrics to be pushed before cancel
|
||||
#'basic.cancel_ok'{} =
|
||||
amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = Tag}),
|
||||
|
||||
timer:sleep(3000), % wait for metrics to be pushed
|
||||
Res = http_get(Config, "/queues/%2f/some-queue"),
|
||||
amqp_channel:close(Chan),
|
||||
% assert there are no consumer details
|
||||
[] = pget(consumer_details, Res),
|
||||
<<"some-queue">> = pget(name, Res),
|
||||
ok.
|
||||
|
||||
queue_consumer_channel_closed(Config) ->
|
||||
% create queue on node 2
|
||||
Nodename2 = get_node_config(Config, 1, nodename),
|
||||
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
|
||||
http_put(Config, "/queues/%2f/some-queue", QArgs, ?NO_CONTENT),
|
||||
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
||||
#'basic.consume_ok'{consumer_tag = _Tag} =
|
||||
amqp_channel:call(Chan, #'basic.consume'{queue = <<"some-queue">>}),
|
||||
timer:sleep(2000), % wait for metrics to be pushed before closing
|
||||
amqp_channel:close(Chan),
|
||||
timer:sleep(2000), % wait for metrics to be pushed
|
||||
Res = http_get(Config, "/queues/%2f/some-queue"),
|
||||
% assert there are no consumer details
|
||||
[] = pget(consumer_details, Res),
|
||||
<<"some-queue">> = pget(name, Res),
|
||||
ok.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
|
@ -120,7 +267,10 @@ queue(Config) ->
|
|||
trace_fun(Config, M, F) ->
|
||||
Nodename1 = get_node_config(Config, 0, nodename),
|
||||
Nodename2 = get_node_config(Config, 1, nodename),
|
||||
dbg:tracer(process, {fun(A,_) -> ct:pal("TRACE: ~p", [A]) end, ok}),
|
||||
dbg:tracer(process, {fun(A,_) ->
|
||||
ct:pal(?LOW_IMPORTANCE,
|
||||
"TRACE: ~p", [A])
|
||||
end, ok}),
|
||||
dbg:n(Nodename1),
|
||||
dbg:n(Nodename2),
|
||||
dbg:p(all,c),
|
||||
|
|
|
|||
Loading…
Reference in New Issue