Handle channel_consumer_deleted event

Test refactoring
This commit is contained in:
kjnilsson 2016-09-14 09:14:58 +01:00
parent 842cf970da
commit 761181aeaa
3 changed files with 47 additions and 32 deletions

View File

@ -55,6 +55,9 @@ handle_cast({event, #event{type = channel_closed, props = Props}},
Pid = pget(pid, Props),
remove_channel(Pid, Intervals),
{noreply, State};
handle_cast({event, #event{type = channel_consumer_deleted, props = Props}}, State) ->
remove_channel_consumer(Props),
{noreply, State};
handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) ->
remove_consumer(Props),
{noreply, State};
@ -107,9 +110,12 @@ remove_channel(Id, Intervals) ->
ets:select_delete(channel_exchange_stats_fine_stats, match_interval_spec(Id)),
ets:select_delete(channel_queue_stats_deliver_stats, match_interval_spec(Id)),
ets:select_delete(channel_consumer_created_stats, match_channel_consumer_spec(Id)),
% ets:match_delete(channel_consumer_created_stats, {'_', {Id, '_'}}),
ok.
remove_channel_consumer(Props) ->
Obj = {pget(queue, Props), {pget(channel, Props), pget(consumer_tag, Props)}},
ets:delete_object(channel_consumer_created_stats, Obj).
remove_consumer(Props) ->
Id = {pget(queue, Props), pget(channel, Props), pget(consumer_tag, Props)},
ets:delete(consumer_stats, Id),

View File

@ -35,10 +35,10 @@ init([]) ->
{rabbit_mgmt_metrics_collector, start_link, [Table]},
permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_collector]}
|| {Table, _} <- ?CORE_TABLES],
MGC = [{rabbit_mgmt_metrics_gc:name(Table),
{rabbit_mgmt_metrics_gc, start_link, [Table]},
MGC = [{rabbit_mgmt_metrics_gc:name(Event),
{rabbit_mgmt_metrics_gc, start_link, [Event]},
permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_gc]}
|| Table <- ?GC_EVENTS],
|| Event <- ?GC_EVENTS],
MD = {delegate_management_sup, {delegate_sup, start_link, [5, "delegate_management_"]},
permanent, ?SUPERVISOR_WAIT, supervisor, [delegate_sup]},
{ok, {{one_for_one, 10, 10}, [ST, DB, MD] ++ MC ++ MGC}}.

View File

