Make some test suite 3 node instead of 2

QQs cannot be declared in two node clusters with different machine
versions. This setup should be an anomaly so changing some suites to
better represent sensible conigurations. This should also allow some
mixed-versions tests to pass.
This commit is contained in:
kjnilsson 2020-07-13 16:24:10 +01:00
parent ce230316d4
commit b5ec2249f6
5 changed files with 43 additions and 37 deletions

View File

@ -1871,8 +1871,9 @@ handle_publishing_queue_down(QPid, Reason,
record_rejects(RejectMXs, State1)
end
end;
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
handle_publishing_queue_down(QPid, _Reason, State) when ?IS_QUORUM(QPid) ->
%% this should never happen after the queue type refactoring in 3.9
State.
handle_consuming_queue_down_or_eol(QRef,
State = #ch{queue_consumers = QCons,

View File

@ -144,29 +144,32 @@ enqueue(Correlation, Msg,
cfg = #cfg{}} = State0) ->
%% it is the first enqueue, check the version
{_, Node} = Server = pick_server(State0),
State =
case rpc:call(Node, rabbit_fifo, version, []) of
0 ->
%% the leader is running the old version
%% so we can't initialize the enqueuer session safely
State0#state{queue_status = go};
1 ->
%% were running the new version on the leader do sync initialisation
%% of enqueuer session
Reg = rabbit_fifo:make_register_enqueuer(self()),
case ra:process_command(Server, Reg) of
{ok, reject_publish, _} ->
State0#state{queue_status = reject_publish};
{ok, ok, _} ->
State0#state{queue_status = go};
Err ->
exit(Err)
end;
{badrpc, nodedown} ->
rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
State0#state{queue_status = go}
end,
enqueue(Correlation, Msg, State);
case rpc:call(Node, rabbit_fifo, version, []) of
0 ->
%% the leader is running the old version
%% so we can't initialize the enqueuer session safely
%% fall back on old behavour
enqueue(Correlation, Msg, State0#state{queue_status = go});
1 ->
%% were running the new version on the leader do sync initialisation
%% of enqueuer session
Reg = rabbit_fifo:make_register_enqueuer(self()),
case ra:process_command(Server, Reg) of
{ok, reject_publish, _} ->
{reject_publish, State0#state{queue_status = reject_publish}};
{ok, ok, _} ->
enqueue(Correlation, Msg, State0#state{queue_status = go});
{timeout, _} ->
%% if we timeout it is probably better to reject
%% the message than being uncertain
{reject_publish, State0};
Err ->
exit(Err)
end;
{badrpc, nodedown} ->
rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
State0#state{queue_status = go}
end;
enqueue(_Correlation, _Msg,
#state{queue_status = reject_publish,
cfg = #cfg{}} = State) ->

View File

@ -106,7 +106,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}

View File

@ -24,15 +24,13 @@ all() ->
groups() ->
[
{clustered, [], [
{cluster_size_2, [], [
{cluster_size_3, [], [
recover_follower_after_standalone_restart,
vhost_deletion,
force_delete_if_no_consensus,
takeover_on_failure,
takeover_on_shutdown,
quorum_unaffected_after_vhost_failure
]},
{cluster_size_3, [], [
recover_follower_after_standalone_restart
]}
]}
].
@ -108,7 +106,7 @@ vhost_deletion(Config) ->
ok.
force_delete_if_no_consensus(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
Args = ?config(queue_args, Config),
@ -119,6 +117,7 @@ force_delete_if_no_consensus(Config) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:stop_node(Config, A),
ok = rabbit_ct_broker_helpers:stop_node(Config, C),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
?assertMatch(
@ -140,7 +139,7 @@ takeover_on_shutdown(Config) ->
takeover_on(Config, stop_node).
takeover_on(Config, Fun) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
@ -152,6 +151,7 @@ takeover_on(Config, Fun) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:Fun(Config, C),
ok = rabbit_ct_broker_helpers:Fun(Config, A),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
@ -170,7 +170,7 @@ takeover_on(Config, Fun) ->
ok.
quorum_unaffected_after_vhost_failure(Config) ->
[A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[A, B, _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Servers = lists:sort(Servers0),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),

View File

@ -82,7 +82,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@ -302,21 +302,23 @@ confirm_nack1(Config) ->
%% The closest to a nack behaviour that we can get on quorum queues is not answering while
%% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued.
confirm_minority(Config) ->
[_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[_A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
declare_queue(Ch, Config, QName),
ok = rabbit_ct_broker_helpers:stop_node(Config, B),
ok = rabbit_ct_broker_helpers:stop_node(Config, C),
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, QName, [<<"msg1">>]),
receive
#'basic.nack'{} -> throw(unexpected_nack);
#'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(unexpected_ack)
after 30000 ->
after 120000 ->
ok
end,
ok = rabbit_ct_broker_helpers:start_node(Config, B),
publish(Ch, QName, [<<"msg2">>]),
receive
#'basic.nack'{} -> throw(unexpected_nack);
#'basic.ack'{} -> ok