imports rabbit_misc:format
This commit is contained in:
parent
9d1d1f6580
commit
61b63f7876
|
|
@ -10,14 +10,16 @@
|
|||
-export([consumer_count/1]).
|
||||
|
||||
-import(rabbit_sharding_util, [a2b/1, shards_per_node/1]).
|
||||
-import(rabbit_misc, [r/3]).
|
||||
-import(rabbit_misc, [r/3, format/2]).
|
||||
|
||||
-rabbit_boot_step({?MODULE,
|
||||
[{description, "sharding interceptor"},
|
||||
{mfa, {rabbit_registry, register,
|
||||
[channel_interceptor, <<"sharding interceptor">>, ?MODULE]}},
|
||||
[channel_interceptor,
|
||||
<<"sharding interceptor">>, ?MODULE]}},
|
||||
{cleanup, {rabbit_registry, unregister,
|
||||
[channel_interceptor, <<"sharding interceptor">>]}},
|
||||
[channel_interceptor,
|
||||
<<"sharding interceptor">>]}},
|
||||
{requires, rabbit_registry},
|
||||
{enables, recovery}]}).
|
||||
|
||||
|
|
@ -29,7 +31,7 @@ intercept(#'basic.consume'{queue = QName} = Method, VHost) ->
|
|||
{ok, QName2} ->
|
||||
{ok, Method#'basic.consume'{queue = QName2}};
|
||||
{error, QName} ->
|
||||
{error, rabbit_misc:format("Error finding sharded queue for: ~p", [QName])}
|
||||
{error, format("Error finding sharded queue for: ~p", [QName])}
|
||||
end;
|
||||
|
||||
intercept(#'basic.get'{queue = QName} = Method, VHost) ->
|
||||
|
|
@ -37,13 +39,13 @@ intercept(#'basic.get'{queue = QName} = Method, VHost) ->
|
|||
{ok, QName2} ->
|
||||
{ok, Method#'basic.get'{queue = QName2}};
|
||||
{error, QName} ->
|
||||
{error, rabbit_misc:format("Error finding sharded queue for: ~p", [QName])}
|
||||
{error, format("Error finding sharded queue for: ~p", [QName])}
|
||||
end;
|
||||
|
||||
intercept(#'queue.delete'{queue = QName} = Method, VHost) ->
|
||||
case is_sharded(VHost, QName) of
|
||||
true ->
|
||||
{error, rabbit_misc:format("Can't delete sharded queue: ~p", [QName])};
|
||||
{error, format("Can't delete sharded queue: ~p", [QName])};
|
||||
_ ->
|
||||
{ok, Method}
|
||||
end;
|
||||
|
|
@ -51,7 +53,7 @@ intercept(#'queue.delete'{queue = QName} = Method, VHost) ->
|
|||
intercept(#'queue.declare'{queue = QName} = Method, VHost) ->
|
||||
case is_sharded(VHost, QName) of
|
||||
true ->
|
||||
{error, rabbit_misc:format("Can't declare sharded queue: ~p", [QName])};
|
||||
{error, format("Can't declare sharded queue: ~p", [QName])};
|
||||
_ ->
|
||||
{ok, Method}
|
||||
end;
|
||||
|
|
@ -59,7 +61,7 @@ intercept(#'queue.declare'{queue = QName} = Method, VHost) ->
|
|||
intercept(#'queue.bind'{queue = QName} = Method, VHost) ->
|
||||
case is_sharded(VHost, QName) of
|
||||
true ->
|
||||
{error, rabbit_misc:format("Can't bind sharded queue: ~p", [QName])};
|
||||
{error, format("Can't bind sharded queue: ~p", [QName])};
|
||||
_ ->
|
||||
{ok, Method}
|
||||
end;
|
||||
|
|
@ -67,7 +69,7 @@ intercept(#'queue.bind'{queue = QName} = Method, VHost) ->
|
|||
intercept(#'queue.unbind'{queue = QName} = Method, VHost) ->
|
||||
case is_sharded(VHost, QName) of
|
||||
true ->
|
||||
{error, rabbit_misc:format("Can't unbind sharded queue: ~p", [QName])};
|
||||
{error, format("Can't unbind sharded queue: ~p", [QName])};
|
||||
_ ->
|
||||
{ok, Method}
|
||||
end;
|
||||
|
|
@ -75,7 +77,7 @@ intercept(#'queue.unbind'{queue = QName} = Method, VHost) ->
|
|||
intercept(#'queue.purge'{queue = QName} = Method, VHost) ->
|
||||
case is_sharded(VHost, QName) of
|
||||
true ->
|
||||
{error, rabbit_misc:format("Can't purge sharded queue: ~p", [QName])};
|
||||
{error, format("Can't purge sharded queue: ~p", [QName])};
|
||||
_ ->
|
||||
{ok, Method}
|
||||
end.
|
||||
|
|
@ -109,13 +111,13 @@ is_sharded(VHost, QBin) ->
|
|||
end.
|
||||
|
||||
lookup_exchange(VHost, QBin) ->
|
||||
rabbit_exchange:lookup(rabbit_misc:r(VHost, exchange, QBin)).
|
||||
rabbit_exchange:lookup(r(VHost, exchange, QBin)).
|
||||
|
||||
least_consumers(VHost, QBin, N) ->
|
||||
F = fun(QNum) ->
|
||||
QBin2 = rabbit_sharding_util:make_queue_name(
|
||||
QBin, a2b(node()), QNum),
|
||||
case consumer_count(rabbit_misc:r(VHost, queue, QBin2)) of
|
||||
case consumer_count(r(VHost, queue, QBin2)) of
|
||||
{error, E} -> {error, E};
|
||||
[{consumers, C}] -> {C, QBin2}
|
||||
end
|
||||
|
|
|
|||
Loading…
Reference in New Issue