Merge pull request #12753 from rabbitmq/md/khepri-0-17
Bump Khepri to 0.17.0
commit dc5a703c23
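Reviewer note (not part of the diff): the changes below follow one theme. Khepri 0.17 makes the `khepri_adv` and `khepri_tx_adv` functions return node properties keyed by their path, so call sites now match `{ok, #{Path := NodeProps}}` instead of `{ok, NodeProps}`. A minimal, hedged sketch of the before/after pattern, assuming `Path` is already bound by the caller:

%% Khepri 0.16 and earlier returned a single node-props map:
%%     {ok, #{data := Value, payload_version := Vsn}} -> ...
%% Khepri 0.17+ keys the node props by the path they describe:
case rabbit_khepri:adv_get(Path) of
    {ok, #{Path := #{data := Value, payload_version := Vsn}}} ->
        {Value, Vsn};
    {error, _} = Error ->
        Error
end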
@@ -57,7 +57,8 @@ jobs:
uses: dsaltares/fetch-gh-release-asset@master
if: inputs.mixed_clusters
with:
version: 'tags/v4.0.5'
repo: 'rabbitmq/server-packages'
version: 'tags/alphas.1744021065493'
regex: true
file: "rabbitmq-server-generic-unix-\\d.+\\.tar\\.xz"
target: ./

@@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
end,
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.

delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
Path = khepri_route_path(
VHost,
Name,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = #if_has_data{}),
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
maps:fold(fun(_P, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], Bindings).
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
Pattern = khepri_route_path(
VHost,
SrcName,
?KHEPRI_WILDCARD_STAR, %% Kind
?KHEPRI_WILDCARD_STAR, %% DstName
#if_has_data{}), %% RoutingKey
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, SrcName, _Kind, _Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], Bindings).

%% -------------------------------------------------------------------
%% delete_for_destination_in_mnesia().
@@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
?KHEPRI_WILDCARD_STAR, %% SrcName
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
?KHEPRI_WILDCARD_STAR), %% RoutingKey
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
Bindings = maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, _SrcName, Kind, Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

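A recurring change in this file and the ones below: `khepri_tx_adv:delete_many/1` now returns a map of `Path => NodeProps`, and the folds pattern-match each path against the expected resource macro (here `?RABBITMQ_KHEPRI_ROUTE_PATH`) so that only matching nodes contribute their payload. A hedged, generic sketch of that filtering fold; `collect_route_sets/1` is a hypothetical name, not part of this PR:

%% Collect the binding sets stored under route paths, ignoring any
%% other tree nodes the delete may have returned.
collect_route_sets(NodePropsMap) ->
    maps:fold(
      fun(Path, Props, Acc) ->
              case {Path, Props} of
                  {?RABBITMQ_KHEPRI_ROUTE_PATH(_VHost, _Src, _Kind, _Dst, _RK),
                   #{data := Set}} ->
                      sets:to_list(Set) ++ Acc;
                  {_, _} ->
                      Acc
              end
      end, [], NodePropsMap).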
@@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) ->
Path = khepri_exchange_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := X, payload_version := Vsn}} ->
{ok, #{Path := #{data := X, payload_version := Vsn}}} ->
X1 = Fun(X),
UpdatePath =
khepri_path:combine_with_conditions(
@@ -534,8 +534,7 @@ next_serial_in_khepri(XName) ->
Path = khepri_exchange_serial_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Serial,
payload_version := Vsn}} ->
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
@@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) ->
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
Deletions =
maps:fold(
fun(_Path, #{data := X}, Deletions) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
fun(Path, Props, Deletions) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
#{data := X}} ->
{deleted,
#exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1);
{_, _} ->
Deletions
end
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.

@@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
mirroring_pid = Overall,
childspec = ChildSpec},
case rabbit_khepri:adv_get(Path) of
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}} ->
{ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}}} ->
case Overall of
Pid ->
Delegate;
@@ -160,6 +160,7 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
end
end;
_ ->
%% FIXME: Not atomic with the get above.
ok = rabbit_khepri:put(Path, S),
start
end.

@@ -411,8 +411,18 @@ delete_in_khepri(QueueName, OnlyDurable) ->
rabbit_khepri:transaction(
fun () ->
Path = khepri_queue_path(QueueName),
UsesUniformWriteRet = try
khepri_tx:does_api_comply_with(uniform_write_ret)
catch
error:undef ->
false
end,
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
{ok, #{data := _}} when not UsesUniformWriteRet ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
@@ -607,7 +617,7 @@ update_in_khepri(QName, Fun) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q, payload_version := Vsn}} ->
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Q1 = Fun(Q),
@@ -658,7 +668,7 @@ update_decorators_in_khepri(QName, Decorators) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q1, payload_version := Vsn}} ->
{ok, #{Path := #{data := Q1, payload_version := Vsn}}} ->
Q2 = amqqueue:set_decorators(Q1, Decorators),
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
@@ -1098,15 +1108,12 @@ delete_transient_in_khepri(FilterFun) ->
case rabbit_khepri:adv_get_many(PathPattern) of
{ok, Props} ->
Qs = maps:fold(
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
fun(Path, #{data := Q, payload_version := Vsn}, Acc)
when ?is_amqqueue(Q) ->
case FilterFun(Q) of
true ->
Path = khepri_path:combine_with_conditions(
Path0,
[#if_payload_version{version = Vsn}]),
QName = amqqueue:get_name(Q),
[{Path, QName} | Acc];
[{Path, Vsn, QName} | Acc];
false ->
Acc
end
@@ -1125,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Res = rabbit_khepri:transaction(
fun() ->
rabbit_misc:fold_while_ok(
fun({Path, QName}, Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
{ok, [{QName, Deletions} | Acc]};
{ok, _} ->
{ok, Acc};
{error, _} = Error ->
Error
end
end, [], Qs)
do_delete_transient_queues_in_khepri_tx(Qs, [])
end),
case Res of
{ok, Items} ->
@@ -1152,6 +1146,35 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Error
end.

do_delete_transient_queues_in_khepri_tx([], Acc) ->
{ok, Acc};
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
%% Also see `delete_in_khepri/2'.
VersionedPath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
UsesUniformWriteRet = try
khepri_tx:does_api_comply_with(uniform_write_ret)
catch
error:undef ->
false
end,
case khepri_tx_adv:delete(VersionedPath) of
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
Acc1 = [{QName, Deletions} | Acc],
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
{ok, #{data := _}} when not UsesUniformWriteRet ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
Acc1 = [{QName, Deletions} | Acc],
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
{ok, _} ->
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% foreach_transient().
%% -------------------------------------------------------------------

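The `try ... khepri_tx:does_api_comply_with(uniform_write_ret) ... catch error:undef -> false end` probe repeats in several transaction funs above and below. It appears to guard against transactions replayed on cluster members that still run an older Khepri machine, where the function does not exist and the old return shape applies. A hedged sketch of factoring the probe into one helper (the helper name is hypothetical, not part of this PR):

%% Returns true when khepri_tx_adv:delete/1 and friends return node
%% props keyed by path (Khepri 0.17+), false on older machines where
%% khepri_tx:does_api_comply_with/1 does not exist yet.
uses_uniform_write_ret() ->
    try
        khepri_tx:does_api_comply_with(uniform_write_ret)
    catch
        error:undef ->
            false
    end.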
@@ -59,7 +59,7 @@ set_in_khepri(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case rabbit_khepri:adv_put(Path, Record) of
{ok, #{data := Params}} ->
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
@@ -113,8 +113,16 @@ set_in_khepri_tx(Key, Term) ->
Path = khepri_rp_path(Key),
Record = #runtime_parameters{key = Key,
value = Term},
UsesUniformWriteRet = try
khepri_tx:does_api_comply_with(uniform_write_ret)
catch
error:undef ->
false
end,
case khepri_tx_adv:put(Path, Record) of
{ok, #{data := Params}} ->
{ok, #{Path := #{data := Params}}} when UsesUniformWriteRet ->
{old, Params#runtime_parameters.value};
{ok, #{data := Params}} when not UsesUniformWriteRet ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
@@ -347,11 +355,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].

delete_vhost_in_khepri(VHostName) ->
Path = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Path) of
{ok, Props} ->
{ok, rabbit_khepri:collect_payloads(Props)};
Pattern = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Pattern) of
{ok, NodePropsMap} ->
RTParams =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
VHostName, _, _),
#{data := RTParam}} ->
[RTParam | Acc];
{_, _} ->
Acc
end
end, [], NodePropsMap),
{ok, RTParams};
{error, _} = Err ->
Err
end.

@@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
rabbit_khepri:transaction(
fun() ->
UserPermissionsPath = khepri_user_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName),
TopicPermissionsPath = khepri_topic_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName,
?KHEPRI_WILDCARD_STAR),
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
{ok, TopicProps} = khepri_tx_adv:delete_many(
TopicPermissionsPath),
Deletions = rabbit_khepri:collect_payloads(
TopicProps,
rabbit_khepri:collect_payloads(UserProps)),
{ok, Deletions}
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
end, rw, #{timeout => infinity}).

clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
UserPermissionsPattern = khepri_user_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName),
TopicPermissionsPattern = khepri_topic_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName,
?KHEPRI_WILDCARD_STAR),
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
TopicPermissionsPattern),
Deletions0 =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
#{data := Permission}} ->
[Permission | Acc];
{_, _} ->
Acc
end
end, [], UserNodePropsMap),
Deletions1 =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
#{data := Permission}} ->
[Permission | Acc];
{_, _} ->
Acc
end
end, Deletions0, TopicNodePropsMap),
{ok, Deletions1}.

%% -------------------------------------------------------------------
%% get_topic_permissions().
%% -------------------------------------------------------------------

@@ -167,7 +167,7 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
Path = khepri_vhost_path(VHostName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := VHost0, payload_version := DVersion}} ->
{ok, #{Path := #{data := VHost0, payload_version := DVersion}}} ->
VHost = vhost:merge_metadata(VHost0, Metadata),
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
Path1 = khepri_path:combine_with_conditions(
@@ -443,10 +443,10 @@ update_in_mnesia_tx(VHostName, UpdateFun)
update_in_khepri(VHostName, UpdateFun) ->
Path = khepri_vhost_path(VHostName),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := V, payload_version := DVersion}} ->
{ok, #{Path := #{data := V, payload_version := Vsn}}} ->
V1 = UpdateFun(V),
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
case rabbit_khepri:put(Path1, V1) of
ok ->
V1;

@@ -112,6 +112,7 @@
get_ra_cluster_name/0,
get_store_id/0,
transfer_leadership/1,
fence/1,

is_empty/0,
create/2,
@@ -174,10 +175,6 @@

-export([force_shrink_member_to_current_member/0]).

%% Helpers for working with the Khepri API / types.
-export([collect_payloads/1,
collect_payloads/2]).

-ifdef(TEST).
-export([force_metadata_store/1,
clear_forced_metadata_store/0]).
@@ -620,7 +617,7 @@ members() ->
%% The returned list is empty if there was an error.

locally_known_members() ->
case khepri_cluster:locally_known_members(?RA_CLUSTER_NAME) of
case khepri_cluster:members(?RA_CLUSTER_NAME, #{favor => low_latency}) of
{ok, Members} -> Members;
{error, _Reason} -> []
end.
@@ -650,7 +647,7 @@ nodes() ->
%% The returned list is empty if there was an error.

locally_known_nodes() ->
case khepri_cluster:locally_known_nodes(?RA_CLUSTER_NAME) of
case khepri_cluster:nodes(?RA_CLUSTER_NAME, #{favor => low_latency}) of
{ok, Nodes} -> Nodes;
{error, _Reason} -> []
end.
@@ -1020,12 +1017,14 @@ delete(Path, Options0) ->

delete_or_fail(Path) ->
case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of
{ok, Result} ->
case maps:size(Result) of
{ok, #{Path := NodeProps}} ->
case maps:size(NodeProps) of
0 -> {error, {node_not_found, #{}}};
_ -> ok
end;
Error ->
{ok, #{} = NodePropsMap} when NodePropsMap =:= #{} ->
{error, {node_not_found, #{}}};
{error, _} = Error ->
Error
end.

@@ -1072,48 +1071,6 @@ handle_async_ret(RaEvent) ->
fence(Timeout) ->
khepri:fence(?STORE_ID, Timeout).

%% -------------------------------------------------------------------
%% collect_payloads().
%% -------------------------------------------------------------------

-spec collect_payloads(Props) -> Ret when
Props :: khepri:node_props(),
Ret :: [Payload],
Payload :: term().

%% @doc Collects all payloads from a node props map.
%%
%% This is the same as calling `collect_payloads(Props, [])'.
%%
%% @private

collect_payloads(Props) when is_map(Props) ->
collect_payloads(Props, []).

-spec collect_payloads(Props, Acc0) -> Ret when
Props :: khepri:node_props(),
Acc0 :: [Payload],
Ret :: [Payload],
Payload :: term().

%% @doc Collects all payloads from a node props map into the accumulator list.
%%
%% This is meant to be used with the `khepri_adv' API to easily collect the
%% payloads from the return value of `khepri_adv:delete_many/4' for example.
%%
%% @returns all payloads in the node props map collected into a list, with
%% `Acc0' as the tail.
%%
%% @private

collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) ->
maps:fold(
fun (_Path, #{data := Payload}, Acc) ->
[Payload | Acc];
(_Path, _NoPayload, Acc) ->
Acc
end, Acc0, Props).

-spec unregister_legacy_projections() -> Ret when
Ret :: ok | timeout_error().
%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x
@@ -1557,19 +1514,31 @@ get_feature_state(Node) ->
%% @private

khepri_db_migration_enable(#{feature_name := FeatureName}) ->
maybe
ok ?= sync_cluster_membership_from_mnesia(FeatureName),
?LOG_INFO(
"Feature flag `~s`: unregistering legacy projections",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= unregister_legacy_projections(),
?LOG_INFO(
"Feature flag `~s`: registering projections",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= register_projections(),
migrate_mnesia_tables(FeatureName)
Members = locally_known_members(),
case length(Members) < 2 of
true ->
maybe
ok ?= sync_cluster_membership_from_mnesia(FeatureName),
?LOG_INFO(
"Feature flag `~s`: unregistering legacy projections",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= unregister_legacy_projections(),
?LOG_INFO(
"Feature flag `~s`: registering projections",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= register_projections(),
migrate_mnesia_tables(FeatureName)
end;
false ->
?LOG_INFO(
"Feature flag `~s`: node ~0p already clustered (feature flag "
"enabled as part of clustering?); "
"skipping Mnesia->Khepri migration",
[node()],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok
end.

%% @private

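The `locally_known_members/0` and `locally_known_nodes/0` hunks above show another 0.17 API move: the dedicated `khepri_cluster:locally_known_*` functions give way to the regular query functions taking a `#{favor => low_latency}` option, which favours the locally known view over a leader query. A hedged usage sketch, with `StoreId` standing in for `?RA_CLUSTER_NAME`:

%% Return the locally known cluster members, or [] on error.
local_members(StoreId) ->
    case khepri_cluster:members(StoreId, #{favor => low_latency}) of
        {ok, Members}    -> Members;
        {error, _Reason} -> []
    end.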
@@ -9,14 +9,14 @@

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([export_all, nowarn_export_all]).

all() ->
[
{group, client_operations},
{group, cluster_operation_add},
{group, cluster_operation_remove}
{group, cluster_operation}
].

groups() ->
@@ -42,8 +42,10 @@ groups() ->
delete_policy,
export_definitions
]},
{cluster_operation_add, [], [add_node]},
{cluster_operation_remove, [], [remove_node]},
{cluster_operation, [], [add_node_when_seed_node_is_leader,
add_node_when_seed_node_is_follower,
remove_node_when_seed_node_is_leader,
remove_node_when_seed_node_is_follower]},
{feature_flags, [], [enable_feature_flag]}
].

@@ -127,26 +129,49 @@ init_per_group(Group, Config0) when Group == client_operations;
partition_5_node_cluster(Config1),
Config1
end;
init_per_group(Group, Config0) ->
init_per_group(_Group, Config0) ->
Config = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 5},
{rmq_nodename_suffix, Group},
{rmq_nodes_clustered, false},
{tcp_ports_base},
{net_ticktime, 5}]),
Config1 = rabbit_ct_helpers:merge_app_env(
Config, {rabbit, [{forced_feature_flags_on_init, []}]}),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
Config, {rabbit, [{forced_feature_flags_on_init, []},
{khepri_leader_wait_retry_timeout, 30000}]}),
Config1.

end_per_group(_, Config) ->
end_per_group(Group, Config) when Group == client_operations;
Group == feature_flags ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
rabbit_ct_broker_helpers:teardown_steps());
end_per_group(_Group, Config) ->
Config.

init_per_testcase(Testcase, Config)
when Testcase =:= add_node_when_seed_node_is_leader orelse
Testcase =:= add_node_when_seed_node_is_follower orelse
Testcase =:= remove_node_when_seed_node_is_leader orelse
Testcase =:= remove_node_when_seed_node_is_follower ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodename_suffix, Testcase}]),
rabbit_ct_helpers:run_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config)
when Testcase =:= add_node_when_seed_node_is_leader orelse
Testcase =:= add_node_when_seed_node_is_follower orelse
Testcase =:= remove_node_when_seed_node_is_leader orelse
Testcase =:= remove_node_when_seed_node_is_follower ->
rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

@@ -271,53 +296,153 @@ set_policy(Config) ->
delete_policy(Config) ->
?assertError(_, rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy-to-delete">>)).

add_node(Config) ->
[A, B, C, D, _E] = rabbit_ct_broker_helpers:get_node_configs(
add_node_when_seed_node_is_leader(Config) ->
[A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
ok = rabbit_control_helper:command(stop_app, B),
ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, B),
Cluster = [A, B, C],
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

ok = rabbit_control_helper:command(stop_app, C),
ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, C),
AMember = {rabbit_khepri:get_store_id(), A},
_ = ra:transfer_leadership(AMember, AMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),

%% Minority partition: A
partition_3_node_cluster(Config1),

Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, State} when State =/= follower andalso State =/= candidate ->
Ret = rabbit_control_helper:command(
join_cluster, E, [atom_to_list(A)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "(Khepri cluster could be in minority|\\{:rabbit, \\{\\{:error, :timeout\\})",
[{capture, none}]));
Ret ->
ct:pal("A is not the expected leader: ~p", [Ret]),
{skip, "Node A was not elected leader"}
end.

add_node_when_seed_node_is_follower(Config) ->
[A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
Cluster = [A, B, C],
partition_3_node_cluster(Config),
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

ok = rabbit_control_helper:command(stop_app, D),
%% The command is appended to the log, but it will be dropped once the connectivity
%% is restored
?assertMatch(ok,
rabbit_control_helper:command(join_cluster, D, [atom_to_list(A)], [])),
timer:sleep(10000),
join_3_node_cluster(Config),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster).
CMember = {rabbit_khepri:get_store_id(), C},
ra:transfer_leadership(CMember, CMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),

remove_node(Config) ->
%% Minority partition: A
partition_3_node_cluster(Config1),

AMember = {rabbit_khepri:get_store_id(), A},
Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, State}
when State =:= follower orelse State =:= pre_vote ->
Ret = rabbit_control_helper:command(
join_cluster, E, [atom_to_list(A)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "Khepri cluster could be in minority",
[{capture, none}]));
{pong, await_condition} ->
Ret = rabbit_control_helper:command(
join_cluster, E, [atom_to_list(A)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "\\{:rabbit, \\{\\{:error, :timeout\\}",
[{capture, none}])),
clustering_utils:assert_cluster_status(
{Cluster, Cluster}, Cluster);
Ret ->
ct:pal("A is not the expected follower: ~p", [Ret]),
{skip, "Node A was not a follower"}
end.

remove_node_when_seed_node_is_leader(Config) ->
[A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
ok = rabbit_control_helper:command(stop_app, B),
ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, B),
Cluster = [A, B, C],
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

ok = rabbit_control_helper:command(stop_app, C),
ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, C),
AMember = {rabbit_khepri:get_store_id(), A},
ra:transfer_leadership(AMember, AMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),

%% Minority partition: A
partition_3_node_cluster(Config),
Cluster = [A, B, C],
partition_3_node_cluster(Config1),

ok = rabbit_control_helper:command(forget_cluster_node, A, [atom_to_list(B)], []),
timer:sleep(10000),
join_3_node_cluster(Config),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster).
Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, leader} ->
?awaitMatch(
ok,
rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
60000);
Ret ->
ct:pal("A is not the expected leader: ~p", [Ret]),
{skip, "Node A was not a leader"}
end.

remove_node_when_seed_node_is_follower(Config) ->
[A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
Cluster = [A, B, C],
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

CMember = {rabbit_khepri:get_store_id(), C},
ra:transfer_leadership(CMember, CMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),

%% Minority partition: A
partition_3_node_cluster(Config1),

AMember = {rabbit_khepri:get_store_id(), A},
Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, State}
when State =:= follower orelse State =:= pre_vote ->
Ret = rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "Khepri cluster could be in minority",
[{capture, none}]));
{pong, await_condition} ->
Ret = rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
?assertMatch(ok, Ret);
Ret ->
ct:pal("A is not the expected leader: ~p", [Ret]),
{skip, "Node A was not a leader"}
end.

enable_feature_flag(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

@@ -745,13 +745,13 @@ is_in_minority(Ret) ->
?assertMatch(match, re:run(Msg, ".*timed out.*minority.*", [{capture, none}])).

reset_last_disc_node(Config) ->
Servers = [Rabbit, Hare | _] = cluster_members(Config),
[Rabbit, Hare | _] = cluster_members(Config),

stop_app(Config, Hare),
?assertEqual(ok, change_cluster_node_type(Config, Hare, ram)),
start_app(Config, Hare),

case rabbit_ct_broker_helpers:enable_feature_flag(Config, Servers, khepri_db) of
case rabbit_ct_broker_helpers:enable_feature_flag(Config, [Rabbit], khepri_db) of
ok ->
%% The reset works after the switch to Khepri because the RAM node was
%% implicitly converted to a disc one as Khepri always writes data on disc.

@@ -21,9 +21,7 @@
all() ->
[
{group, non_parallel},
{group, cluster_size_3},
{group, cluster_size_5},
{group, cluster_size_7}
{group, discovery}
].

groups() ->
@@ -31,18 +29,24 @@ groups() ->
{non_parallel, [], [
no_nodes_configured
]},
{cluster_size_3, [], [
successful_discovery,
successful_discovery_with_a_subset_of_nodes_coming_online
]},
{cluster_size_5, [], [
successful_discovery,
successful_discovery_with_a_subset_of_nodes_coming_online
]},
{cluster_size_7, [], [
successful_discovery,
successful_discovery_with_a_subset_of_nodes_coming_online
]}
{discovery, [],
[
{cluster_size_3, [],
[
successful_discovery,
successful_discovery_with_a_subset_of_nodes_coming_online
]},
{cluster_size_5, [],
[
successful_discovery,
successful_discovery_with_a_subset_of_nodes_coming_online
]},
{cluster_size_7, [],
[
successful_discovery,
successful_discovery_with_a_subset_of_nodes_coming_online
]}
]}
].

suite() ->
@@ -63,6 +67,24 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(discovery, Config) ->
case rabbit_ct_helpers:is_mixed_versions(Config) of
false ->
Config;
true ->
%% We can't support the creation of a cluster because peer
%% discovery might select a newer node as the seed node and ask an
%% older node to join it. The creation of the cluster may fail of
%% the cluster might be degraded. Examples:
%% - a feature flag is enabled by the newer node but the older
%% node doesn't know it
%% - the newer node uses a newer Khepri machine version and the
%% older node can join but won't be able to apply Khepri
%% commands and progress.
{skip,
"Peer discovery is unsupported with a mix of old and new "
"RabbitMQ versions"}
end;
init_per_group(cluster_size_3 = Group, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}, {group, Group}]);
init_per_group(cluster_size_5 = Group, Config) ->

@@ -298,6 +298,9 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
init_per_testcase(Testcase, Config) ->
ClusterSize = ?config(rmq_nodes_count, Config),
IsMixed = rabbit_ct_helpers:is_mixed_versions(),
SameKhepriMacVers = (
rabbit_ct_broker_helpers:do_nodes_run_same_ra_machine_version(
Config, khepri_machine)),
case Testcase of
node_removal_is_not_quorum_critical when IsMixed ->
{skip, "node_removal_is_not_quorum_critical isn't mixed versions compatible"};
@@ -325,6 +328,9 @@ init_per_testcase(Testcase, Config) ->
leader_locator_balanced_random_maintenance when IsMixed ->
{skip, "leader_locator_balanced_random_maintenance isn't mixed versions compatible because "
"delete_declare isn't mixed versions reliable"};
leadership_takeover when not SameKhepriMacVers ->
{skip, "leadership_takeover will fail with a mix of Khepri state "
"machine versions"};
reclaim_memory_with_wrong_queue_type when IsMixed ->
{skip, "reclaim_memory_with_wrong_queue_type isn't mixed versions compatible"};
peek_with_wrong_queue_type when IsMixed ->
@@ -2063,7 +2069,7 @@ recover_from_single_failure(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0).

recover_from_multiple_failures(Config) ->
[Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server1, Server, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -2360,7 +2366,7 @@ channel_handles_ra_event(Config) ->
?assertEqual(2, basic_get_tag(Ch1, Q2, false)).

declare_during_node_down(Config) ->
[Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(
[DownServer, Server, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

stop_node(Config, DownServer),
@@ -2692,7 +2698,7 @@ delete_member_member_already_deleted(Config) ->
ok.

delete_member_during_node_down(Config) ->
[Server, DownServer, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs(
[DownServer, Server, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

stop_node(Config, DownServer),
@@ -2747,7 +2753,7 @@ cleanup_data_dir(Config) ->
%% trying to delete a queue in minority. A case clause there had gone
%% previously unnoticed.

[Server1, Server2, Server3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server2, Server1, Server3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
@@ -3594,7 +3600,12 @@ format(Config) ->
%% tests rabbit_quorum_queue:format/2
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Server = hd(Nodes),
Server = case Nodes of
[N] ->
N;
[_, N | _] ->
N
end,

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
@@ -3613,7 +3624,9 @@ format(Config) ->
?FUNCTION_NAME, [QRecord, #{}]),

%% test all up case
?assertEqual(<<"quorum">>, proplists:get_value(type, Fmt)),
?assertMatch(
T when T =:= <<"quorum">> orelse T =:= quorum,
proplists:get_value(type, Fmt)),
?assertEqual(running, proplists:get_value(state, Fmt)),
?assertEqual(Server, proplists:get_value(leader, Fmt)),
?assertEqual(Server, proplists:get_value(node, Fmt)),
@@ -3622,15 +3635,17 @@ format(Config) ->

case length(Nodes) of
3 ->
[_, Server2, Server3] = Nodes,
ok = rabbit_control_helper:command(stop_app, Server2),
[Server1, _Server2, Server3] = Nodes,
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(stop_app, Server3),

Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_quorum_queue,
?FUNCTION_NAME, [QRecord, #{}]),
ok = rabbit_control_helper:command(start_app, Server2),
ok = rabbit_control_helper:command(start_app, Server1),
ok = rabbit_control_helper:command(start_app, Server3),
?assertEqual(<<"quorum">>, proplists:get_value(type, Fmt2)),
?assertMatch(
T when T =:= <<"quorum">> orelse T =:= quorum,
proplists:get_value(type, Fmt2)),
?assertEqual(minority, proplists:get_value(state, Fmt2)),
?assertEqual(Server, proplists:get_value(leader, Fmt2)),
?assertEqual(Server, proplists:get_value(node, Fmt2)),

@@ -540,50 +540,48 @@ add_replica(Config) ->
QQuorum = <<Q/binary, "_quorum">>,

?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
?assertEqual({'queue.declare_ok', QClassic, 0, 0},
declare(Config, Server0, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
declare(Config, Server1, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
?assertEqual({'queue.declare_ok', QQuorum, 0, 0},
declare(Config, Server0, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
declare(Config, Server1, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

%% Not a member of the cluster, what would happen?
?assertEqual({error, node_not_running},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server1])),
rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server0])),
?assertEqual({error, classic_queue_not_supported},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QClassic, Server1])),
rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, QClassic, Server0])),
?assertEqual({error, quorum_queue_not_supported},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QQuorum, Server1])),
rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, QQuorum, Server0])),

ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
Config1 = rabbit_ct_broker_helpers:cluster_nodes(
Config, Server1, [Server0]),
timer:sleep(1000),
?assertEqual({error, classic_queue_not_supported},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QClassic, Server1])),
rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, QClassic, Server0])),
?assertEqual({error, quorum_queue_not_supported},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QQuorum, Server1])),
rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, QQuorum, Server0])),
?assertEqual(ok,
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server1])),
rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server0])),
%% replicas must be recorded on the state, and if we publish messages then they must
%% be stored on disk
check_leader_and_replicas(Config, [Server0, Server1]),
check_leader_and_replicas(Config1, [Server1, Server0]),
%% And if we try again? Idempotent
?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server1])),
?assertEqual(ok, rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server0])),
%% Add another node
ok = rabbit_control_helper:command(stop_app, Server2),
ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server2),
?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
Config2 = rabbit_ct_broker_helpers:cluster_nodes(
Config1, Server1, [Server2]),
?assertEqual(ok, rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server2])),
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
check_leader_and_replicas(Config2, [Server0, Server1, Server2]),
rabbit_ct_broker_helpers:rpc(Config2, Server1, ?MODULE, delete_testcase_queue, [Q]).

delete_replica(Config) ->
[Server0, Server1, Server2] =
@@ -641,14 +639,9 @@ grow_then_shrink_coordinator_cluster(Config) ->
Q = ?config(queue_name, Config),

?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),

ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
ok = rabbit_control_helper:command(start_app, Server1),
ok = rabbit_control_helper:command(stop_app, Server2),
ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []),
ok = rabbit_control_helper:command(start_app, Server2),
_Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0, Server2]),

rabbit_ct_helpers:await_condition(
fun() ->
@@ -662,17 +655,17 @@ grow_then_shrink_coordinator_cluster(Config) ->
end
end, 60000),

ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server1)], []),
ok = rabbit_control_helper:command(stop_app, Server0),
ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server0)], []),
ok = rabbit_control_helper:command(stop_app, Server2),
ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []),
ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)], []),
rabbit_ct_helpers:await_condition(
fun() ->
case rpc:call(Server0, ra, members,
[{rabbit_stream_coordinator, Server0}]) of
case rpc:call(Server1, ra, members,
[{rabbit_stream_coordinator, Server1}]) of
{_, Members, _} ->
Nodes = lists:sort([N || {_, N} <- Members]),
lists:sort([Server0]) == Nodes;
lists:sort([Server1]) == Nodes;
_ ->
false
end
@@ -685,29 +678,27 @@ grow_coordinator_cluster(Config) ->
Q = ?config(queue_name, Config),

?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),

ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0]),
%% at this point there _probably_ won't be a stream coordinator member on
%% Server1

%% check we can add a new stream replica for the previously declare stream
?assertEqual(ok,
rpc:call(Server1, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server1])),
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server0])),
%% also check we can declare a new stream when calling Server1
Q2 = unicode:characters_to_binary([Q, <<"_2">>]),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Config, Server1, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare(Config1, Server0, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}])),

%% wait until the stream coordinator detects there is a new rabbit node
%% and adds a new member on the new node
rabbit_ct_helpers:await_condition(
fun() ->
case rpc:call(Server0, ra, members,
[{rabbit_stream_coordinator, Server0}]) of
case rpc:call(Server1, ra, members,
[{rabbit_stream_coordinator, Server1}]) of
{_, Members, _} ->
Nodes = lists:sort([N || {_, N} <- Members]),
lists:sort([Server0, Server1]) == Nodes;
@@ -715,7 +706,7 @@ grow_coordinator_cluster(Config) ->
false
end
end, 60000),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
rabbit_ct_broker_helpers:rpc(Config1, 1, ?MODULE, delete_testcase_queue, [Q]).

shrink_coordinator_cluster(Config) ->
[Server0, Server1, Server2] =
@@ -981,19 +972,17 @@ consume_without_local_replica(Config) ->
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
%% Add another node to the cluster, but it won't have a replica
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0]),
timer:sleep(1000),

Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Ch1 = rabbit_ct_client_helpers:open_channel(Config1, Server0),
qos(Ch1, 10, false),
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>},
self())),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
rabbit_ct_broker_helpers:rpc(Config1, 1, ?MODULE, delete_testcase_queue, [Q]).

consume(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

@@ -104,13 +104,13 @@ create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun) ->
create_binding_in_khepri(Src, Dst, Weight, UpdateFun) ->
Path = khepri_consistent_hash_path(Src),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := Chx0, payload_version := DVersion}} ->
{ok, #{Path := #{data := Chx0, payload_version := Vsn}}} ->
case UpdateFun(Chx0, Dst, Weight) of
already_exists ->
already_exists;
Chx ->
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
Ret2 = rabbit_khepri:put(Path1, Chx),
case Ret2 of
ok ->

@@ -981,12 +981,17 @@ cluster_nodes(Config, Nodes) when is_list(Nodes) ->
[Nodename]),
cluster_nodes1(Config, SecNodeConfig, NodeConfigs1);
false ->
[NodeConfig | NodeConfigs1] = NodeConfigs,
Nodename = ?config(nodename, NodeConfig),
ct:pal(
"Using node ~s as the cluster seed node",
[Nodename]),
cluster_nodes1(Config, NodeConfig, NodeConfigs1)
case NodeConfigs of
[NodeConfig, SeedNodeConfig | NodeConfigs1] ->
Nodename = ?config(nodename, SeedNodeConfig),
ct:pal(
"Using node ~s as the cluster seed node",
[Nodename]),
cluster_nodes1(
Config, SeedNodeConfig, [NodeConfig | NodeConfigs1]);
[_] ->
Config
end
end;
cluster_nodes(Config, SeedNode) ->
Nodenames = get_node_configs(Config, nodename),

@@ -660,7 +660,7 @@ child_id_format(Config) ->
%%
%% After that, the supervisors run on the new code.
Config2 = rabbit_ct_broker_helpers:cluster_nodes(
Config1, [OldNodeA, NewNodeB, NewNodeD]),
Config1, OldNodeA, [NewNodeB, NewNodeD]),
ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeA),
ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNodeA),
ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeC),

@@ -108,9 +108,9 @@ create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) ->
Path = khepri_jms_topic_exchange_path(XName),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := BindingFuns, payload_version := DVersion}} ->
{ok, #{Path := #{data := BindingFuns, payload_version := Vsn}}} ->
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)),
case Ret of
ok -> ok;

@ -222,9 +222,14 @@ end_per_testcase(Testcase, Config) ->
|
|||
end_per_testcase0(Testcase, Config) ->
|
||||
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
|
||||
%% Assert that every testcase cleaned up their MQTT sessions.
|
||||
_ = rpc(Config, ?MODULE, delete_queues, []),
|
||||
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
delete_queues() ->
|
||||
[catch rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|
||||
|| Q <- rabbit_amqqueue:list()].
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Testsuite cases
|
||||
%% -------------------------------------------------------------------
|
||||
|
|
@ -315,7 +320,7 @@ decode_basic_properties(Config) ->
|
|||
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
|
||||
QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]),
|
||||
?assertEqual(1, length(QuorumQueues)),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config),
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
||||
amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>,
|
||||
routing_key = Topic},
|
||||
#amqp_msg{payload = Payload}),
|
||||
|
|
@ -323,7 +328,8 @@ decode_basic_properties(Config) ->
|
|||
ok = emqtt:disconnect(C1),
|
||||
C2 = connect(ClientId, Config, [{clean_start, true}]),
|
||||
ok = emqtt:disconnect(C2),
|
||||
ok = rpc(Config, application, unset_env, [App, Par]).
|
||||
ok = rpc(Config, application, unset_env, [App, Par]),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
||||
|
||||
quorum_queue_rejects(Config) ->
|
||||
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
||||
|
|
@ -376,7 +382,7 @@ publish_to_all_queue_types_qos1(Config) ->
|
|||
publish_to_all_queue_types(Config, qos1).
|
||||
|
||||
publish_to_all_queue_types(Config, QoS) ->
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config),
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
||||
|
||||
CQ = <<"classic-queue">>,
|
||||
QQ = <<"quorum-queue">>,
|
||||
|
|
@ -428,7 +434,8 @@ publish_to_all_queue_types(Config, QoS) ->
|
|||
delete_queue(Ch, [CQ, QQ, SQ]),
|
||||
ok = emqtt:disconnect(C),
|
||||
?awaitMatch([],
|
||||
all_connection_pids(Config), 10_000, 1000).
|
||||
all_connection_pids(Config), 10_000, 1000),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
||||
|
||||
publish_to_all_non_deprecated_queue_types_qos0(Config) ->
|
||||
publish_to_all_non_deprecated_queue_types(Config, qos0).
|
||||
|
|
@ -437,7 +444,7 @@ publish_to_all_non_deprecated_queue_types_qos1(Config) ->
|
|||
publish_to_all_non_deprecated_queue_types(Config, qos1).
|
||||
|
||||
publish_to_all_non_deprecated_queue_types(Config, QoS) ->
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config),
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
||||
|
||||
CQ = <<"classic-queue">>,
|
||||
QQ = <<"quorum-queue">>,
|
||||
|
|
@ -487,7 +494,8 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) ->
|
|||
delete_queue(Ch, [CQ, QQ, SQ]),
|
||||
ok = emqtt:disconnect(C),
|
||||
?awaitMatch([],
|
||||
all_connection_pids(Config), 10_000, 1000).
|
||||
all_connection_pids(Config), 10_000, 1000),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
||||
|
||||
%% This test case does not require multiple nodes
|
||||
%% but it is grouped together with flow test cases for other queue types
|
||||
|
|
@ -519,7 +527,7 @@ flow(Config, {App, Par, Val}, QueueType)
|
|||
Result = rpc_all(Config, application, set_env, [App, Par, Val]),
|
||||
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
|
||||
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config),
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
||||
QueueName = Topic = atom_to_binary(?FUNCTION_NAME),
|
||||
declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]),
|
||||
bind(Ch, QueueName, Topic),
|
||||
|
|
@@ -547,7 +555,8 @@ flow(Config, {App, Par, Val}, QueueType)
    ?awaitMatch([],
                all_connection_pids(Config), 10_000, 1000),
    ?assertEqual(Result,
                 rpc_all(Config, application, set_env, [App, Par, DefaultVal])).
                 rpc_all(Config, application, set_env, [App, Par, DefaultVal])),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

events(Config) ->
    ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder),

@@ -791,9 +800,10 @@ queue_down_qos1(Config) ->
            ok = rabbit_ct_broker_helpers:start_node(Config, 1)
    end,

    Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0),
    {Conn, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    delete_queue(Ch0, CQ),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch0).

%% Consuming classic queue on a different node goes down.
consuming_classic_queue_down(Config) ->

@@ -832,7 +842,7 @@ consuming_classic_queue_down(Config) ->
    ok.

delete_create_queue(Config) ->
    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    CQ1 = <<"classic-queue-1-delete-create">>,
    CQ2 = <<"classic-queue-2-delete-create">>,
    QQ = <<"quorum-queue-delete-create">>,

@@ -892,7 +902,8 @@ delete_create_queue(Config) ->
      1000, 10),

    delete_queue(Ch, [CQ1, CQ2, QQ]),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

session_expiry(Config) ->
    App = rabbitmq_mqtt,

@@ -1088,7 +1099,7 @@ large_message_amqp_to_mqtt(Config) ->
    C = connect(ClientId, Config),
    {ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),

    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    Payload0 = binary:copy(<<"x">>, 8_000_000),
    Payload = <<Payload0/binary, "y">>,
    amqp_channel:call(Ch,

@@ -1096,20 +1107,22 @@ large_message_amqp_to_mqtt(Config) ->
                      routing_key = Topic},
                      #amqp_msg{payload = Payload}),
    ok = expect_publishes(C, Topic, [Payload]),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

amqp_to_mqtt_qos0(Config) ->
    Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME),
    C = connect(ClientId, Config),
    {ok, _, [0]} = emqtt:subscribe(C, {Topic, qos0}),

    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    amqp_channel:call(Ch,
                      #'basic.publish'{exchange = <<"amq.topic">>,
                                       routing_key = Topic},
                      #amqp_msg{payload = Payload}),
    ok = expect_publishes(C, Topic, [Payload]),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

%% Packet identifier is a non zero two byte integer.
%% Test that the server wraps around the packet identifier.

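%% Note (illustrative sketch, not part of the changeset): the comment above
%% refers to the MQTT packet identifier being a non-zero 16-bit integer, so a
%% counter that reaches 16#FFFF wraps back to 1 rather than 0. A minimal
%% sketch of such a wrap-around; next_packet_id/1 is a hypothetical helper,
%% not a function from the suite:
next_packet_id(16#FFFF) -> 1;
next_packet_id(Id) when Id > 0, Id < 16#FFFF -> Id + 1.
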
@@ -1590,7 +1603,7 @@ rabbit_status_connection_count(Config) ->
trace(Config) ->
    Server = atom_to_binary(get_node_config(Config, 0, nodename)),
    Topic = Payload = TraceQ = atom_to_binary(?FUNCTION_NAME),
    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    declare_queue(Ch, TraceQ, []),
    #'queue.bind_ok'{} = amqp_channel:call(
                           Ch, #'queue.bind'{queue = TraceQ,

@@ -1645,11 +1658,12 @@ trace(Config) ->
                 amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})),

    delete_queue(Ch, TraceQ),
    [ok = emqtt:disconnect(C) || C <- [Pub, Sub]].
    [ok = emqtt:disconnect(C) || C <- [Pub, Sub]],
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

trace_large_message(Config) ->
    TraceQ = <<"trace-queue">>,
    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    declare_queue(Ch, TraceQ, []),
    #'queue.bind_ok'{} = amqp_channel:call(
                           Ch, #'queue.bind'{queue = TraceQ,

@@ -1674,7 +1688,8 @@ trace_large_message(Config) ->

    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]),
    delete_queue(Ch, TraceQ),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

max_packet_size_unauthenticated(Config) ->
    ClientId = ?FUNCTION_NAME,

@@ -1765,7 +1780,7 @@ default_queue_type(Config) ->
incoming_message_interceptors(Config) ->
    Key = ?FUNCTION_NAME,
    ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]),
    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME),
    CQName = <<"my classic queue">>,
    Stream = <<"my stream">>,

@@ -1813,7 +1828,8 @@ incoming_message_interceptors(Config) ->
    delete_queue(Ch, Stream),
    delete_queue(Ch, CQName),
    true = rpc(Config, persistent_term, erase, [Key]),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

%% This test makes sure that a retained message that got written in 3.12 or earlier
%% can be consumed in 3.13 or later.

@@ -1853,7 +1869,7 @@ bind_exchange_to_exchange(Config) ->
    SourceX = <<"amq.topic">>,
    DestinationX = <<"destination">>,
    Q = <<"q">>,
    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX,
                                                                         durable = true,
                                                                         auto_delete = true}),

@@ -1871,13 +1887,14 @@ bind_exchange_to_exchange(Config) ->
    eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}},
                             amqp_channel:call(Ch, #'basic.get'{queue = Q}))),
    #'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

bind_exchange_to_exchange_single_message(Config) ->
    SourceX = <<"amq.topic">>,
    DestinationX = <<"destination">>,
    Q = <<"q">>,
    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX,
                                                                         durable = true,
                                                                         auto_delete = true}),

@@ -1904,7 +1921,8 @@ bind_exchange_to_exchange_single_message(Config) ->
    timer:sleep(10),
    ?assertEqual(#'queue.delete_ok'{message_count = 0},
                 amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
    ok = emqtt:disconnect(C).
    ok = emqtt:disconnect(C),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

%% -------------------------------------------------------------------
%% Internal helpers

@@ -1936,7 +1954,7 @@ await_confirms_unordered(From, Left) ->
    end.

await_consumer_count(ConsumerCount, ClientId, QoS, Config) ->
    Ch = rabbit_ct_client_helpers:open_channel(Config),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    QueueName = rabbit_mqtt_util:queue_name_bin(
                  rabbit_data_coercion:to_binary(ClientId), QoS),
    eventually(

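%% Note (illustrative sketch, not part of the changeset): the test-suite hunks
%% above all apply the same pattern: instead of opening only a channel, each
%% test opens a dedicated AMQP connection together with a channel and closes
%% both explicitly at the end. A minimal sketch of that pattern, assuming the
%% rabbit_ct_client_helpers functions used in the hunks above; some_test/1 and
%% exercise_broker/1 are hypothetical placeholders:
some_test(Config) ->
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    ok = exercise_broker(Ch),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
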
@@ -106,10 +106,10 @@ insert0_in_mnesia(Key, Cached, Message, Length) ->
insert_in_khepri(XName, Message, Length) ->
    Path = khepri_recent_history_path(XName),
    case rabbit_khepri:adv_get(Path) of
        {ok, #{data := Cached0, payload_version := DVersion}} ->
        {ok, #{Path := #{data := Cached0, payload_version := Vsn}}} ->
            Cached = add_to_cache(Cached0, Message, Length),
            Path1 = khepri_path:combine_with_conditions(
                      Path, [#if_payload_version{version = DVersion}]),
                      Path, [#if_payload_version{version = Vsn}]),
            Ret = rabbit_khepri:put(Path1, Cached),
            case Ret of
                ok ->

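%% Note (illustrative sketch, not part of the changeset): the hunk above
%% reflects the Khepri 0.17 advanced API, where adv_get/1 returns a map keyed
%% by the tree node path rather than a bare property map, while the write stays
%% guarded by the payload version. A minimal sketch of that read-then-
%% conditionally-write flow, assuming Path is a Khepri path built elsewhere and
%% NewData is the value to store; update_guarded/2 is a hypothetical helper:
update_guarded(Path, NewData) ->
    case rabbit_khepri:adv_get(Path) of
        {ok, #{Path := #{payload_version := Vsn}}} ->
            Path1 = khepri_path:combine_with_conditions(
                      Path, [#if_payload_version{version = Vsn}]),
            rabbit_khepri:put(Path1, NewData);
        Other ->
            Other
    end.
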
@@ -101,7 +101,7 @@ child_id_format(Config) ->
    %% Node 4: the secondary umbrella
    %% ...
    %%
    %% Therefore, `Pouet' will use the primary copy, `OldNode' the secondary
    %% Therefore, `NewNode' will use the primary copy, `OldNode' the secondary
    %% umbrella, `NewRefNode' the primary copy, and `NodeWithQueues' the
    %% secondary umbrella.

@@ -221,7 +221,7 @@ child_id_format(Config) ->
    %% After that, the supervisors run on the new code.
    ct:pal("Clustering nodes ~s and ~s", [OldNode, NewNode]),
    Config1 = rabbit_ct_broker_helpers:cluster_nodes(
                Config, [OldNode, NewNode]),
                Config, OldNode, [NewNode]),
    ok = rabbit_ct_broker_helpers:stop_broker(Config1, OldNode),
    ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNode),

@@ -46,8 +46,8 @@ dep_credentials_obfuscation = hex 3.5.0
dep_cuttlefish = hex 3.4.0
dep_gen_batch_server = hex 0.8.8
dep_jose = hex 1.11.10
dep_khepri = hex 0.16.0
dep_khepri_mnesia_migration = hex 0.7.2
dep_khepri = hex 0.17.1
dep_khepri_mnesia_migration = hex 0.8.0
dep_meck = hex 1.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.6
dep_prometheus = hex 4.11.0