Recover bindings for all durable queues, including those that failed to recover.
If a queue fails to recover, it may still be restarted by the supervisor and eventually start. After that, some bindings may be present in rabbit_durable_route but not in rabbit_route, which can cause "binding not found" errors. If bindings are also recovered for queues that failed to recover, the behaviour will be the same as for crashed queues (which is currently broken, but needs to be fixed separately). Addresses #1873 [#163919158]
This commit is contained in:
parent
b0dfe9352f
commit
0d0f39d8a3
|
|
@ -108,12 +108,16 @@ warn_file_limit() ->
|
|||
ok
|
||||
end.
|
||||
|
||||
-spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
|
||||
-spec recover(rabbit_types:vhost()) ->
|
||||
{ClassicOk :: [amqqueue:amqqueue()],
|
||||
ClassicFailed :: [amqqueue:amqqueue()],
|
||||
Quorum :: [amqqueue:amqqueue()]}.
|
||||
|
||||
recover(VHost) ->
|
||||
Classic = find_local_durable_classic_queues(VHost),
|
||||
Quorum = find_local_quorum_queues(VHost),
|
||||
recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum).
|
||||
{ClassicOk, ClassicFailed} = recover_classic_queues(VHost, Classic),
|
||||
{ClassicOk, ClassicFailed, rabbit_quorum_queue:recover(Quorum)}.
|
||||
|
||||
recover_classic_queues(VHost, Queues) ->
|
||||
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
|
||||
|
|
@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) ->
|
|||
BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
|
||||
case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
|
||||
{ok, _} ->
|
||||
recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
|
||||
OkQueues = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)),
|
||||
OkQueuesNames = [amqqueue:get_name(Q) || Q <- OkQueues],
|
||||
FailedQueues = [Q || Q <- Queues,
|
||||
not lists:member(amqqueue:get_name(Q), OkQueuesNames)],
|
||||
{OkQueues, FailedQueues};
|
||||
{error, Reason} ->
|
||||
rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
|
||||
throw({error, Reason})
|
||||
end.
|
||||
|
||||
filter_per_type(Queues) ->
|
||||
lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues).
|
||||
|
||||
filter_pid_per_type(QPids) ->
|
||||
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
|
||||
|
||||
|
|
@ -156,12 +161,14 @@ stop(VHost) ->
|
|||
-spec start([amqqueue:amqqueue()]) -> 'ok'.
|
||||
|
||||
start(Qs) ->
|
||||
{Classic, _Quorum} = filter_per_type(Qs),
|
||||
%% At this point all recovered queues and their bindings are
|
||||
%% visible to routing, so now it is safe for them to complete
|
||||
%% their initialisation (which may involve interacting with other
|
||||
%% queues).
|
||||
_ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic],
|
||||
_ = [amqqueue:get_pid(Q) ! {self(), go}
|
||||
|| Q <- Qs,
|
||||
%% All queues are supposed to be classic here.
|
||||
amqqueue:is_classic(Q)],
|
||||
ok.
|
||||
|
||||
mark_local_durable_queues_stopped(VHost) ->
|
||||
|
|
|
|||
|
|
@ -286,8 +286,7 @@ reductions(Name) ->
|
|||
0
|
||||
end.
|
||||
|
||||
-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
|
||||
{'absent', amqqueue:amqqueue(), atom()}].
|
||||
-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
|
||||
|
||||
recover(Queues) ->
|
||||
[begin
|
||||
|
|
|
|||
|
|
@ -53,10 +53,11 @@ recover(VHost) ->
|
|||
VHostStubFile = filename:join(VHostDir, ".vhost"),
|
||||
ok = rabbit_file:ensure_dir(VHostStubFile),
|
||||
ok = file:write_file(VHostStubFile, VHost),
|
||||
Qs = rabbit_amqqueue:recover(VHost),
|
||||
{ClassicOk, ClassicFailed, Quorum} = rabbit_amqqueue:recover(VHost),
|
||||
Qs = ClassicOk ++ ClassicFailed ++ Quorum,
|
||||
QNames = [amqqueue:get_name(Q) || Q <- Qs],
|
||||
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
|
||||
ok = rabbit_amqqueue:start(Qs),
|
||||
ok = rabbit_amqqueue:start(ClassicOk),
|
||||
%% Start queue mirrors.
|
||||
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
|
||||
ok.
|
||||
|
|
|
|||
|
|
@ -733,7 +733,8 @@ bq_queue_recover1(Config) ->
|
|||
after 10000 -> exit(timeout_waiting_for_queue_death)
|
||||
end,
|
||||
rabbit_amqqueue:stop(?VHOST),
|
||||
rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)),
|
||||
{Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
|
||||
rabbit_amqqueue:start(Recovered),
|
||||
{ok, Limiter} = rabbit_limiter:start_link(no_id),
|
||||
rabbit_amqqueue:with_or_die(
|
||||
QName,
|
||||
|
|
|
|||
Loading…
Reference in New Issue