Disallow removal of the last stream member

This commit is contained in:
dcorbacho 2021-04-30 17:25:06 +02:00
parent a20cd9ea38
commit bcac37d442
2 changed files with 67 additions and 19 deletions

View File

@ -341,26 +341,31 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
monitors = Monitors0} = State0) ->
Stream0 = maps:get(StreamId, Streams0, undefined),
Meta = maps:without([term, machine_version], Meta0),
Stream1 = update_stream(Meta, Cmd, Stream0),
Reply = case Stream1 of
#stream{reply_to = undefined} ->
ok;
case filter_command(Meta, Cmd, Stream0) of
ok ->
Stream1 = update_stream(Meta, Cmd, Stream0),
Reply = case Stream1 of
#stream{reply_to = undefined} ->
ok;
_ ->
%% reply_to is set so we'll reply later
'$ra_no_reply'
end,
case Stream1 of
undefined ->
return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
Reply, []);
_ ->
%% reply_to is set so we'll reply later
'$ra_no_reply'
end,
case Stream1 of
undefined ->
return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
Reply, []);
_ ->
{Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
{Stream3, Effects1} = eval_listeners(Stream2, Effects0),
{Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
{Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
return(Meta,
State0#?MODULE{streams = Streams0#{StreamId => Stream},
monitors = Monitors}, Reply, Effects)
{Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
{Stream3, Effects1} = eval_listeners(Stream2, Effects0),
{Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
{Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
return(Meta,
State0#?MODULE{streams = Streams0#{StreamId => Stream},
monitors = Monitors}, Reply, Effects)
end;
Reply ->
return(Meta, State0, Reply, [])
end;
apply(Meta, {down, Pid, Reason} = Cmd,
#?MODULE{streams = Streams0,
@ -874,6 +879,23 @@ make_ra_conf(Node, Nodes) ->
machine => {module, ?MODULE, #{}},
ra_event_formatter => Formatter}.
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{members = Members0}) ->
Members = maps:filter(fun(_, #member{target = S}) when S =/= deleted ->
true;
(_, _) ->
false
end, Members0),
case maps:size(Members) =< 1 of
true ->
rabbit_log:warning(
"~s failed to delete ~p replica, last cluster member",
[?MODULE, Node]),
{error, last_stream_member};
false ->
ok
end;
filter_command(_, _, _) ->
ok.
update_stream(Meta, Cmd, Stream) ->
try
@ -1536,6 +1558,7 @@ set_running_to_stopped(Members) ->
(_, M) ->
M
end, Members).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

View File

@ -53,6 +53,7 @@ groups() ->
leader_failover_dedupe,
add_replicas]},
{cluster_size_3_parallel, [parallel], [delete_replica,
delete_last_replica,
delete_classic_replica,
delete_quorum_replica,
consume_from_replica,
@ -444,6 +445,30 @@ delete_replica(Config) ->
check_leader_and_replicas(Config, [Server0]),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
delete_last_replica(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
?assertEqual(ok,
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
?assertEqual(ok,
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server2])),
%% check they're gone
check_leader_and_replicas(Config, [Server0]),
%% delete the last one
?assertEqual({error, last_stream_member},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server0])),
%% It's still here
check_leader_and_replicas(Config, [Server0]),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
grow_coordinator_cluster(Config) ->
[Server0, Server1, _Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),