Stream Coordinator refactor

This commit is contained in:
kjnilsson 2021-01-13 17:12:14 +00:00
parent 2517aec971
commit 9fb2e6d2dd
9 changed files with 2368 additions and 915 deletions

View File

@ -131,10 +131,12 @@ endef
APPS_DIR := $(CURDIR)/apps
LOCAL_DEPS = sasl rabbitmq_prelaunch os_mon inets compiler public_key crypto ssl syntax_tools xmerl
BUILD_DEPS = rabbitmq_cli
DEPS = cuttlefish ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}
PLT_APPS += mnesia
dep_cuttlefish = git https://github.com/Kyorai/cuttlefish master

View File

@ -1157,7 +1157,9 @@ list_local_mirrored_classic_without_synchronised_mirrors_for_cli() ->
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);
is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
Node =:= Leader.
Node =:= Leader;
is_local_to_node(_QPid, _Node) ->
false.
-spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()].

View File

@ -1786,6 +1786,8 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E;
{{error, global_qos_not_supported_for_queue_type} = E, _Q} ->
E;
{{error, no_local_stream_replica_available} = E, _Q} ->
E;
{{protocol_error, Type, Reason, ReasonArgs}, _Q} ->
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.

View File

@ -63,32 +63,32 @@ handle_info(tick, #state{timeout = Timeout} = State) ->
maps:map(
fun ({osiris_writer, QName}, #{offset := Offs,
first_offset := FstOffs}) ->
COffs = Offs + 1 - FstOffs,
rabbit_core_metrics:queue_stats(QName, COffs, 0, COffs, 0),
Infos = try
%% TODO complete stats!
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
rabbit_stream_queue:info(Q, ?STATISTICS_KEYS);
_ ->
[]
end
catch
_:_ ->
%% It's possible that the writer has died but
%% it's still on the amqqueue record, so the
%% `erlang:process_info/2` calls will return
%% `undefined` and crash with a badmatch.
%% At least for now, skipping the metrics might
%% be the best option. Otherwise this brings
%% down `rabbit_sup` and the whole `rabbit` app.
[]
end,
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
{messages, COffs},
{messages_ready, COffs},
{messages_unacknowledged, 0}]),
COffs = Offs + 1 - FstOffs,
rabbit_core_metrics:queue_stats(QName, COffs, 0, COffs, 0),
Infos = try
%% TODO complete stats!
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
rabbit_stream_queue:info(Q, ?STATISTICS_KEYS);
_ ->
[]
end
catch
_:_ ->
%% It's possible that the writer has died but
%% it's still on the amqqueue record, so the
%% `erlang:process_info/2` calls will return
%% `undefined` and crash with a badmatch.
%% At least for now, skipping the metrics might
%% be the best option. Otherwise this brings
%% down `rabbit_sup` and the whole `rabbit` app.
[]
end,
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
{messages, COffs},
{messages_ready, COffs},
{messages_unacknowledged, 0}]),
ok;
(_, _V) ->
ok

File diff suppressed because it is too large Load Diff

View File

