update to new rabbit_channel_interceptor api
This commit is contained in:
parent
e64fd71144
commit
41108985be
|
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
-behaviour(rabbit_channel_interceptor).
|
-behaviour(rabbit_channel_interceptor).
|
||||||
|
|
||||||
-export([description/0, intercept/2, applies_to/1]).
|
-export([description/0, intercept/3, applies_to/0, init/1]).
|
||||||
|
|
||||||
%% exported for tests
|
%% exported for tests
|
||||||
-export([consumer_count/1]).
|
-export([consumer_count/1]).
|
||||||
|
|
@ -23,34 +23,37 @@
|
||||||
{requires, rabbit_registry},
|
{requires, rabbit_registry},
|
||||||
{enables, recovery}]}).
|
{enables, recovery}]}).
|
||||||
|
|
||||||
|
init(Ch) ->
|
||||||
|
rabbit_channel:get_vhost(Ch).
|
||||||
|
|
||||||
description() ->
|
description() ->
|
||||||
[{description, <<"Sharding interceptor for channel methods">>}].
|
[{description, <<"Sharding interceptor for channel methods">>}].
|
||||||
|
|
||||||
intercept(#'basic.consume'{queue = QName} = Method, VHost) ->
|
intercept(#'basic.consume'{queue = QName} = Method, Content, VHost) ->
|
||||||
case queue_name(VHost, QName) of
|
case queue_name(VHost, QName) of
|
||||||
{ok, QName2} ->
|
{ok, QName2} ->
|
||||||
Method#'basic.consume'{queue = QName2};
|
{Method#'basic.consume'{queue = QName2}, Content};
|
||||||
{error, QName} ->
|
{error, QName} ->
|
||||||
precondition_failed("Error finding sharded queue for: ~p", [QName])
|
precondition_failed("Error finding sharded queue for: ~p", [QName])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
intercept(#'basic.get'{queue = QName} = Method, VHost) ->
|
intercept(#'basic.get'{queue = QName} = Method, Content, VHost) ->
|
||||||
case queue_name(VHost, QName) of
|
case queue_name(VHost, QName) of
|
||||||
{ok, QName2} ->
|
{ok, QName2} ->
|
||||||
Method#'basic.get'{queue = QName2};
|
{Method#'basic.get'{queue = QName2}, Content};
|
||||||
{error, QName} ->
|
{error, QName} ->
|
||||||
precondition_failed("Error finding sharded queue for: ~p", [QName])
|
precondition_failed("Error finding sharded queue for: ~p", [QName])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
intercept(#'queue.delete'{queue = QName} = Method, VHost) ->
|
intercept(#'queue.delete'{queue = QName} = Method, Content, VHost) ->
|
||||||
case is_sharded(VHost, QName) of
|
case is_sharded(VHost, QName) of
|
||||||
true ->
|
true ->
|
||||||
precondition_failed("Can't delete sharded queue: ~p", [QName]);
|
precondition_failed("Can't delete sharded queue: ~p", [QName]);
|
||||||
_ ->
|
_ ->
|
||||||
Method
|
{Method, Content}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
intercept(#'queue.declare'{queue = QName} = Method, VHost) ->
|
intercept(#'queue.declare'{queue = QName} = Method, Content, VHost) ->
|
||||||
case is_sharded(VHost, QName) of
|
case is_sharded(VHost, QName) of
|
||||||
true ->
|
true ->
|
||||||
%% Since as an interceptor we can't modify what the channel
|
%% Since as an interceptor we can't modify what the channel
|
||||||
|
|
@ -60,43 +63,41 @@ intercept(#'queue.declare'{queue = QName} = Method, VHost) ->
|
||||||
%% arbitrary.
|
%% arbitrary.
|
||||||
QName2 = rabbit_sharding_util:make_queue_name(
|
QName2 = rabbit_sharding_util:make_queue_name(
|
||||||
QName, a2b(node()), 0),
|
QName, a2b(node()), 0),
|
||||||
Method#'queue.declare'{queue = QName2};
|
{Method#'queue.declare'{queue = QName2}, Content};
|
||||||
_ ->
|
_ ->
|
||||||
Method
|
{Method, Content}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
intercept(#'queue.bind'{queue = QName} = Method, VHost) ->
|
intercept(#'queue.bind'{queue = QName} = Method, Content, VHost) ->
|
||||||
case is_sharded(VHost, QName) of
|
case is_sharded(VHost, QName) of
|
||||||
true ->
|
true ->
|
||||||
precondition_failed("Can't bind sharded queue: ~p", [QName]);
|
precondition_failed("Can't bind sharded queue: ~p", [QName]);
|
||||||
_ ->
|
_ ->
|
||||||
Method
|
{Method, Content}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
intercept(#'queue.unbind'{queue = QName} = Method, VHost) ->
|
intercept(#'queue.unbind'{queue = QName} = Method, Content, VHost) ->
|
||||||
case is_sharded(VHost, QName) of
|
case is_sharded(VHost, QName) of
|
||||||
true ->
|
true ->
|
||||||
precondition_failed("Can't unbind sharded queue: ~p", [QName]);
|
precondition_failed("Can't unbind sharded queue: ~p", [QName]);
|
||||||
_ ->
|
_ ->
|
||||||
Method
|
{Method, Content}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
intercept(#'queue.purge'{queue = QName} = Method, VHost) ->
|
intercept(#'queue.purge'{queue = QName} = Method, Content, VHost) ->
|
||||||
case is_sharded(VHost, QName) of
|
case is_sharded(VHost, QName) of
|
||||||
true ->
|
true ->
|
||||||
precondition_failed("Can't purge sharded queue: ~p", [QName]);
|
precondition_failed("Can't purge sharded queue: ~p", [QName]);
|
||||||
_ ->
|
_ ->
|
||||||
Method
|
{Method, Content}
|
||||||
end.
|
end;
|
||||||
|
|
||||||
applies_to('basic.consume') -> true;
|
intercept(Method, Content, _VHost) ->
|
||||||
applies_to('basic.get') -> true;
|
{Method, Content}.
|
||||||
applies_to('queue.delete') -> true;
|
|
||||||
applies_to('queue.declare') -> true;
|
applies_to() ->
|
||||||
applies_to('queue.bind') -> true;
|
['basic.consume', 'basic.get', 'queue.delete', 'queue.declare',
|
||||||
applies_to('queue.unbind') -> true;
|
'queue.bind', 'queue.unbind', 'queue.purge'].
|
||||||
applies_to('queue.purge') -> true;
|
|
||||||
applies_to(_Other) -> false.
|
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue