Throw resource error when no local stream member

As well as some additional tests
kjnilsson 2021-03-12 08:14:17 +00:00
parent e19aca8075
commit 1709208105
4 changed files with 68 additions and 29 deletions


@@ -1465,6 +1465,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
                 {error, global_qos_not_supported_for_queue_type} ->
                     rabbit_misc:protocol_error(
                       not_implemented, "~s does not support global qos",
                       [rabbit_misc:rs(QueueName)]);
+                {error, no_local_stream_replica_available} ->
+                    rabbit_misc:protocol_error(
+                      resource_error, "~s does not have a running local replica",
+                      [rabbit_misc:rs(QueueName)])
             end;
         {ok, _} ->
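The new clause translates the queue type's no_local_stream_replica_available return into an AMQP 0-9-1 resource_error (protocol code 506) at basic.consume time. A hypothetical sketch of the producing side, assuming the stream queue type's consume callback checks for a member on the consumer's node (local_pid/1 and begin_stream/4 are assumed names, not part of this commit):

    consume(Q, Spec, State) ->
        case local_pid(amqqueue:get_type_state(Q)) of
            %% a member (writer or replica) runs locally: attach to it
            Pid when is_pid(Pid) ->
                begin_stream(Q, Spec, Pid, State);
            %% no local member: let the channel raise 506 as above
            undefined ->
                {error, no_local_stream_replica_available}
        end.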


@@ -161,7 +161,8 @@ start_cluster(Q) ->
     NewQ0 = amqqueue:set_pid(Q, Id),
     NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => Nodes}),
-    rabbit_log:debug("Will start up to ~p replicas for quorum queue ~s", [QuorumSize, rabbit_misc:rs(QName)]),
+    rabbit_log:debug("Will start up to ~w replicas for quorum queue ~s",
+                     [QuorumSize, rabbit_misc:rs(QName)]),
     case rabbit_amqqueue:internal_declare(NewQ1, false) of
         {created, NewQ} ->
             TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
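The debug line also switches from ~p to ~w and wraps its argument list: ~p runs terms through the pretty printer, which may break large terms across several lines, while ~w always emits standard one-line term syntax. For an integer such as QuorumSize the two are identical; the change keeps log lines single-line by convention. For example:

    io:format("~w~n", [lists:seq(1, 100)]),  %% one long line
    io:format("~p~n", [lists:seq(1, 100)]).  %% pretty-printed across lines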
@@ -169,12 +170,14 @@ start_cluster(Q) ->
                       || ServerId <- members(NewQ)],
             case ra:start_cluster(RaConfs) of
                 {ok, _, _} ->
+                    %% ensure the latest config is evaluated properly
+                    %% even when running the machine version from 0
+                    %% as earlier versions may not understand all the config
+                    %% keys
+                    %% TODO: handle error - what should be done if the
+                    %% config cannot be updated
                     ok = rabbit_fifo_client:update_machine_state(Id,
                                                                  ra_machine_config(NewQ)),
-                    %% force a policy change to ensure the latest config is
-                    %% updated even when running the machine version from 0
                     notify_decorators(QName, startup),
                     rabbit_event:notify(queue_created,
                                         [{name, QName},
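The TODO above is left open by this commit. A minimal sketch of one way to resolve it, assuming rabbit_fifo_client:update_machine_state/2 can return {error, Reason} (an assumption; this is not the project's chosen fix), is to log and continue so that queue.declare still succeeds with the version-0 config:

    case rabbit_fifo_client:update_machine_state(Id, ra_machine_config(NewQ)) of
        ok ->
            ok;
        {error, Reason} ->
            %% the queue works with the old config; report rather than fail
            rabbit_log:warning("Failed to update machine state for ~s: ~w",
                               [rabbit_misc:rs(QName), Reason])
    end,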


@@ -312,8 +312,6 @@ apply(Meta, {down, Pid, Reason} = Cmd,
     Effects0 = case Reason of
                    noconnection ->
                        [{monitor, node, node(Pid)}];
-                   shutdown ->
-                       [{monitor, node, node(Pid)}];
                    _ ->
                        []
               end,
@@ -407,15 +405,24 @@ return(#{index := Idx}, State, Reply, Effects) ->
 state_enter(recover, _) ->
     put('$rabbit_vm_category', ?MODULE),
     [];
-state_enter(leader, #?MODULE{monitors = Monitors}) ->
+state_enter(leader, #?MODULE{streams = Streams,
+                             monitors = Monitors}) ->
     Pids = maps:keys(Monitors),
-    Nodes = maps:from_list([{node(P), ok} || P <- Pids]),
-    NodeMons = [{monitor, node, N} || N <- maps:keys(Nodes)],
+    %% monitor all the known nodes
+    Nodes = all_member_nodes(Streams),
+    NodeMons = [{monitor, node, N} || N <- Nodes],
     NodeMons ++ [{aux, fail_active_actions} |
                  [{monitor, process, P} || P <- Pids]];
 state_enter(_S, _) ->
     [].
 
+all_member_nodes(Streams) ->
+    maps:keys(
+      maps:fold(
+        fun (_, #stream{members = M}, Acc) ->
+                maps:merge(Acc, M)
+        end, #{}, Streams)).
+
 tick(_Ts, _State) ->
     [{aux, maybe_resize_coordinator_cluster}].
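The new all_member_nodes/1 helper derives the node set from the stream members themselves instead of from the currently monitored pids, so a newly elected leader monitors every node that hosts a member even when no member process is currently alive. A worked example with hypothetical data (members maps are keyed by node; M1..M4 stand for #member{} records):

    Streams = #{<<"s1">> => #stream{members = #{n1 => M1, n2 => M2}},
                <<"s2">> => #stream{members = #{n2 => M3, n3 => M4}}},
    all_member_nodes(Streams).
    %% => [n1, n2, n3] (in some order; each node exactly once)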
@@ -430,17 +437,17 @@ maybe_resize_coordinator_cluster() ->
                 [] ->
                     ok;
                 New ->
-                    rabbit_log:warning("~s: New rabbit node(s) detected, "
-                                       "adding : ~w",
-                                       [?MODULE, New]),
+                    rabbit_log:info("~s: New rabbit node(s) detected, "
+                                    "adding : ~w",
+                                    [?MODULE, New]),
                     add_members(Members, New)
             end,
             case MemberNodes -- All of
                 [] ->
                     ok;
                 Old ->
-                    rabbit_log:warning("~s: Rabbit node(s) removed from the cluster, "
-                                       "deleting: ~w", [?MODULE, Old]),
+                    rabbit_log:info("~s: Rabbit node(s) removed from the cluster, "
+                                    "deleting: ~w", [?MODULE, Old]),
                     remove_members(Members, Old)
             end;
         _ ->
@@ -592,8 +599,8 @@ phase_start_replica(StreamId, #{epoch := Epoch,
     fun() ->
             try osiris_replica:start(Node, Conf0) of
                 {ok, Pid} ->
-                    rabbit_log:debug("~s: ~s: replica started on ~s in ~b",
-                                     [?MODULE, StreamId, Node, Epoch]),
+                    rabbit_log:debug("~s: ~s: replica started on ~s in ~b pid ~w",
+                                     [?MODULE, StreamId, Node, Epoch, Pid]),
                     send_self_command({member_started, StreamId,
                                        Args#{pid => Pid}});
                 {error, already_present} ->
@@ -1078,8 +1085,7 @@ update_stream(#{system_time := _Ts},
             Stream0#stream{members = Members};
         #{DownNode := #member{role = {replica, _},
                               state = {running, _, Pid}} = Member}
-          when Reason == noconnection orelse
-               Reason == shutdown ->
+          when Reason == noconnection ->
             %% mark process as disconnected such that we don't set it to down until
             %% the node is back and we can re-monitor
             Members = Members0#{DownNode =>
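Together with the removal of the shutdown clause in apply/3 above, this narrows the "treat as disconnected" path to noconnection only: a member that exits with reason shutdown really terminated (for example, during a clean broker stop) and should take the normal down path so it can be restarted, whereas noconnection only means the node became unreachable. The distinction follows Erlang monitor semantics, illustrated here (not part of this commit):

    Ref = erlang:monitor(process, MemberPid),
    receive
        {'DOWN', Ref, process, MemberPid, noconnection} ->
            %% node unreachable: keep state, re-monitor when it returns
            disconnected;
        {'DOWN', Ref, process, MemberPid, _Reason} ->
            %% includes shutdown: the process itself is gone
            down
    end.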
@@ -1375,10 +1381,13 @@ fail_action(StreamId, #member{role = {_, E},
 ensure_monitors(#stream{id = StreamId,
                         members = Members}, Monitors, Effects) ->
     maps:fold(
-      fun (_, #member{state = {running, _, Pid}}, {M, E})
+      fun
+          (_, #member{state = {running, _, Pid}}, {M, E})
             when not is_map_key(Pid, M) ->
               {M#{Pid => {StreamId, member}},
-               [{monitor, process, Pid} | E]};
+               [{monitor, process, Pid},
+                %% ensure we're always monitoring the node as well
+                {monitor, node, node(Pid)} | E]};
          (_, _, Acc) ->
              Acc
      end, {Monitors, Effects}, Members).
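Adding the {monitor, node, node(Pid)} effect here means each member's node is monitored as soon as its process is, which is what the down handling above relies on. A sketch of the result for a single running member (Node, Pid and the stream id are illustrative):

    {Monitors, Effects} =
        ensure_monitors(#stream{id = <<"s1">>,
                                members = #{Node => #member{state = {running, 1, Pid}}}},
                        #{}, []),
    %% Monitors =:= #{Pid => {<<"s1">>, member}}
    %% Effects  =:= [{monitor, process, Pid}, {monitor, node, node(Pid)}]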


@@ -516,6 +516,14 @@ recover(Config) ->
          quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 120)
      end || Servers <- permute(Servers0)],
+    [begin
+         ct:pal("recover: running app stop start for permutation ~w", [Servers]),
+         [rabbit_control_helper:command(stop_app, S) || S <- Servers],
+         [rabbit_control_helper:command(start_app, S) || S <- lists:reverse(Servers)],
+         ct:pal("recover: app stop start done, waiting for messages ~w", [Servers]),
+         quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 120)
+     end || Servers <- permute(Servers0)],
     Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
     publish(Ch1, Q),
     quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]),
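permute/1 is a suite helper not shown in this hunk; it generates every ordering of the server list so recovery is exercised under all stop/start sequences. A standard definition would be (a sketch of the assumed helper):

    permute([]) -> [[]];
    permute(L)  -> [[H | T] || H <- L, T <- permute(L -- [H])].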
@@ -1177,28 +1185,43 @@ max_age(Config) ->
     ?assertEqual(100, length(receive_batch())).
 
 replica_recovery(Config) ->
-    [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    [Server1 | _] = lists:reverse(Nodes),
     Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
     Q = ?config(queue_name, Config),
     ?assertEqual({'queue.declare_ok', Q, 0, 0},
                  declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
     #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
     amqp_channel:register_confirm_handler(Ch1, self()),
     [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
     amqp_channel:wait_for_confirms(Ch1, 5),
     amqp_channel:close(Ch1),
-    ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
-    ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
-    timer:sleep(2000),
+    [begin
+         [DownNode | _] = PNodes,
+         rabbit_control_helper:command(stop_app, DownNode),
+         rabbit_control_helper:command(start_app, DownNode),
+         timer:sleep(6000),
+         Ch2 = rabbit_ct_client_helpers:open_channel(Config, DownNode),
+         qos(Ch2, 10, false),
+         subscribe(Ch2, Q, false, 0),
+         receive_batch(Ch2, 0, 99),
+         amqp_channel:close(Ch2)
+     end || PNodes <- permute(Nodes)],
-    Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
-    qos(Ch2, 10, false),
+    [begin
+         [DownNode | _] = PNodes,
+         ok = rabbit_ct_broker_helpers:stop_node(Config, DownNode),
+         ok = rabbit_ct_broker_helpers:start_node(Config, DownNode),
+         timer:sleep(6000),
+         Ch2 = rabbit_ct_client_helpers:open_channel(Config, DownNode),
+         qos(Ch2, 10, false),
+         subscribe(Ch2, Q, false, 0),
+         receive_batch(Ch2, 0, 99),
+         amqp_channel:close(Ch2)
+     end || PNodes <- permute(Nodes)],
-    subscribe(Ch2, Q, false, 0),
-    receive_batch(Ch2, 0, 99),
     ok.
 
 leader_failover(Config) ->
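receive_batch/3 is likewise defined elsewhere in the suite. A sketch of the check it performs, assuming stream deliveries carry the offset in the x-stream-offset header (body and timeout are illustrative, not the suite's exact code):

    receive_batch(_Ch, From, To) when From > To ->
        ok;
    receive_batch(Ch, From, To) ->
        receive
            {#'basic.deliver'{delivery_tag = Tag},
             #amqp_msg{props = #'P_basic'{headers = Headers}}} ->
                %% assert the delivery carries the expected offset
                {_, long, From} = lists:keyfind(<<"x-stream-offset">>, 1, Headers),
                ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag}),
                receive_batch(Ch, From + 1, To)
        after 60000 ->
                exit({missing_offset, From})
        end.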