@ -65,8 +65,10 @@
listening_offset = 0 :: non_neg_integer(),
log :: undefined | osiris_log:state()}).
-record(stream_client, {name :: term(),
-record(stream_client, {stream_id :: string(),
name :: term(),
leader :: pid(),
local_pid :: undefined | pid(),
next_seq = 1 :: non_neg_integer(),
correlation = #{} :: #{appender_seq() => {msg_id(), msg()}},
soft_limit :: non_neg_integer(),
@ -93,42 +95,42 @@ declare(Q0, Node) when ?amqqueue_is_stream(Q0) ->
fun rabbit_queue_type_util:check_non_durable/1],
Q0) of
ok ->
start_cluster(Q0, Node);
create_stream(Q0, Node);
Err ->
Err
end.
start_cluster(Q0, Node) ->
create_stream(Q0, Node) ->
Arguments = amqqueue:get_arguments(Q0),
QName = amqqueue:get_name(Q0),
Opts = amqqueue:get_options(Q0),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
Conf0 = make_stream_conf(Node, Q0),
case rabbit_stream_coordinator:start_cluster(
amqqueue:set_type_state(Q0, Conf0)) of
{ok, {error, already_started}, _} ->
{protocol_error, precondition_failed, "safe queue name already in use '~s'",
[Node]};
{ok, {created, Q}, _} ->
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{user_who_performed_action,
ActingUser}]),
{new, Q};
{ok, {error, Error}, _} ->
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
{protocol_error, internal_error, "Cannot declare a queue '~s' on node '~s': ~255p",
[rabbit_misc:rs(QName), node(), Error]};
{ok, {existing, Q}, _} ->
{existing, Q};
{error, coordinator_unavailable} ->
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
{protocol_error, internal_error,
"Cannot declare a queue '~s' on node '~s': coordinator unavailable",
[rabbit_misc:rs(QName), node()]}
Conf = apply_leader_locator_strategy(Conf0),
#{leader_node := LeaderNode} = Conf,
Q1 = amqqueue:set_type_state(Q0, Conf),
case rabbit_amqqueue:internal_declare(Q1, false) of
{created, Q} ->
case rabbit_stream_coordinator:new_stream(Q, LeaderNode) of
{ok, {ok, LeaderPid}, _} ->
%% update record with leader pid
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{user_who_performed_action,
ActingUser}]),
{new, Q};
Error ->
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
{protocol_error, internal_error, "Cannot declare a queue '~s' on node '~s': ~255p",
[rabbit_misc:rs(QName), node(), Error]}
end;
{existing, Q} ->
{existing, Q}
end.
-spec delete(amqqueue:amqqueue(), boolean(),
@ -136,8 +138,7 @@ start_cluster(Q0, Node) ->
rabbit_types:ok(non_neg_integer()) |
rabbit_types:error(in_use | not_empty).
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
Name = maps:get(name, amqqueue:get_type_state(Q)),
{ok, Reply, _} = rabbit_stream_coordinator:delete_cluster(Name, ActingUser),
{ok, Reply} = rabbit_stream_coordinator:delete_stream(Q, ActingUser),
Reply.
-spec purge(amqqueue:amqqueue()) ->
@ -199,42 +200,57 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
%% really it should be sent by the stream queue process like classic queues
%% do
maybe_send_reply(ChPid, OkMsg),
QState = begin_stream(QState0, Q, ConsumerTag, Offset,
ConsumerPrefetchCount),
{ok, QState, []};
begin_stream(QState0, Q, ConsumerTag, Offset, ConsumerPrefetchCount);
Err ->
Err
end.
get_local_pid(#{leader_pid := Pid}) when node(Pid) == node() ->
Pid;
get_local_pid(#{replica_pids := ReplicaPids}) ->
[Local | _] = lists:filter(fun(Pid) ->
node(Pid) == node()
end, ReplicaPids),
Local.
%% Resolve a pid of a stream member running on the local node, caching
%% it in the client state. Returns {Pid | undefined, State}.
get_local_pid(#stream_client{local_pid = Pid} = State)
  when is_pid(Pid) ->
    %% already resolved and cached
    {Pid, State};
get_local_pid(#stream_client{leader = Pid} = State)
  when is_pid(Pid) andalso node(Pid) == node() ->
    %% the leader itself runs on this node; cache it as the local pid
    {Pid, State#stream_client{local_pid = Pid}};
get_local_pid(#stream_client{stream_id = StreamId,
                             local_pid = undefined} = State) ->
    %% query local coordinator to get pid
    case rabbit_stream_coordinator:local_pid(StreamId) of
        {ok, Pid} ->
            {Pid, State#stream_client{local_pid = Pid}};
        {error, not_found} ->
            %% no local member known; caller must handle 'undefined'
            {undefined, State}
    end.
begin_stream(#stream_client{readers = Readers0} = State,
begin_stream(#stream_client{readers = Readers0} = State0,
Q, Tag, Offset, Max) ->
LocalPid = get_local_pid(amqqueue:get_type_state(Q)),
{ok, Seg0} = osiris:init_reader(LocalPid, Offset),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
%% TODO: avoid double calls to the same process
StartOffset = case Offset of
first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
{timestamp, _} -> NextOffset;
_ -> Offset
end,
Str0 = #stream{name = amqqueue:get_name(Q),
credit = Max,
start_offset = StartOffset,
listening_offset = NextOffset,
log = Seg0,
max = Max},
State#stream_client{readers = Readers0#{Tag => Str0}}.
{LocalPid, State} = get_local_pid(State0),
case LocalPid of
undefined ->
{error, no_local_stream_replica_available};
_ ->
{ok, Seg0} = osiris:init_reader(LocalPid, Offset),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
%% TODO: avoid double calls to the same process
StartOffset = case Offset of
first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
{timestamp, _} -> NextOffset;
_ -> Offset
end,
Str0 = #stream{name = amqqueue:get_name(Q),
credit = Max,
start_offset = StartOffset,
listening_offset = NextOffset,
log = Seg0,
max = Max},
Actions = [],
%% TODO: we need to monitor the local pid in case the stream is
%% restarted
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions}
end.
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
name = QName} = State) ->
@ -326,6 +342,7 @@ handle_event({osiris_written, From, _WriterId, Corrs},
fun (_Seq, {I, _M}, Acc) ->
[I | Acc]
end, [], maps:with(Corrs, Correlation0))),
Correlation = maps:without(Corrs, Correlation0),
Slow = case maps:size(Correlation) < SftLmt of
true when Slow0 ->
@ -401,10 +418,13 @@ i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q);
i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q);
i(arguments, Q) when ?is_amqqueue(Q) -> amqqueue:get_arguments(Q);
i(leader, Q) when ?is_amqqueue(Q) ->
#{leader_node := Leader} = amqqueue:get_type_state(Q),
Leader;
case amqqueue:get_pid(Q) of
none ->
undefined;
Pid -> node(Pid)
end;
i(members, Q) when ?is_amqqueue(Q) ->
#{replica_nodes := Nodes} = amqqueue:get_type_state(Q),
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes;
i(online, Q) ->
#{replica_pids := ReplicaPids,
@ -464,11 +484,14 @@ i(_, _) ->
init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
#{name := StreamId} = amqqueue:get_type_state(Q),
%% tell us about leader changes so we can fail over
{ok, ok, _} = rabbit_stream_coordinator:register_listener(Q),
Prefix = erlang:pid_to_list(self()) ++ "_",
WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix),
{ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
#stream_client{name = amqqueue:get_name(Q),
#stream_client{stream_id = StreamId,
name = amqqueue:get_name(Q),
leader = Leader,
writer_id = WriterId,
soft_limit = SoftLimit}.
@ -487,6 +510,7 @@ update(Q, State)
update_leader_pid(Pid, #stream_client{leader = Pid} = State) ->
State;
update_leader_pid(Pid, #stream_client{} = State) ->
rabbit_log:debug("stream client: new leader detected ~w", [Pid]),
resend_all(State#stream_client{leader = Pid}).
state_info(_) ->
@ -553,16 +577,18 @@ delete_replica(VHost, Name, Node) ->
make_stream_conf(Node, Q) ->
QName = amqqueue:get_name(Q),
Name = queue_name(QName),
Name = stream_name(QName),
%% MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>,
fun res_arg/2, Q)),
InitialClusterSize = initial_cluster_size(args_policy_lookup(<<"initial-cluster-size">>,
fun res_arg/2, Q)),
InitialClusterSize = initial_cluster_size(
args_policy_lookup(<<"initial-cluster-size">>,
fun res_arg/2, Q)),
Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node],
%% TODO: try to avoid nodes that are not connected
Replicas = select_stream_nodes(InitialClusterSize - 1, Replicas0),
Formatter = {?MODULE, format_osiris_event, [QName]},
Retention = lists:filter(fun({_, R}) ->
@ -572,6 +598,7 @@ make_stream_conf(Node, Q) ->
add_if_defined(max_segment_size, MaxSegmentSize, #{reference => QName,
name => Name,
retention => Retention,
nodes => [Node | Replicas],
leader_locator_strategy => LeaderLocator,
leader_node => Node,
replica_nodes => Replicas,
@ -637,7 +664,7 @@ initial_cluster_size(Val) ->
res_arg(PolVal, undefined) -> PolVal;
res_arg(_, ArgVal) -> ArgVal.
queue_name(#resource{virtual_host = VHost, name = Name}) ->
stream_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp = erlang:integer_to_binary(erlang:system_time()),
osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
Timestamp/binary>>)).
@ -648,7 +675,8 @@ recover(Q) ->
check_queue_exists_in_local_node(Q) ->
Conf = amqqueue:get_type_state(Q),
AllNodes = [maps:get(leader_node, Conf) | maps:get(replica_nodes, Conf)],
AllNodes = [maps:get(leader_node, Conf) |
maps:get(replica_nodes, Conf)],
case lists:member(node(), AllNodes) of
true ->
ok;
@ -682,6 +710,9 @@ stream_entries(Name, LeaderPid,
false ->
{Str0#stream{log = Seg}, MsgIn}
end;
{error, Err} ->
rabbit_log:debug("stream client: error reading chunk ~w", [Err]),
exit(Err);
{Records, Seg} ->
Msgs = [begin
Msg0 = binary_to_msg(QName, B),
%% Re-send every pending (unconfirmed) message to the current leader,
%% e.g. after a leader change. Messages are sorted so they are written
%% in ascending order of their correlation key.
resend_all(#stream_client{leader = LeaderPid,
                          writer_id = WriterId,
                          correlation = Corrs} = State) ->
    Msgs = lists:sort(maps:values(Corrs)),
    case Msgs of
        [] -> ok;
        [{Seq, _} | _] ->
            %% log the first sequence being resent plus the total count
            rabbit_log:debug("stream client: resending from seq ~w num ~b",
                             [Seq, maps:size(Corrs)])
    end,
    [begin
         ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg))
     end || {Seq, Msg} <- Msgs],
    State.
%% Persist the stream leader pid on the amqqueue record identified by
%% QName. When the queue is not found in the transient table (which can
%% happen during recovery) the durable record is re-initialised instead.
set_leader_pid(Pid, QName) ->
    UpdateFun = fun(Queue) -> amqqueue:set_pid(Queue, Pid) end,
    Result = rabbit_misc:execute_mnesia_transaction(
               fun() ->
                       rabbit_amqqueue:update(QName, UpdateFun)
               end),
    case Result of
        not_found ->
            %% This can happen during recovery: fall back to the durable
            %% copy and initialise the runtime record from it.
            [Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
            rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(UpdateFun(Q));
        _ ->
            ok
    end.
%% Rewrite the stream configuration so that leader_node is chosen
%% according to the configured queue-leader-locator strategy.
%%
%% "client-local": keep the declaring node as the leader (no change).
apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf) ->
    Conf;
%% "random": pick a member by hashing the stream name. Deterministic per
%% stream but spreads leaders across members without any shared state.
apply_leader_locator_strategy(#{leader_node := Leader,
                                replica_nodes := Replicas0,
                                leader_locator_strategy := <<"random">>,
                                name := StreamId} = Conf) ->
    Replicas = [Leader | Replicas0],
    ClusterSize = length(Replicas),
    Hash = erlang:phash2(StreamId),
    %% lists:nth/2 is 1-based, hence the +1
    Pos = (Hash rem ClusterSize) + 1,
    NewLeader = lists:nth(Pos, Replicas),
    NewReplicas = lists:delete(NewLeader, Replicas),
    Conf#{leader_node => NewLeader,
          replica_nodes => NewReplicas};
