diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 74d400950e..1755b4b8e2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1871,8 +1871,9 @@ handle_publishing_queue_down(QPid, Reason, record_rejects(RejectMXs, State1) end end; -handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> - error(quorum_queues_should_never_be_monitored). +handle_publishing_queue_down(QPid, _Reason, State) when ?IS_QUORUM(QPid) -> + %% this should never happen after the queue type refactoring in 3.9 + State. handle_consuming_queue_down_or_eol(QRef, State = #ch{queue_consumers = QCons, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 83207d7bf9..9a6cd32a7b 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -144,29 +144,32 @@ enqueue(Correlation, Msg, cfg = #cfg{}} = State0) -> %% it is the first enqueue, check the version {_, Node} = Server = pick_server(State0), - State = - case rpc:call(Node, rabbit_fifo, version, []) of - 0 -> - %% the leader is running the old version - %% so we can't initialize the enqueuer session safely - State0#state{queue_status = go}; - 1 -> - %% were running the new version on the leader do sync initialisation - %% of enqueuer session - Reg = rabbit_fifo:make_register_enqueuer(self()), - case ra:process_command(Server, Reg) of - {ok, reject_publish, _} -> - State0#state{queue_status = reject_publish}; - {ok, ok, _} -> - State0#state{queue_status = go}; - Err -> - exit(Err) - end; - {badrpc, nodedown} -> - rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]), - State0#state{queue_status = go} - end, - enqueue(Correlation, Msg, State); + case rpc:call(Node, rabbit_fifo, version, []) of + 0 -> + %% the leader is running the old version + %% so we can't initialize the enqueuer session safely + %% fall back on old behaviour + enqueue(Correlation, Msg, State0#state{queue_status = go}); + 1 -> + %% we're running the new version on the leader do sync 
initialisation + %% of enqueuer session + Reg = rabbit_fifo:make_register_enqueuer(self()), + case ra:process_command(Server, Reg) of + {ok, reject_publish, _} -> + {reject_publish, State0#state{queue_status = reject_publish}}; + {ok, ok, _} -> + enqueue(Correlation, Msg, State0#state{queue_status = go}); + {timeout, _} -> + %% if we timeout it is probably better to reject + %% the message than being uncertain + {reject_publish, State0}; + Err -> + exit(Err) + end; + {badrpc, nodedown} -> + rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]), + enqueue(Correlation, Msg, State0#state{queue_status = go}) + end; enqueue(_Correlation, _Msg, #state{queue_status = reject_publish, cfg = #cfg{}} = State) -> diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index d1196e79fc..87b5566c57 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -106,7 +106,7 @@ init_per_group(mirrored_queue, Config) -> init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> - ClusterSize = 2, + ClusterSize = 3, Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index 0376b2b838..c952ee822b 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -24,15 +24,13 @@ all() -> groups() -> [ {clustered, [], [ - {cluster_size_2, [], [ + {cluster_size_3, [], [ + recover_follower_after_standalone_restart, vhost_deletion, force_delete_if_no_consensus, takeover_on_failure, takeover_on_shutdown, quorum_unaffected_after_vhost_failure - ]}, - {cluster_size_3, [], [ - recover_follower_after_standalone_restart ]} ]} ]. @@ -108,7 +106,7 @@ vhost_deletion(Config) -> ok. 
force_delete_if_no_consensus(Config) -> - [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), QName = ?config(queue_name, Config), Args = ?config(queue_args, Config), @@ -119,6 +117,7 @@ force_delete_if_no_consensus(Config) -> rabbit_ct_client_helpers:publish(ACh, QName, 10), ok = rabbit_ct_broker_helpers:restart_node(Config, B), ok = rabbit_ct_broker_helpers:stop_node(Config, A), + ok = rabbit_ct_broker_helpers:stop_node(Config, C), BCh = rabbit_ct_client_helpers:open_channel(Config, B), ?assertMatch( @@ -140,7 +139,7 @@ takeover_on_shutdown(Config) -> takeover_on(Config, stop_node). takeover_on(Config, Fun) -> - [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), QName = ?config(queue_name, Config), @@ -152,6 +151,7 @@ takeover_on(Config, Fun) -> rabbit_ct_client_helpers:publish(ACh, QName, 10), ok = rabbit_ct_broker_helpers:restart_node(Config, B), + ok = rabbit_ct_broker_helpers:Fun(Config, C), ok = rabbit_ct_broker_helpers:Fun(Config, A), BCh = rabbit_ct_client_helpers:open_channel(Config, B), @@ -170,7 +170,7 @@ takeover_on(Config, Fun) -> ok. 
quorum_unaffected_after_vhost_failure(Config) -> - [A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [A, B, _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Servers = lists:sort(Servers0), ACh = rabbit_ct_client_helpers:open_channel(Config, A), diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl index 410dabb08a..c31527f0ba 100644 --- a/test/publisher_confirms_parallel_SUITE.erl +++ b/test/publisher_confirms_parallel_SUITE.erl @@ -82,7 +82,7 @@ init_per_group(mirrored_queue, Config) -> init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> - ClusterSize = 2, + ClusterSize = 3, Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} @@ -302,21 +302,23 @@ confirm_nack1(Config) -> %% The closest to a nack behaviour that we can get on quorum queues is not answering while %% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued. confirm_minority(Config) -> - [_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [_A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), ok = rabbit_ct_broker_helpers:stop_node(Config, B), + ok = rabbit_ct_broker_helpers:stop_node(Config, C), amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, QName, [<<"msg1">>]), receive - #'basic.nack'{} -> throw(unexpected_nack); + #'basic.nack'{} -> ok; #'basic.ack'{} -> throw(unexpected_ack) - after 30000 -> + after 120000 -> ok end, ok = rabbit_ct_broker_helpers:start_node(Config, B), + publish(Ch, QName, [<<"msg2">>]), receive #'basic.nack'{} -> throw(unexpected_nack); #'basic.ack'{} -> ok