Adapt federation and shovel to changes in exchange behaviour and mirrored_supervisor
This commit is contained in:
parent
452152469d
commit
90aaf3a87d
|
@ -21,7 +21,7 @@
|
||||||
-behaviour(rabbit_exchange_decorator).
|
-behaviour(rabbit_exchange_decorator).
|
||||||
|
|
||||||
-export([description/0, serialise_events/1]).
|
-export([description/0, serialise_events/1]).
|
||||||
-export([create/2, delete/3, policy_changed/2,
|
-export([create/2, delete/2, policy_changed/2,
|
||||||
add_binding/3, remove_bindings/3, route/2, active_for/1]).
|
add_binding/3, remove_bindings/3, route/2, active_for/1]).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
@ -31,22 +31,16 @@ description() ->
|
||||||
|
|
||||||
serialise_events(X) -> federate(X).
|
serialise_events(X) -> federate(X).
|
||||||
|
|
||||||
create(transaction, _X) ->
|
create(_Serial, X) ->
|
||||||
ok;
|
|
||||||
create(none, X) ->
|
|
||||||
maybe_start(X).
|
maybe_start(X).
|
||||||
|
|
||||||
delete(transaction, _X, _Bs) ->
|
delete(_Serial, X) ->
|
||||||
ok;
|
|
||||||
delete(none, X, _Bs) ->
|
|
||||||
maybe_stop(X).
|
maybe_stop(X).
|
||||||
|
|
||||||
policy_changed(OldX, NewX) ->
|
policy_changed(OldX, NewX) ->
|
||||||
maybe_stop(OldX),
|
maybe_stop(OldX),
|
||||||
maybe_start(NewX).
|
maybe_start(NewX).
|
||||||
|
|
||||||
add_binding(transaction, _X, _B) ->
|
|
||||||
ok;
|
|
||||||
add_binding(Serial, X = #exchange{name = XName}, B) ->
|
add_binding(Serial, X = #exchange{name = XName}, B) ->
|
||||||
case federate(X) of
|
case federate(X) of
|
||||||
true -> _ = rabbit_federation_exchange_link:add_binding(Serial, XName, B),
|
true -> _ = rabbit_federation_exchange_link:add_binding(Serial, XName, B),
|
||||||
|
@ -54,8 +48,6 @@ add_binding(Serial, X = #exchange{name = XName}, B) ->
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
remove_bindings(transaction, _X, _Bs) ->
|
|
||||||
ok;
|
|
||||||
remove_bindings(Serial, X = #exchange{name = XName}, Bs) ->
|
remove_bindings(Serial, X = #exchange{name = XName}, Bs) ->
|
||||||
case federate(X) of
|
case federate(X) of
|
||||||
true -> _ = rabbit_federation_exchange_link:remove_bindings(Serial, XName, Bs),
|
true -> _ = rabbit_federation_exchange_link:remove_bindings(Serial, XName, Bs),
|
||||||
|
|
|
@ -27,7 +27,6 @@ start_link() ->
|
||||||
%% The scope is stopped in stop/1.
|
%% The scope is stopped in stop/1.
|
||||||
_ = rabbit_federation_pg:start_scope(),
|
_ = rabbit_federation_pg:start_scope(),
|
||||||
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
|
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
|
||||||
fun rabbit_misc:execute_mnesia_transaction/1,
|
|
||||||
?MODULE, []).
|
?MODULE, []).
|
||||||
|
|
||||||
%% Note that the next supervisor down, rabbit_federation_link_sup, is common
|
%% Note that the next supervisor down, rabbit_federation_link_sup, is common
|
||||||
|
|
|
@ -28,7 +28,6 @@ start_link() ->
|
||||||
%% The scope is stopped in stop/1.
|
%% The scope is stopped in stop/1.
|
||||||
_ = rabbit_federation_pg:start_scope(),
|
_ = rabbit_federation_pg:start_scope(),
|
||||||
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
|
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
|
||||||
fun rabbit_misc:execute_mnesia_transaction/1,
|
|
||||||
?MODULE, []).
|
?MODULE, []).
|
||||||
|
|
||||||
%% Note that the next supervisor down, rabbit_federation_link_sup, is common
|
%% Note that the next supervisor down, rabbit_federation_link_sup, is common
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
|
|
||||||
-export([description/0, serialise_events/0, route/2]).
|
-export([description/0, serialise_events/0, route/2]).
|
||||||
-export([validate/1, validate_binding/2,
|
-export([validate/1, validate_binding/2,
|
||||||
create/2, delete/3, policy_changed/2,
|
create/2, delete/2, policy_changed/2,
|
||||||
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
|
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
|
||||||
-export([info/1, info/2]).
|
-export([info/1, info/2]).
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ validate(#exchange{arguments = Args}) ->
|
||||||
|
|
||||||
validate_binding(_X, _B) -> ok.
|
validate_binding(_X, _B) -> ok.
|
||||||
create(_Tx, _X) -> ok.
|
create(_Tx, _X) -> ok.
|
||||||
delete(_Tx, _X, _Bs) -> ok.
|
delete(_Tx, _X) -> ok.
|
||||||
policy_changed(_X1, _X2) -> ok.
|
policy_changed(_X1, _X2) -> ok.
|
||||||
add_binding(_Tx, _X, _B) -> ok.
|
add_binding(_Tx, _X, _B) -> ok.
|
||||||
remove_bindings(_Tx, _X, _Bs) -> ok.
|
remove_bindings(_Tx, _X, _Bs) -> ok.
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
start_link() ->
|
start_link() ->
|
||||||
Pid = case mirrored_supervisor:start_link(
|
Pid = case mirrored_supervisor:start_link(
|
||||||
{local, ?SUPERVISOR}, ?SUPERVISOR,
|
{local, ?SUPERVISOR}, ?SUPERVISOR,
|
||||||
fun rabbit_misc:execute_mnesia_transaction/1, ?MODULE, []) of
|
?MODULE, []) of
|
||||||
{ok, Pid0} -> Pid0;
|
{ok, Pid0} -> Pid0;
|
||||||
{error, {already_started, Pid0}} -> Pid0
|
{error, {already_started, Pid0}} -> Pid0
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
|
|
||||||
start_link(ShovelName, ShovelConfig) ->
|
start_link(ShovelName, ShovelConfig) ->
|
||||||
mirrored_supervisor:start_link({local, ShovelName}, ShovelName,
|
mirrored_supervisor:start_link({local, ShovelName}, ShovelName,
|
||||||
fun rabbit_misc:execute_mnesia_transaction/1,
|
|
||||||
?MODULE, [ShovelName, ShovelConfig]).
|
?MODULE, [ShovelName, ShovelConfig]).
|
||||||
|
|
||||||
init([Name, Config]) ->
|
init([Name, Config]) ->
|
||||||
|
|
Loading…
Reference in New Issue