Manually recover ring state from durable exchanges and bindings

Only relevant for some use cases post-3.8.4 which has moved
plugin activation to a later point in the boot cycle, after
topology recovery.

Previously this state was naturally recovered during the topology
recovery boot step.

Closes #45.
This commit is contained in:
Michael Klishin 2020-06-12 10:39:53 +03:00
parent 6df9370c6c
commit 4cb9b001d7
1 changed files with 50 additions and 8 deletions

View File

@ -56,6 +56,15 @@
%% OTP 19.3 does not support exs1024s
-define(SEED_ALGORITHM, exs1024).
init() ->
mnesia:create_table(?HASH_RING_STATE_TABLE, [{record_name, chx_hash_ring},
{attributes, record_info(fields, chx_hash_ring)},
{type, ordered_set}]),
mnesia:add_table_copy(?HASH_RING_STATE_TABLE, node(), ram_copies),
mnesia:wait_for_tables([?HASH_RING_STATE_TABLE], 30000),
recover(),
ok.
info(_X) -> [].
info(_X, _) -> [].
@ -123,6 +132,7 @@ maybe_initialise_hash_ring_state(transaction, X = #resource{}) ->
case mnesia:read(?HASH_RING_STATE_TABLE, X) of
[_] -> ok;
[] ->
rabbit_log:debug("Consistent hashing exchange: will initialise hashing ring schema database record"),
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
ok = mnesia:write(?HASH_RING_STATE_TABLE, #chx_hash_ring{
exchange = X,
@ -134,6 +144,42 @@ maybe_initialise_hash_ring_state(_, X) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> maybe_initialise_hash_ring_state(transaction, X) end).
recover() ->
%% topology recovery has already happened, recover state for any durable
%% consistent hash exchanges
case list_exchanges() of
{ok, Xs} ->
rabbit_log:debug("Consistent hashing exchange: have ~b durable exchanges to recover", [length(Xs)]),
[recover_exchange_and_bindings(X) || X <- lists:usort(Xs)];
{aborted, Reason} ->
rabbit_log:error(
"Consistent hashing exchange: failed to recover durable exchange ring state, reason: ~p",
[Reason])
end.
list_exchanges() ->
case mnesia:transaction(fun () ->
mnesia:match_object(rabbit_exchange, #exchange{durable = true, type = 'x-consistent-hash', _ = '_'}, write) end) of
{atomic, Xs} ->
{ok, Xs};
{aborted, Reason} ->
{aborted, Reason}
end.
recover_exchange_and_bindings(#exchange{name = XName} = X) ->
mnesia:transaction(
fun () ->
rabbit_log:debug("Consistent hashing exchange: will recover exchange ~s", [rabbit_misc:rs(XName)]),
create(transaction, X),
rabbit_log:debug("Consistent hashing exchange: recovered exchange ~s", [rabbit_misc:rs(XName)]),
Bindings = rabbit_binding:list_for_source(XName),
rabbit_log:debug("Consistent hashing exchange: have ~b bindings to recover for exchange ~s",
[length(Bindings), rabbit_misc:rs(XName)]),
[add_binding(transaction, X, B) || B <- lists:usort(Bindings)],
rabbit_log:debug("Consistent hashing exchange: recovered bindings for exchange ~s",
[rabbit_misc:rs(XName)])
end).
create(transaction, X) ->
maybe_initialise_hash_ring_state(transaction, X);
create(Tx, X) ->
@ -151,6 +197,8 @@ policy_changed(_X1, _X2) -> ok.
add_binding(transaction, X,
B = #binding{source = S, destination = D, key = K}) ->
Weight = rabbit_data_coercion:to_integer(K),
rabbit_log:debug("Consistent hashing exchange: adding binding from "
"exchange ~s to destination ~s with routing key '~s'", [rabbit_misc:rs(S), rabbit_misc:rs(D), K]),
case mnesia:read(?HASH_RING_STATE_TABLE, S) of
[State0 = #chx_hash_ring{bucket_map = BM0,
@ -184,6 +232,8 @@ remove_bindings(none, X, Bindings) ->
remove_binding(#binding{source = S, destination = D, key = RK}) ->
Weight = rabbit_data_coercion:to_integer(RK),
rabbit_log:debug("Consistent hashing exchange: removing binding "
"from exchange '~p' to destination '~p' with routing key '~s'", [S, D, RK]),
case mnesia:read(?HASH_RING_STATE_TABLE, S) of
[State0 = #chx_hash_ring{bucket_map = BM0,
@ -223,14 +273,6 @@ remove_binding(#binding{source = S, destination = D, key = RK}) ->
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
init() ->
mnesia:create_table(?HASH_RING_STATE_TABLE, [{record_name, chx_hash_ring},
{attributes, record_info(fields, chx_hash_ring)},
{type, ordered_set}]),
mnesia:add_table_copy(?HASH_RING_STATE_TABLE, node(), ram_copies),
mnesia:wait_for_tables([?HASH_RING_STATE_TABLE], 30000),
ok.
%%
%% Jump-consistent hashing.
%%