%% "least-leaders": count how many stream leaders each node currently
%% hosts and pick the least-loaded node that is a member of this stream.
apply_leader_locator_strategy(#{leader_node := Leader,
                                replica_nodes := Replicas0,
                                leader_locator_strategy := <<"least-leaders">>} = Conf) ->
    Replicas = [Leader | Replicas0],
    %% seed every member with 0 so members hosting no leaders still rank
    Counters0 = maps:from_list([{R, 0} || R <- Replicas]),
    Counters = maps:to_list(
                 lists:foldl(fun(Q, Acc) ->
                                     P = amqqueue:get_pid(Q),
                                     case amqqueue:get_type(Q) of
                                         ?MODULE when is_pid(P) ->
                                             %% default of 1 covers leader nodes
                                             %% that are not members of this stream
                                             maps:update_with(node(P), fun(V) -> V + 1 end, 1, Acc);
                                         _ ->
                                             Acc
                                     end
                             end, Counters0, rabbit_amqqueue:list())),
    Ordered = lists:sort(fun({_, V1}, {_, V2}) ->
                                 V1 =< V2
                         end, Counters),
    %% We could have potentially introduced nodes that are not in the list of replicas if
    %% initial cluster size is smaller than the cluster size. Let's select the first one
    %% that is on the list of replicas
    NewLeader = select_first_matching_node(Ordered, Replicas),
    NewReplicas = lists:delete(NewLeader, Replicas),
    Conf#{leader_node => NewLeader,
          replica_nodes => NewReplicas}.
