Handle races with quorum_queue feature flag migration fun
In a few places, the migration of the `rabbit_queue` and `rabbit_durable_queue` Mnesia tables might conflict with accesses to those tables. [#159298729]
This commit is contained in:
parent
93168ae5cc
commit
a87f5b1916
4
Makefile
4
Makefile
|
@ -231,6 +231,10 @@ ifdef CREDIT_FLOW_TRACING
|
|||
RMQ_ERLC_OPTS += -DCREDIT_FLOW_TRACING=true
|
||||
endif
|
||||
|
||||
ifdef DEBUG_FF
|
||||
RMQ_ERLC_OPTS += -DDEBUG_QUORUM_QUEUE_FF=true
|
||||
endif
|
||||
|
||||
ifndef USE_PROPER_QC
|
||||
# PropEr needs to be installed for property checking
|
||||
# http://proper.softlab.ntua.gr/
|
||||
|
|
|
@ -111,3 +111,26 @@
|
|||
?amqqueue_v2_vhost(Q) =:= VHost) orelse
|
||||
(?is_amqqueue_v1(Q) andalso
|
||||
?amqqueue_v1_vhost(Q) =:= VHost))).
|
||||
|
||||
-ifdef(DEBUG_QUORUM_QUEUE_FF).
|
||||
-define(enable_quorum_queue_if_debug,
|
||||
begin
|
||||
rabbit_log:info(
|
||||
"---- ENABLING quorum_queue as part of "
|
||||
"?try_mnesia_tx_or_upgrade_amqqueue_and_retry() ----"),
|
||||
ok = rabbit_feature_flags:enable(quorum_queue)
|
||||
end).
|
||||
-else.
|
||||
-define(enable_quorum_queue_if_debug, noop).
|
||||
-endif.
|
||||
|
||||
-define(try_mnesia_tx_or_upgrade_amqqueue_and_retry(Expr1, Expr2),
|
||||
try
|
||||
?enable_quorum_queue_if_debug,
|
||||
Expr1
|
||||
catch
|
||||
throw:{error, {bad_type, T}} when ?is_amqqueue(T) ->
|
||||
Expr2;
|
||||
throw:{aborted, {bad_type, T}} when ?is_amqqueue(T) ->
|
||||
Expr2
|
||||
end).
|
||||
|
|
|
@ -30,7 +30,8 @@
|
|||
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
|
||||
emit_info_all/5, list_local/1, info_local/1,
|
||||
emit_info_local/4, emit_info_down/4]).
|
||||
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
|
||||
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
|
||||
list_with_possible_retry/1]).
|
||||
-export([list_by_type/1]).
|
||||
-export([notify_policy_changed/1]).
|
||||
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
|
||||
|
@ -297,6 +298,11 @@ start(Qs) ->
|
|||
ok.
|
||||
|
||||
mark_local_durable_queues_stopped(VHost) ->
|
||||
?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
|
||||
do_mark_local_durable_queues_stopped(VHost),
|
||||
do_mark_local_durable_queues_stopped(VHost)).
|
||||
|
||||
do_mark_local_durable_queues_stopped(VHost) ->
|
||||
Qs = find_local_durable_classic_queues(VHost),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
|
@ -426,13 +432,21 @@ get_queue_type(Args) ->
|
|||
erlang:binary_to_existing_atom(V, utf8)
|
||||
end.
|
||||
|
||||
internal_declare(Q, true) ->
|
||||
internal_declare(Q, Recover) ->
|
||||
?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
|
||||
do_internal_declare(Q, Recover),
|
||||
begin
|
||||
Q1 = amqqueue:upgrade(Q),
|
||||
do_internal_declare(Q1, Recover)
|
||||
end).
|
||||
|
||||
do_internal_declare(Q, true) ->
|
||||
rabbit_misc:execute_mnesia_tx_with_tail(
|
||||
fun () ->
|
||||
ok = store_queue(amqqueue:set_state(Q, live)),
|
||||
rabbit_misc:const({created, Q})
|
||||
end);
|
||||
internal_declare(Q, false) ->
|
||||
do_internal_declare(Q, false) ->
|
||||
QueueName = amqqueue:get_name(Q),
|
||||
rabbit_misc:execute_mnesia_tx_with_tail(
|
||||
fun () ->
|
||||
|
@ -468,6 +482,14 @@ update(Name, Fun) ->
|
|||
%% only really used for quorum queues to ensure the rabbit_queue record
|
||||
%% is initialised
|
||||
ensure_rabbit_queue_record_is_initialized(Q) ->
|
||||
?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
|
||||
do_ensure_rabbit_queue_record_is_initialized(Q),
|
||||
begin
|
||||
Q1 = amqqueue:upgrade(Q),
|
||||
do_ensure_rabbit_queue_record_is_initialized(Q1)
|
||||
end).
|
||||
|
||||
do_ensure_rabbit_queue_record_is_initialized(Q) ->
|
||||
rabbit_misc:execute_mnesia_tx_with_tail(
|
||||
fun () ->
|
||||
ok = store_queue(Q),
|
||||
|
@ -794,7 +816,11 @@ check_queue_type({Type, _}, _Args) ->
|
|||
{error, {unacceptable_type, Type}}.
|
||||
|
||||
|
||||
list() -> mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()).
|
||||
list() ->
|
||||
list_with_possible_retry(fun do_list/0).
|
||||
|
||||
do_list() ->
|
||||
mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()).
|
||||
|
||||
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
|
||||
|
||||
|
@ -828,9 +854,12 @@ is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
|
|||
list(VHostPath) ->
|
||||
list(VHostPath, rabbit_queue).
|
||||
|
||||
list(VHostPath, TableName) ->
|
||||
list_with_possible_retry(fun() -> do_list(VHostPath, TableName) end).
|
||||
|
||||
%% Not dirty_match_object since that would not be transactional when used in a
|
||||
%% tx context
|
||||
list(VHostPath, TableName) ->
|
||||
do_list(VHostPath, TableName) ->
|
||||
mnesia:async_dirty(
|
||||
fun () ->
|
||||
mnesia:match_object(
|
||||
|
@ -839,6 +868,38 @@ list(VHostPath, TableName) ->
|
|||
read)
|
||||
end).
|
||||
|
||||
list_with_possible_retry(Fun) ->
|
||||
%% amqqueue migration:
|
||||
%% The `rabbit_queue` or `rabbit_durable_queue` tables
|
||||
%% might be migrated between the time we query the pattern
|
||||
%% (with the `amqqueue` module) and the time we call
|
||||
%% `mnesia:dirty_match_object()`. This would lead to an empty list
|
||||
%% (no object matching the now incorrect pattern), not a Mnesia
|
||||
%% error.
|
||||
%%
|
||||
%% So if the result is an empty list and the version of the
|
||||
%% `amqqueue` record changed in between, we retry the operation.
|
||||
%%
|
||||
%% However, we don't do this if inside a Mnesia transaction: we
|
||||
%% could end up with a live lock between this started transaction
|
||||
%% and the Mnesia table migration which is blocked (but the
|
||||
%% rabbit_feature_flags lock is held).
|
||||
AmqqueueRecordVersion = amqqueue:record_version_to_use(),
|
||||
case Fun() of
|
||||
[] ->
|
||||
case mnesia:is_transaction() of
|
||||
true ->
|
||||
[];
|
||||
false ->
|
||||
case amqqueue:record_version_to_use() of
|
||||
AmqqueueRecordVersion -> [];
|
||||
_ -> Fun()
|
||||
end
|
||||
end;
|
||||
Ret ->
|
||||
Ret
|
||||
end.
|
||||
|
||||
list_down(VHostPath) ->
|
||||
case rabbit_vhost:exists(VHostPath) of
|
||||
false -> [];
|
||||
|
@ -859,13 +920,21 @@ count(VHost) ->
|
|||
%% won't work here because with master migration of mirrored queues
|
||||
%% the "ownership" of queues by nodes becomes a non-trivial problem
|
||||
%% that requires a proper consensus algorithm.
|
||||
length(mnesia:dirty_index_read(rabbit_queue, VHost, amqqueue:field_vhost()))
|
||||
length(list_for_count(VHost))
|
||||
catch _:Err ->
|
||||
rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p~n",
|
||||
[VHost, Err]),
|
||||
0
|
||||
end.
|
||||
|
||||
list_for_count(VHost) ->
|
||||
list_with_possible_retry(
|
||||
fun() ->
|
||||
mnesia:dirty_index_read(rabbit_queue,
|
||||
VHost,
|
||||
amqqueue:field_vhost())
|
||||
end).
|
||||
|
||||
info_keys() -> rabbit_amqqueue_process:info_keys().
|
||||
|
||||
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
|
||||
|
|
|
@ -294,6 +294,9 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
|
|||
fun() ->
|
||||
[Q] = mnesia:read({rabbit_queue, QName}),
|
||||
Q2 = amqqueue:set_state(Q, stopped),
|
||||
%% amqqueue migration:
|
||||
%% The amqqueue was read from this transaction, no need
|
||||
%% to handle migration.
|
||||
rabbit_amqqueue:store_queue(Q2)
|
||||
end),
|
||||
BQ:terminate(R, BQS)
|
||||
|
@ -320,7 +323,12 @@ terminate(_Reason, State = #q{q = Q}) ->
|
|||
Q2 = amqqueue:set_state(Q, crashed),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
rabbit_amqqueue:store_queue(Q2)
|
||||
?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
|
||||
rabbit_amqqueue:store_queue(Q2),
|
||||
begin
|
||||
Q3 = amqqueue:upgrade(Q2),
|
||||
rabbit_amqqueue:store_queue(Q3)
|
||||
end)
|
||||
end),
|
||||
BQS
|
||||
end, State).
|
||||
|
|
|
@ -115,6 +115,9 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
|
|||
GMPids1 = [{GM, Self} | GMPids0],
|
||||
Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
|
||||
Q3 = amqqueue:set_state(Q2, live),
|
||||
%% amqqueue migration:
|
||||
%% The amqqueue was read from this transaction, no
|
||||
%% need to handle migration.
|
||||
ok = rabbit_amqqueue:store_queue(Q3)
|
||||
end,
|
||||
ok = rabbit_misc:execute_mnesia_transaction(Fun),
|
||||
|
|
|
@ -300,6 +300,9 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
|
|||
RS1 = update_recoverable(SPids, RS0),
|
||||
Q2 = amqqueue:set_recoverable_slaves(Q1, RS1),
|
||||
Q3 = amqqueue:set_state(Q2, live),
|
||||
%% amqqueue migration:
|
||||
%% The amqqueue was read from this transaction, no need to handle
|
||||
%% migration.
|
||||
ok = rabbit_amqqueue:store_queue(Q3),
|
||||
%% Wake it up so that we emit a stats event
|
||||
rabbit_amqqueue:notify_policy_changed(Q3),
|
||||
|
|
|
@ -184,7 +184,11 @@ recover() ->
|
|||
%% variants.
|
||||
recover0() ->
|
||||
Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
|
||||
Qs = mnesia:dirty_match_object(rabbit_durable_queue, amqqueue:pattern_match_all()),
|
||||
Qs = rabbit_amqqueue:list_with_possible_retry(
|
||||
fun() ->
|
||||
mnesia:dirty_match_object(
|
||||
rabbit_durable_queue, amqqueue:pattern_match_all())
|
||||
end),
|
||||
Policies = list(),
|
||||
OpPolicies = list_op(),
|
||||
[rabbit_misc:execute_mnesia_transaction(
|
||||
|
@ -203,10 +207,18 @@ recover0() ->
|
|||
OpPolicy1 = match(QName, OpPolicies),
|
||||
Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1),
|
||||
Q3 = rabbit_queue_decorator:set(Q2),
|
||||
F = fun () ->
|
||||
mnesia:write(rabbit_durable_queue, Q3, write)
|
||||
end,
|
||||
rabbit_misc:execute_mnesia_transaction(F)
|
||||
?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
mnesia:write(rabbit_durable_queue, Q3, write)
|
||||
end),
|
||||
begin
|
||||
Q4 = amqqueue:upgrade(Q3),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
mnesia:write(rabbit_durable_queue, Q4, write)
|
||||
end)
|
||||
end)
|
||||
end || Q0 <- Qs],
|
||||
ok.
|
||||
|
||||
|
|
Loading…
Reference in New Issue