Make dlx_worker terminate itself if leader is down

If the source quorum queue leader is down (e.g. its process crashes), the
co-located rabbit_fifo_dlx_worker will terminate itself.

The new leader (on the same or a different node) will re-create the
rabbit_fifo_dlx_worker.

This is cleaner than the previous approach, where the new Ra server
process on the same node took care of terminating the
rabbit_fifo_dlx_worker.
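
The mechanism is a plain process monitor on the locally registered Ra server,
as in the diff below. A minimal sketch of the pattern (only rabbit_fifo_dlx_sup
and the 'DOWN' handling mirror the real worker; the module skeleton and other
names are illustrative):

    %% Minimal sketch of the monitor-and-terminate pattern, assuming a
    %% gen_server worker started under the rabbit_fifo_dlx_sup supervisor.
    -module(leader_watch_example).
    -behaviour(gen_server).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

    init(RaLocalName) ->
        %% Monitor the co-located Ra server (the quorum queue leader replica)
        %% via its locally registered name.
        MRef = erlang:monitor(process, RaLocalName),
        {ok, #{monitor_ref => MRef}}.

    handle_info({'DOWN', MRef, process, _Pid, _Reason},
                #{monitor_ref := MRef} = State) ->
        %% The Ra server is gone; ask the supervisor to stop this worker.
        %% A newly elected leader will start a fresh one.
        %% The supervisor replies by sending us an exit signal, so this
        %% call normally does not return.
        supervisor:terminate_child(rabbit_fifo_dlx_sup, self()),
        {noreply, State}.

    handle_call(_Msg, _From, State) ->
        {reply, ok, State}.
    handle_cast(_Msg, State) ->
        {noreply, State}.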
David Ansari 2021-12-13 20:45:09 +01:00
parent 9873b24340
commit cd785fcea2
3 changed files with 35 additions and 11 deletions

@ -16,7 +16,6 @@
checkout/1,
state_enter/4,
start_worker/2,
terminate_worker/1,
cleanup/1,
purge/1,
consumer_pid/1,

@ -66,6 +66,8 @@
%% There is one rabbit_fifo_dlx_worker per source quorum queue
%% (if dead-letter-strategy at-least-once is used).
queue_ref :: rabbit_amqqueue:name(),
%% monitors source queue
monitor_ref :: reference(),
%% configured (x-)dead-letter-exchange of source queue
exchange_ref,
%% configured (x-)dead-letter-routing-key of source queue
@ -120,7 +122,9 @@ handle_continue({QRef, RegName}, undefined) ->
QRef,
{ClusterName, node()},
Prefetch),
{noreply, State#state{dlx_client_state = ConsumerState}}.
MonitorRef = erlang:monitor(process, ClusterName),
{noreply, State#state{dlx_client_state = ConsumerState,
monitor_ref = MonitorRef}}.
terminate(_Reason, _State) ->
%%TODO cancel timer?
@ -173,8 +177,14 @@ redeliver_and_ack(State0) ->
State = maybe_set_timer(State2),
{noreply, State}.
%%TODO monitor source quorum queue upon init / handle_continue and terminate ourself if source quorum queue is DOWN
%% since new leader will re-create a worker
handle_info({'DOWN', Ref, process, _, _},
#state{monitor_ref = Ref,
queue_ref = QRef}) ->
%% Source quorum queue is down. Therefore, terminate ourself.
%% The new leader will re-create another dlx_worker.
rabbit_log:debug("~s terminating itself because leader of ~s is down...",
[?MODULE, rabbit_misc:rs(QRef)]),
supervisor:terminate_child(rabbit_fifo_dlx_sup, self());
handle_info({'DOWN', _MRef, process, QPid, Reason},
#state{queue_type_state = QTypeState0} = State0) ->
%% received from target classic queue

@ -600,20 +600,35 @@ single_dlx_worker(Config) ->
[_, {active, 0}, _, _],
[_, {active, 0}, _, _]],
rabbit_ct_broker_helpers:rpc_all(Config, supervisor, count_children, [rabbit_fifo_dlx_sup])),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
RaName = ra_name(SourceQ),
{ok, _, {_, Leader}} = ra:members({RaName, Server2}),
?assertNotEqual(Server1, Leader),
[Follower] = Servers -- [Server1, Leader],
assert_active_dlx_workers(1, Config, Leader),
assert_active_dlx_workers(0, Config, Follower),
{ok, _, {_, Leader0}} = ra:members({RaName, Server2}),
?assertNotEqual(Server1, Leader0),
[Follower0] = Servers -- [Server1, Leader0],
assert_active_dlx_workers(1, Config, Leader0),
assert_active_dlx_workers(0, Config, Follower0),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
assert_active_dlx_workers(0, Config, Server1).
consistently(
?_assertMatch(
[_, {active, 0}, _, _],
rabbit_ct_broker_helpers:rpc(Config, Server1, supervisor, count_children, [rabbit_fifo_dlx_sup], 1000))),
Pid = rabbit_ct_broker_helpers:rpc(Config, Leader0, erlang, whereis, [RaName]),
true = rabbit_ct_broker_helpers:rpc(Config, Leader0, erlang, exit, [Pid, kill]),
{ok, _, {_, Leader1}} = ?awaitMatch({ok, _, _},
ra:members({RaName, Follower0}),
1000),
?assertNotEqual(Leader0, Leader1),
[Follower1, Follower2] = Servers -- [Leader1],
assert_active_dlx_workers(0, Config, Follower1),
assert_active_dlx_workers(0, Config, Follower2),
assert_active_dlx_workers(1, Config, Leader1).
assert_active_dlx_workers(N, Config, Server) ->
?assertMatch(
[_, {active, N}, _, _],
rabbit_ct_broker_helpers:rpc(Config, Server, supervisor, count_children, [rabbit_fifo_dlx_sup])).
rabbit_ct_broker_helpers:rpc(Config, Server, supervisor, count_children, [rabbit_fifo_dlx_sup], 1000)).
%%TODO move to rabbitmq_ct_helpers/include/rabbit_assert.hrl
consistently(TestObj) ->