API change.
This commit is contained in:
parent
fb282d0a54
commit
989dcb984c
|
|
@ -19,39 +19,24 @@
|
|||
|
||||
-rabbit_boot_step({?MODULE,
|
||||
[{description, "federation exchange decorator"},
|
||||
{mfa, {?MODULE, recover, []}},
|
||||
{mfa, {rabbit_exchange_decorator, register,
|
||||
[<<"federation">>, ?MODULE]}},
|
||||
{requires, rabbit_registry},
|
||||
{requires, rabbit_federation_upstream_exchange},
|
||||
{cleanup, {rabbit_registry, unregister,
|
||||
[exchange_decorator, <<"federation">>]}},
|
||||
{cleanup, {rabbit_exchange_decorator, unregister,
|
||||
[<<"federation">>]}},
|
||||
{enables, recovery}]}).
|
||||
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
-behaviour(rabbit_exchange_decorator).
|
||||
|
||||
-export([description/0, serialise_events/1, recover/0]).
|
||||
-export([description/0, serialise_events/1]).
|
||||
-export([create/2, delete/3, policy_changed/2,
|
||||
add_binding/3, remove_bindings/3, route/2, active_for/1]).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
recover() ->
|
||||
rabbit_registry:register(exchange_decorator, <<"federation">>, ?MODULE),
|
||||
|
||||
%% When we're enabled at runtime, we must replicate some of the work
|
||||
%% that rabbit:recover/0 does during the boot sequence, since we need
|
||||
%% to establish links for any federated exchanges. During startup, this
|
||||
%% runs prior to exchange recovery, at which point rabbit_exchange is
|
||||
%% empty.
|
||||
rabbit_misc:table_filter(
|
||||
fun (Ex = #exchange{ type = Type }) ->
|
||||
Type /= 'x-federation-upstream'
|
||||
end,
|
||||
fun(Ex, Txn) -> create(map_create_tx(Txn), Ex) end,
|
||||
rabbit_exchange),
|
||||
ok.
|
||||
|
||||
map_create_tx(true) -> transaction;
|
||||
map_create_tx(false) -> none.
|
||||
|
||||
|
|
|
|||
|
|
@ -18,10 +18,11 @@
|
|||
|
||||
-rabbit_boot_step({?MODULE,
|
||||
[{description, "federation queue decorator"},
|
||||
{mfa, {?MODULE, recover, []}},
|
||||
{mfa, {rabbit_queue_decorator, register,
|
||||
[<<"federation">>, ?MODULE]}},
|
||||
{requires, rabbit_registry},
|
||||
{cleanup, {rabbit_registry, unregister,
|
||||
[queue_decorator, <<"federation">>]}},
|
||||
{cleanup, {rabbit_queue_decorator, unregister,
|
||||
[<<"federation">>]}},
|
||||
{enables, recovery}]}).
|
||||
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
|
@ -30,26 +31,13 @@
|
|||
-behaviour(rabbit_queue_decorator).
|
||||
|
||||
-export([startup/1, shutdown/1, policy_changed/2, active_for/1,
|
||||
consumer_state_changed/3, recover/0]).
|
||||
consumer_state_changed/3]).
|
||||
-export([policy_changed_local/2]).
|
||||
|
||||
-import(rabbit_misc, [pget/2]).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
recover() ->
|
||||
rabbit_registry:register(queue_decorator, <<"federation">>, ?MODULE),
|
||||
|
||||
%% When we're enabled at runtime, we must replicate some of the work
|
||||
%% that rabbit:recover/0 does during the boot sequence, since we need
|
||||
%% to establish links for any federated exchanges. During startup, this
|
||||
%% runs prior to exchange recovery, at which point rabbit_exchange is
|
||||
%% empty.
|
||||
|
||||
%% Q: can we get away with a dirty read here?
|
||||
[ startup(Q) || Q <- rabbit_misc:dirty_read_all(rabbit_queue) ],
|
||||
ok.
|
||||
|
||||
startup(Q) ->
|
||||
case active_for(Q) of
|
||||
true -> rabbit_federation_queue_link_sup_sup:start_child(Q);
|
||||
|
|
|
|||
Loading…
Reference in New Issue