@ -111,9 +111,9 @@ multi_node_case1_test(Config) ->
Nodename2 = get_node_config(Config, 1, nodename),
Policy = [{pattern, <<".*">>},
{definition, [{'ha-mode', <<"all">>}]}],
http_put(Config, "/policies/%2f/HA", Policy, ?NO_CONTENT),
http_put(Config, "/policies/%2f/HA", Policy, ?CREATED),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?NO_CONTENT),
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?CREATED),
Q = wait_for(Config, "/queues/%2f/ha-queue"),
assert_node(Nodename2, pget(node, Q)),
assert_single_node(Nodename1, pget(slave_nodes, Q)),
@ -135,12 +135,11 @@ ha_queue_hosted_on_other_node(Config) ->
Nodename2 = get_node_config(Config, 1, nodename),
Policy = [{pattern, <<".*">>},
{definition, [{'ha-mode', <<"all">>}]}],
http_put(Config, "/policies/%2f/HA", Policy, ?NO_CONTENT),
http_put(Config, "/policies/%2f/HA", Policy, ?CREATED),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
#'basic.consume_ok'{consumer_tag = _Tag} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
consume(Chan, <<"ha-queue">>),
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/queues/%2f/ha-queue"),
@ -150,23 +149,22 @@ ha_queue_hosted_on_other_node(Config) ->
[_|_] = pget(channel_details, Cons), % channel details proplist must not be empty
0 = pget(prefetch_count, Cons), % check one of the augmented properties
<<"ha-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
ok.
ha_queue_with_multiple_consumers(Config) ->
Nodename2 = get_node_config(Config, 1, nodename),
Policy = [{pattern, <<".*">>},
{definition, [{'ha-mode', <<"all">>}]}],
http_put(Config, "/policies/%2f/HA", Policy, ?NO_CONTENT),
http_put(Config, "/policies/%2f/HA", Policy, ?CREATED),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?NO_CONTENT),
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
trace_fun(Config, rabbit_core_metrics, channel_consumer_created),
#'basic.consume_ok'{consumer_tag = _Tag} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
consume(Chan, <<"ha-queue">>),
timer:sleep(3000), % wait for metrics
#'basic.consume_ok'{consumer_tag = _Tag2} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
consume(Chan, <<"ha-queue">>),
timer:sleep(3000), % wait for metrics to be pushed
Res = http_get(Config, "/queues/%2f/ha-queue"),
amqp_channel:close(Chan),
@ -179,15 +177,16 @@ ha_queue_with_multiple_consumers(Config) ->
0 = pget(prefetch_count, C1),
0 = pget(prefetch_count, C2),
<<"ha-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
ok.
queue_on_other_node(Config) ->
Nodename2 = get_node_config(Config, 1, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/some-queue", QArgs, ?NO_CONTENT),
http_put(Config, "/queues/%2f/some-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
#'basic.consume_ok'{consumer_tag = _Tag} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"some-queue">>}),
consume(Chan, <<"some-queue">>),
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/queues/%2f/some-queue"),
@ -197,19 +196,17 @@ queue_on_other_node(Config) ->
[_|_] = pget(channel_details, Cons), % channel details proplist must not be empty
0 = pget(prefetch_count, Cons), % check one of the augmented properties
<<"some-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
ok.
queue_with_multiple_consumers(Config) ->
Nodename1 = get_node_config(Config, 0, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename1))}],
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?NO_CONTENT),
http_put(Config, "/queues/%2f/ha-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
trace_fun(Config, rabbit_core_metrics, channel_consumer_created),
#'basic.consume_ok'{consumer_tag = _Tag} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
consume(Chan, <<"ha-queue">>),
#'basic.consume_ok'{consumer_tag = _Tag2} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"ha-queue">>}),
consume(Chan, <<"ha-queue">>),
timer:sleep(3000), % wait for metrics to be pushed
Res = http_get(Config, "/queues/%2f/ha-queue"),
amqp_channel:close(Chan),
@ -222,16 +219,16 @@ queue_with_multiple_consumers(Config) ->
0 = pget(prefetch_count, C1),
0 = pget(prefetch_count, C2),
<<"ha-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
ok.
queue_consumer_cancelled(Config) ->
% create queue on node 2
Nodename2 = get_node_config(Config, 1, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/some-queue", QArgs, ?NO_CONTENT),
http_put(Config, "/queues/%2f/some-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"some-queue">>}),
Tag = consume(Chan, <<"some-queue">>),
timer:sleep(2000), % wait for metrics to be pushed before cancel
#'basic.cancel_ok'{} =
@ -239,20 +236,20 @@ queue_consumer_cancelled(Config) ->
timer:sleep(3000), % wait for metrics to be pushed
Res = http_get(Config, "/queues/%2f/some-queue"),
amqp_channel:close(Chan),
% assert there are no consumer details
[] = pget(consumer_details, Res),
<<"some-queue">> = pget(name, Res),
amqp_channel:close(Chan),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
ok.
queue_consumer_channel_closed(Config) ->
% create queue on node 2
Nodename2 = get_node_config(Config, 1, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/some-queue", QArgs, ?NO_CONTENT),
http_put(Config, "/queues/%2f/some-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
#'basic.consume_ok'{consumer_tag = _Tag} =
amqp_channel:call(Chan, #'basic.consume'{queue = <<"some-queue">>}),
consume(Chan, <<"some-queue">>),
timer:sleep(2000), % wait for metrics to be pushed before closing
amqp_channel:close(Chan),
timer:sleep(2000), % wait for metrics to be pushed
@ -260,10 +257,16 @@ queue_consumer_channel_closed(Config) ->
% assert there are no consumer details
[] = pget(consumer_details, Res),
<<"some-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
ok.
%%----------------------------------------------------------------------------
consume(Channel, Queue) ->
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Queue}),
Tag.
trace_fun(Config, M, F) ->
Nodename1 = get_node_config(Config, 0, nodename),
Nodename2 = get_node_config(Config, 1, nodename),
@ -276,6 +279,12 @@ trace_fun(Config, M, F) ->
dbg:p(all,c),
dbg:tpl(M, F, cx).
dump_table(Config, Table) ->
Data = rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [Table]),
ct:pal(?LOW_IMPORTANCE, "Node 0: Dump of table ~p:~n~p~n", [Table, Data]),
Data0 = rabbit_ct_broker_helpers:rpc(Config, 1, ets, tab2list, [Table]),
ct:pal(?LOW_IMPORTANCE, "Node 1: Dump of table ~p:~n~p~n", [Table, Data0]).
wait_for(Config, Path) ->
wait_for(Config, Path, [slave_nodes, synchronised_slave_nodes]).