Fix restart of stream coordinator when there are no stream queues

Recovering from an existing queue works fine, but if a node is restarted when
there are no longer any stream queues on the system, the recovery process won't
restart the pre-existing coordinator, as that is only done during queue recovery.
The first attempt to declare a new stream queue on such a cluster will then crash
with a `coordinator unavailable` error, because the declare only restarts the local
coordinator member and not the whole ra cluster, which is therefore left without a quorum.

Recovering the coordinator during the boot process ensures that a pre-existing
coordinator cluster is restarted in every case, and is a no-op if there was
never a coordinator on the node.
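
In short, the fix registers a boot step that always attempts to restart this node's
member of the coordinator Ra cluster at boot time. Here is a condensed sketch of the
resulting flow, assembled from the diff below rather than copied verbatim from the
module; `?MODULE` is the coordinator module and `?RA_SYSTEM` its Ra system name:

-rabbit_boot_step({?MODULE,
                   [{description, "Restart stream coordinator"},
                    {mfa, {?MODULE, recover, []}},
                    {requires, core_initialized},
                    {enables, recovery}]}).

recover() ->
    case erlang:whereis(?MODULE) of
        undefined ->
            %% Restart this node's member of a pre-existing coordinator
            %% cluster; once a quorum of members has been restarted the
            %% cluster can elect a leader again.
            case ra:restart_server(?RA_SYSTEM, {?MODULE, node()}) of
                {error, Reason} when Reason == not_started;
                                     Reason == name_not_registered ->
                    %% First boot: no coordinator has ever existed on this
                    %% node, so do nothing and wait for the first stream
                    %% queue declare.
                    ok;
                _ ->
                    ok
            end;
        _ ->
            %% The coordinator is already running locally, nothing to do.
            ok
    end.

With the boot step in place, the per-queue recover/1 no longer needs to trigger
coordinator recovery itself, hence the call removed from rabbit_stream_queue's
recover/1 further down.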
dcorbacho 2021-06-23 17:15:45 +02:00
parent c39fa741b8
commit deaa42ecac
3 changed files with 156 additions and 127 deletions


@ -37,6 +37,12 @@
-export([log_overview/1]).
-export([replay/1]).
-rabbit_boot_step({?MODULE,
[{description, "Restart stream coordinator"},
{mfa, {?MODULE, recover, []}},
{requires, core_initialized},
{enables, recovery}]}).
%% exported for unit tests only
-ifdef(TEST).
-export([update_stream/3,
@ -81,8 +87,15 @@
recover() ->
case erlang:whereis(?MODULE) of
undefined ->
ra:restart_server(?RA_SYSTEM, {?MODULE, node()});
_Pid ->
case ra:restart_server(?RA_SYSTEM, {?MODULE, node()}) of
{error, Reason} when Reason == not_started;
Reason == name_not_registered ->
%% First boot, do nothing and wait until the first `declare`
ok;
_ ->
ok
end;
_ ->
ok
end.
@ -271,7 +284,7 @@ ensure_coordinator_started() ->
ok ->
AllNodes;
{error, {already_started, _}} ->
AllNodes;
AllNodes;
_ ->
AllNodes
end,


@ -645,7 +645,7 @@ init(Q) when ?is_amqqueue(Q) ->
writer_id = WriterId,
soft_limit = SoftLimit}};
{error, coordinator_unavailable} = E ->
rabbit_log:warning("Failed to start stream queue ~p: coordinator unavailable",
rabbit_log:warning("Failed to start stream client ~p: coordinator unavailable",
[rabbit_misc:rs(QName)]),
E
end.
@ -819,7 +819,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp/binary>>)).
recover(Q) ->
rabbit_stream_coordinator:recover(),
{ok, Q}.
check_queue_exists_in_local_node(Q) ->


@ -38,8 +38,9 @@ groups() ->
{single_node_parallel, [parallel], all_tests()},
{cluster_size_2, [], [recover]},
{cluster_size_2_parallel, [parallel], all_tests()},
{cluster_size_3, [], [recover]},
{cluster_size_3, [],
[recover,
restart_coordinator_without_queues,
delete_down_replica,
replica_recovery,
leader_failover,
@ -177,7 +178,8 @@ merge_app_env(Config) ->
{rabbit, [{core_metrics_gc_interval, 100}]}).
end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(
Q = ?config(queue_name, Config),
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
@ -241,8 +243,7 @@ declare_invalid_properties(Config) ->
rabbit_ct_client_helpers:open_channel(Config, Server),
#'queue.declare'{queue = Q,
durable = false,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})).
declare_server_named(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@ -354,7 +355,7 @@ add_replicas(Config) ->
% [<<"/">>, Q, Server2])),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
ok.
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
add_replica(Config) ->
[Server0, Server1, Server2] =
@ -412,7 +413,7 @@ add_replica(Config) ->
?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server2])),
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
delete_replica(Config) ->
[Server0, Server1, Server2] =
@ -455,7 +456,7 @@ delete_last_replica(Config) ->
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server2])),
%% check they're gone
check_leader_and_replicas(Config, [Server0]),
check_leader_and_replicas(Config, [Server0], members),
%% delete the last one
?assertEqual({error, last_stream_member},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
@ -558,7 +559,7 @@ delete_down_replica(Config) ->
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
%% check it isn't gone
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
check_leader_and_replicas(Config, [Server0, Server1, Server2], members),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
rabbit_ct_helpers:await_condition(
fun() ->
@ -596,7 +597,11 @@ publish_coordinator_unavailable(Config) ->
end),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
publish(Ch1, Q),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch1, self()),
publish(Ch1, Q),
amqp_channel:wait_for_confirms(Ch1, 30),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
publish(Config) ->
@ -655,27 +660,55 @@ recover(Config) ->
publish(Ch, Q),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
[begin
ct:pal("recover: running stop start for permuation ~w", [Servers]),
[rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
[rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
ct:pal("recover: running stop waiting for messages ~w", [Servers]),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 120)
end || Servers <- permute(Servers0)],
Perm0 = permute(Servers0),
Servers = lists:nth(rand:uniform(length(Perm0)), Perm0),
%% This is a slow test, so select a single random permutation and trust that, over
%% enough CI rounds, any failure will eventually show up
ct:pal("recover: running stop start for permutation ~w", [Servers]),
[rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
[rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
ct:pal("recover: running stop waiting for messages ~w", [Servers]),
check_leader_and_replicas(Config, Servers0),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 60),
%% Another single random permutation
Perm1 = permute(Servers0),
Servers1 = lists:nth(rand:uniform(length(Perm1)), Perm1),
ct:pal("recover: running app stop start for permutation ~w", [Servers1]),
[rabbit_control_helper:command(stop_app, S) || S <- Servers1],
[rabbit_control_helper:command(start_app, S) || S <- lists:reverse(Servers1)],
ct:pal("recover: running app stop waiting for messages ~w", [Servers1]),
check_leader_and_replicas(Config, Servers0),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 60),
[begin
ct:pal("recover: running app stop start for permuation ~w", [Servers]),
[rabbit_control_helper:command(stop_app, S) || S <- Servers],
[rabbit_control_helper:command(start_app, S) || S <- lists:reverse(Servers)],
ct:pal("recover: running app stop waiting for messages ~w", [Servers]),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 120)
end || Servers <- permute(Servers0)],
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
publish(Ch1, Q),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
restart_coordinator_without_queues(Config) ->
%% The coordinator failed to restart when there were no stream queues left on the
%% system, as queue recovery then no longer called recover on any node and a declare
%% only restarted the local member, so the election couldn't succeed. This is fixed
%% now, but this test checks for that failure
[Server | _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
publish_confirm(Ch, Q, [<<"msg">>]),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
[rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers0],
[rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers0)],
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
consume_without_qos(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -717,10 +750,7 @@ consume(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -745,11 +775,8 @@ consume_offset(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 1000)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 1000)]),
run_proper(
fun () ->
@ -770,7 +797,9 @@ consume_offset(Config) ->
amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
true
end)
end, [], 25),
end, [], 5), %% Run it only 5 times. This test times out quite often, not in the receive
%% clause but in ct itself. Consuming so many messages so many times can take too long
%% on some CPU configurations. Let's trust that many rounds of CI will find any real failure.
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
consume_timestamp_offset(Config) ->
@ -781,12 +810,7 @@ consume_timestamp_offset(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
Payload = <<"111">>,
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"111">> || _ <- lists:seq(1, 100)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -818,11 +842,7 @@ consume_timestamp_last_offset(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, <<"111">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"111">> || _ <- lists:seq(1, 100)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -896,10 +916,7 @@ consume_and_nack(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -927,10 +944,7 @@ basic_cancel(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -968,10 +982,7 @@ consume_and_reject(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -998,10 +1009,7 @@ consume_and_ack(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -1031,10 +1039,7 @@ consume_from_last(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -1062,8 +1067,10 @@ consume_from_last(Config) ->
ok
end,
%% And receive the messages from the last committed offset to the end of the stream
receive_batch(Ch1, CommittedOffset, 99),
%% Check that the first received offset is greater than or equal to the committed
%% offset. It could have moved since we queried it - this flakes sometimes,
%% usually when the detected CommittedOffset is 1
receive_batch_min_offset(Ch1, CommittedOffset, 99),
%% Publish a few more
[publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
@ -1088,10 +1095,7 @@ consume_from_next(Config, Args) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -1136,10 +1140,7 @@ consume_from_relative_time_offset(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@ -1166,10 +1167,7 @@ consume_from_replica(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch1, self()),
[publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch1, 5),
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
rabbit_ct_helpers:await_condition(
fun () ->
@ -1198,12 +1196,9 @@ consume_credit(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
%% Let's publish a big batch, to ensure we have more than a chunk available
NumMsgs = 100,
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg1">> || _ <- lists:seq(1, NumMsgs)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
@ -1259,12 +1254,9 @@ consume_credit_out_of_order_ack(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
%% Let's publish a big batch, to ensure we have more than a chunk available
NumMsgs = 100,
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
amqp_channel:wait_for_confirms(Ch, 5),
%% Let's publish a big batch, to ensure we have more than a chunk available
publish_confirm(Ch, Q, [<<"msg1">> || _ <- lists:seq(1, NumMsgs)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
@ -1321,12 +1313,9 @@ consume_credit_multiple_ack(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
%% Let's publish a big batch, to ensure we have more than a chunk available
NumMsgs = 100,
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [<<"msg1">> || _ <- lists:seq(1, NumMsgs)]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
@ -1358,15 +1347,13 @@ max_length_bytes(Config) ->
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-max-length-bytes">>, long, 500},
{<<"x-stream-max-segment-size-bytes">>, long, 250}])),
{<<"x-max-length-bytes">>, long, 10000},
{<<"x-stream-max-segment-size-bytes">>, long, 1000}])),
Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
Payload = << <<"1">> || _ <- lists:seq(1, 100) >>,
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 500)]), %% 100 bytes/msg * 500 = 50000 bytes
ensure_retention_applied(Config, Server),
%% We don't yet have reliable metrics, as the committed offset doesn't work
%% as a counter once we start applying retention policies.
@ -1375,7 +1362,9 @@ max_length_bytes(Config) ->
qos(Ch1, 100, false),
subscribe(Ch1, Q, false, 0),
?assert(length(receive_batch()) < 100),
%% There should be ~100 messages left in ~10 segments, but let's just check that
%% retention cleared a big bunch of them
?assert(length(receive_batch()) < 200),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
max_age(Config) ->
@ -1390,10 +1379,7 @@ max_age(Config) ->
Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
publish_confirm(Ch, Q, [Payload || _ <- lists:seq(1, 100)]),
timer:sleep(10000),
@ -1417,10 +1403,7 @@ replica_recovery(Config) ->
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch1, self()),
[publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch1, 5),
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
amqp_channel:close(Ch1),
[begin
@ -1446,7 +1429,6 @@ replica_recovery(Config) ->
receive_batch(Ch2, 0, 99),
amqp_channel:close(Ch2)
end || PNodes <- permute(Nodes)],
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
leader_failover(Config) ->
@ -1458,12 +1440,8 @@ leader_failover(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch1, self()),
[publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch1, 5),
check_leader_and_replicas(Config, [Server1, Server2, Server3]),
publish_confirm(Ch1, Q, [<<"msg">> || _ <- lists:seq(1, 100)]),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
timer:sleep(30000),
@ -1550,7 +1528,6 @@ leader_failover_dedupe(Config) ->
qos(Ch2, 100, false),
subscribe(Ch2, Q, false, 0),
validate_dedupe(Ch2, 1, N),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
initial_cluster_size_one(Config) ->
@ -1800,7 +1777,7 @@ max_age_policy(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
update_retention_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
@ -1808,18 +1785,22 @@ update_retention_policy(Config) ->
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-stream-max-segment-size-bytes">>, long, 200}
])),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"0">>, <<"0">>, <<"0">>]]),
[publish(Ch, Q, <<"msg">>) || _ <- lists:seq(1, 10000)],
quorum_queue_utils:wait_for_min_messages(Config, Q, 10000),
check_leader_and_replicas(Config, Servers),
Msgs = [<<"msg">> || _ <- lists:seq(1, 10000)], %% 3 bytes * 10000 = 30000 bytes
publish_confirm(Ch, Q, Msgs),
{ok, Q0} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
[rabbit_misc:r(<<"/">>, queue, Q)]),
timer:sleep(2000),
%% Don't use time-based retention; it's really hard to get those tests right
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"retention">>, <<"update_retention_policy.*">>, <<"queues">>,
[{<<"max-age">>, <<"1s">>}]),
[{<<"max-length-bytes">>, 10000}]),
ensure_retention_applied(Config, Server),
quorum_queue_utils:wait_for_max_messages(Config, Q, 3000),
%% The retention policy should clear approximately 2/3 of the messages, but just to
%% be safe let's simply check that it removed at least half of them
quorum_queue_utils:wait_for_max_messages(Config, Q, 5000),
{ok, Q1} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
[rabbit_misc:r(<<"/">>, queue, Q)]),
@ -1912,12 +1893,15 @@ get_queue_type(Server, Q0) ->
amqqueue:get_type(Q1).
check_leader_and_replicas(Config, Members) ->
check_leader_and_replicas(Config, Members, online).
check_leader_and_replicas(Config, Members, Tag) ->
rabbit_ct_helpers:await_condition(
fun() ->
Info = find_queue_info(Config, [leader, members]),
Info = find_queue_info(Config, [leader, Tag]),
ct:pal("~s members ~w ~p", [?FUNCTION_NAME, Members, Info]),
lists:member(proplists:get_value(leader, Info), Members)
andalso (lists:sort(Members) == lists:sort(proplists:get_value(members, Info)))
andalso (lists:sort(Members) == lists:sort(proplists:get_value(Tag, Info)))
end, 60000).
publish(Ch, Queue) ->
@ -1929,6 +1913,12 @@ publish(Ch, Queue, Msg) ->
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
payload = Msg}).
publish_confirm(Ch, Q, Msgs) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, Msg) || Msg <- Msgs],
amqp_channel:wait_for_confirms(Ch, 5).
subscribe(Ch, Queue, NoAck, Offset) ->
subscribe(Ch, Queue, NoAck, Offset, <<"ctag">>).
@ -1974,6 +1964,26 @@ validate_dedupe(Ch, N, M) ->
exit({missing_record, N})
end.
receive_batch_min_offset(Ch, N, M) ->
%% We are expecting values from the last committed offset, which might have increased
%% since we queried it. Accept as the first offset anything greater than or equal to
%% the last known committed offset
receive
{_,
#amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
when S < N ->
exit({unexpected_offset, S});
{#'basic.deliver'{delivery_tag = DeliveryTag},
#amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}} ->
ct:pal("Committed offset is ~p but as first offset got ~p", [N, S]),
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false}),
receive_batch(Ch, S + 1, M)
after 60000 ->
flush(),
exit({missing_offset, N})
end.
receive_batch(Ch, N, N) ->
receive
{#'basic.deliver'{delivery_tag = DeliveryTag},
@ -2032,3 +2042,10 @@ flush() ->
permute([]) -> [[]];
permute(L) -> [[H|T] || H <- L, T <- permute(L--[H])].
ensure_retention_applied(Config, Server) ->
%% Retention is asynchronous, so having committed all messages doesn't mean old
%% segments have been cleared up.
%% Force a synchronous call to the retention gen_server; any pending retention work
%% will have been processed by the time this call returns.
rabbit_ct_broker_helpers:rpc(Config, Server, gen_server, call, [osiris_retention, test]).