brings file from old rabbitmq-server-62 branch

This commit is contained in:
Alvaro Videla 2015-10-30 15:31:41 +01:00
parent 3358a8c96a
commit faf76042f5
3 changed files with 49 additions and 11 deletions

View File

@ -24,10 +24,11 @@
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
info_all/4]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
@ -118,6 +119,8 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
-spec(info_all/4 :: (rabbit_types:vhost(), rabbit_types:info_keys(),
reference(), pid()) -> 'ok').
-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 :: (rabbit_types:amqqueue())
@ -128,6 +131,9 @@
(rabbit_types:vhost())
-> [{name(), pid(), rabbit_types:ctag(), boolean(),
non_neg_integer(), rabbit_framing:amqp_table()}]).
-spec(consumers_all/3 ::
(rabbit_types:vhost(), reference(), pid())
-> 'ok').
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
@ -547,6 +553,7 @@ list_down(VHostPath) ->
info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
map_fun() -> fun(F, Qs) -> rabbit_misc:filter_exit_map(F, Qs) end.
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
@ -586,6 +593,14 @@ info_all(VHostPath, Items) ->
map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
info_all(VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_main:emitting_map_with_wrapper_fun(
AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, map_fun(), list(VHostPath),
continue),
rabbit_control_main:emitting_map_with_wrapper_fun(
AggregatorPid, Ref,
fun(Q) -> info_down(Q, Items) end, map_fun(), list_down(VHostPath)).
force_event_refresh(Ref) ->
[gen_server2:cast(Q#amqqueue.pid,
{force_event_refresh, Ref}) || Q <- list()],
@ -599,15 +614,24 @@ consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
consumers_all(VHostPath) ->
ConsumerInfoKeys=consumer_info_keys(),
ConsumerInfoKeys = consumer_info_keys(),
lists:append(
map(list(VHostPath),
fun (Q) ->
[lists:zip(
ConsumerInfoKeys,
[Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]
end)).
fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).
consumers_all(VHostPath, Ref, AggregatorPid) ->
ConsumerInfoKeys = consumer_info_keys(),
rabbit_control_main:emitting_map(
AggregatorPid, Ref,
fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
list(VHostPath)).
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
lists:flatten(
[lists:zip(ConsumerInfoKeys,
[Q#amqqueue.name, ChPid, CTag,
AckRequired, Prefetch, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]).
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).

View File

@ -55,7 +55,8 @@
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, deliver_reply/2,
send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
info_all/3]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/1]).
@ -218,6 +219,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(info_all/3 :: (rabbit_types:info_keys(), reference(), pid()) -> 'ok').
-spec(refresh_config_local/0 :: () -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
-spec(force_event_refresh/1 :: (reference()) -> 'ok').
@ -325,6 +327,11 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
info_all(Items, Ref, AggregatorPid) ->
rabbit_control_main:emitting_map_with_wrapper_fun(
AggregatorPid, Ref, fun(C) -> info(C, Items) end,
fun(F, L) -> rabbit_misc:filter_exit_map(F, L) end, list()).
refresh_config_local() ->
rabbit_misc:upmap(
fun (C) -> gen_server2:call(C, refresh_config, infinity) end,

View File

@ -21,7 +21,7 @@
node_listeners/1, register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
connection_info_all/0, connection_info_all/1, connection_info_all/3,
close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
%%used by TCP-based transports, e.g. STOMP adapter
@ -82,6 +82,8 @@
-spec(connection_info_all/0 :: () -> [rabbit_types:infos()]).
-spec(connection_info_all/1 ::
(rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(connection_info_all/3 ::
(rabbit_types:info_keys(), reference(), pid()) -> 'ok').
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(force_connection_event_refresh/1 :: (reference()) -> 'ok').
@ -440,6 +442,11 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
connection_info_all(Items, Ref, AggregatorPid) ->
rabbit_control_main:emitting_map_with_wrapper_fun(
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
fun(F, L) -> rabbit_misc:filter_exit_map(F, L) end, connections()).
close_connection(Pid, Explanation) ->
rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),
case lists:member(Pid, connections()) of