%% Walk the {Node, Count} candidates in order and return the first node
%% that is also one of the given replicas. Assumes at least one
%% candidate is a replica (callers seed the candidate list with them).
select_first_matching_node([{Candidate, _Count} | Remaining], Replicas) ->
    case lists:member(Candidate, Replicas) of
        true ->
            Candidate;
        false ->
            select_first_matching_node(Remaining, Replicas)
    end.

View File

@ -7,6 +7,7 @@
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
wait_for_messages/2,
wait_for_messages/3,
wait_for_min_messages/3,
wait_for_max_messages/3,
dirty_query/3,

View File

@ -46,6 +46,7 @@ groups() ->
delete_classic_replica,
delete_quorum_replica,
consume_from_replica,
replica_recovery,
leader_failover,
leader_failover_dedupe,
initial_cluster_size_one,
@ -253,13 +254,15 @@ declare_queue(Config) ->
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
?assertMatch([_], rpc:call(Server, supervisor, which_children,
[osiris_server_sup])),
%% Test declare an existing queue with different arguments
?assertExit(_, declare(Ch, Q, [])).
?assertExit(_, declare(Ch, Q, [])),
ok.
delete_queue(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -316,7 +319,8 @@ add_replica(Config) ->
[<<"/">>, Q, Server1])),
%% replicas must be recorded on the state, and if we publish messages then they must
%% be stored on disk
check_leader_and_replicas(Config, Q, Server0, [Server1]),
timer:sleep(2000),
check_leader_and_replicas(Config, Q, [Server0, Server1]),
%% And if we try again? Idempotent
?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server1])),
@ -326,7 +330,8 @@ add_replica(Config) ->
rabbit_control_helper:command(start_app, Server2),
?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server2])),
check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]).
timer:sleep(2000),
check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]).
delete_replica(Config) ->
[Server0, Server1, Server2] =
@ -335,7 +340,7 @@ delete_replica(Config) ->
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]),
%% Not a member of the cluster, what would happen?
?assertEqual({error, node_not_running},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
@ -343,15 +348,17 @@ delete_replica(Config) ->
?assertEqual(ok,
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
timer:sleep(2000),
%% check it's gone
check_leader_and_replicas(Config, Q, Server0, [Server2]),
check_leader_and_replicas(Config, Q, [Server0, Server2]),
%% And if we try again? Idempotent
?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
%% Delete the last replica
?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server2])),
check_leader_and_replicas(Config, Q, Server0, []).
timer:sleep(2000),
check_leader_and_replicas(Config, Q, [Server0]).
grow_coordinator_cluster(Config) ->
[Server0, Server1, _Server2] =
@ -437,14 +444,15 @@ delete_down_replica(Config) ->
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
?assertEqual({error, node_not_running},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
%% check it isn't gone
check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
timer:sleep(5000),
?assertEqual(ok,
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])).
@ -493,7 +501,7 @@ restart_single_node(Config) ->
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]).
recover(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server | _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
@ -502,13 +510,23 @@ recover(Config) ->
publish(Ch, Q),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
[rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
[rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
% [A, B, C] = Servers0,
% Servers = [A, C, B],
% [rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
% [rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
% quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
[begin
ct:pal("recover: running stop start for permuation ~w", [Servers]),
[rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
[rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
ct:pal("recover: running stop waiting for messages ~w", [Servers]),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 120)
end || Servers <- permute(Servers0)],
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
publish(Ch1, Q),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]).
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]),
ok.
consume_without_qos(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -517,7 +535,7 @@ consume_without_qos(Config) ->
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>},
self())).
@ -941,7 +959,7 @@ consume_from_replica(Config) ->
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
qos(Ch2, 10, false),
subscribe(Ch2, Q, false, 0),
receive_batch(Ch2, 0, 99).
@ -1165,6 +1183,31 @@ max_age(Config) ->
subscribe(Ch1, Q, false, 0),
?assertEqual(100, length(receive_batch())).
%% Publish 100 confirmed messages, restart a replica node, then verify a
%% consumer attached to the restarted replica receives all messages.
replica_recovery(Config) ->
    [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
    Q = ?config(queue_name, Config),
    ?assertEqual({'queue.declare_ok', Q, 0, 0},
                 declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
    #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
    amqp_channel:register_confirm_handler(Ch1, self()),
    [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
    amqp_channel:wait_for_confirms(Ch1, 5),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
    ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
    %% NOTE(review): fixed sleep to let the replica rejoin — a
    %% condition-based wait would be less flaky; confirm timing
    timer:sleep(2000),
    Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
    qos(Ch2, 10, false),
    %% consume offsets 0..99 from the restarted replica
    subscribe(Ch2, Q, false, 0),
    receive_batch(Ch2, 0, 99),
    ok.
leader_failover(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1179,7 +1222,7 @@ leader_failover(Config) ->
[publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch1, 5),
check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
check_leader_and_replicas(Config, Q, [Server1, Server2, Server3]),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
timer:sleep(30000),
@ -1198,16 +1241,22 @@ leader_failover(Config) ->
leader_failover_dedupe(Config) ->
%% tests that in-flight messages are automatically handled in the case where
%% a leader change happens during publishing
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
PermNodes = permute(
rabbit_ct_broker_helpers:get_node_configs(Config, nodename)),
%% pick a random node order for this test
%% really we should run all permutations
Nodes = lists:nth(rand:uniform(length(PermNodes)), PermNodes),
ct:pal("~s running with nodes ~w", [?FUNCTION_NAME, Nodes]),
[_Server1, DownNode, PubNode] = Nodes,
Ch1 = rabbit_ct_client_helpers:open_channel(Config, DownNode),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
check_leader_and_replicas(Config, Q, Nodes),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, PubNode),
#'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
Self= self(),
@ -1234,23 +1283,27 @@ leader_failover_dedupe(Config) ->
erlang:monitor(process, Pid),
Pid ! go,
timer:sleep(10),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_ct_broker_helpers:stop_node(Config, DownNode),
%% this should cause a new leader to be elected and the channel on node 2
%% to have to resend any pending messages to ensure none is lost
timer:sleep(30000),
ct:pal("preinfo", []),
[Info] = lists:filter(
fun(Props) ->
QName = rabbit_misc:r(<<"/">>, queue, Q),
lists:member({name, QName}, Props)
end,
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue,
info_all, [<<"/">>, [name, leader, members]])),
rabbit_ct_broker_helpers:rpc(Config, PubNode, rabbit_amqqueue,
info_all,
[<<"/">>, [name, leader, members]])),
ct:pal("info ~p", [Info]),
NewLeader = proplists:get_value(leader, Info),
?assert(NewLeader =/= Server1),
?assert(NewLeader =/= DownNode),
flush(),
?assert(erlang:is_process_alive(Pid)),
ct:pal("stopping"),
Pid ! stop,
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
ok = rabbit_ct_broker_helpers:start_node(Config, DownNode),
N = receive
{last_msg, X} -> X
@ -1274,7 +1327,7 @@ initial_cluster_size_one(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-initial-cluster-size">>, long, 1}])),
check_leader_and_replicas(Config, Q, Server1, []),
check_leader_and_replicas(Config, Q, [Server1]),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
@ -1296,7 +1349,7 @@ initial_cluster_size_two(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
info_all, [<<"/">>, [name, leader, members]])),
?assertEqual(Server1, proplists:get_value(leader, Info)),
?assertEqual(1, length(proplists:get_value(members, Info))),
?assertEqual(2, length(proplists:get_value(members, Info))),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
@ -1314,7 +1367,7 @@ initial_cluster_size_one_policy(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-initial-cluster-size">>, long, 1}])),
check_leader_and_replicas(Config, Q, Server1, []),
check_leader_and_replicas(Config, Q, [Server1]),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
@ -1632,17 +1685,18 @@ get_queue_type(Server, Q0) ->
{ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
amqqueue:get_type(Q1).
check_leader_and_replicas(Config, Name, Leader, Replicas0) ->
%% Assert that the queue's reported member set equals Members (order
%% insensitive) and that the reported leader is one of those members.
check_leader_and_replicas(Config, Name, Members) ->
    QNameRes = rabbit_misc:r(<<"/">>, queue, Name),
    %% find this queue's row in the info_all result
    [Info] = lists:filter(
               fun(Props) ->
                       lists:member({name, QNameRes}, Props)
               end,
               rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
                                            info_all, [<<"/">>, [name, leader,
                                                                 members]])),
    ct:pal("~s members ~w ~p", [?FUNCTION_NAME, Members, Info]),
    ?assert(lists:member(proplists:get_value(leader, Info), Members)),
    ?assertEqual(lists:sort(Members), lists:sort(proplists:get_value(members, Info))).
publish(Ch, Queue) ->
publish(Ch, Queue, <<"msg">>).
@ -1678,7 +1732,8 @@ validate_dedupe(Ch, N, N) ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false})
after 60000 ->
exit({missing_record, N})
flush(),
exit({missing_record, N})
end;
validate_dedupe(Ch, N, M) ->
receive
@ -1690,7 +1745,8 @@ validate_dedupe(Ch, N, M) ->
multiple = false}),
validate_dedupe(Ch, N + 1, M)
after 60000 ->
exit({missing_record, N})
flush(),
exit({missing_record, N})
end.
receive_batch(Ch, N, N) ->
@ -1700,7 +1756,8 @@ receive_batch(Ch, N, N) ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false})
after 60000 ->
exit({missing_offset, N})
flush(),
exit({missing_offset, N})
end;
receive_batch(Ch, N, M) ->
receive
@ -1714,7 +1771,8 @@ receive_batch(Ch, N, M) ->
multiple = false}),
receive_batch(Ch, N + 1, M)
after 60000 ->
exit({missing_offset, N})
flush(),
exit({missing_offset, N})
end.
receive_batch() ->
@ -1746,3 +1804,6 @@ flush() ->
after 0 ->
ok
end.
%% Return every permutation of the input list, preserving the original
%% element order preference (elements are picked left to right).
permute([]) ->
    [[]];
permute(List) ->
    [[Pick | Rest] || Pick <- List,
                      Rest <- permute(lists:delete(Pick, List))].

View File

@ -6,7 +6,7 @@
fake_pid(Node) ->
NodeBin = rabbit_data_coercion:to_binary(Node),
NodeBin = atom_to_binary(Node),
ThisNodeSize = size(term_to_binary(node())) + 1,
Pid = spawn(fun () -> ok end),
%% drop the local node data from a local pid