From 35b5ab3cdcc5b5b3950a108e3de707d2208c421a Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 7 Apr 2025 14:50:48 +0200 Subject: [PATCH] Determine queue topology without checking queue type ## What? This commit determines the queue topology without checking the queue type. ## Why? This way, checking leader and replicas works the same across all queue types without the need to introduce other rabbit_queue_type behaviour as suggested in other PRs. ## How? pid is the leader, nodes in queue_type_states are the members/replicas. This commit results in an unknown stream leader during queue declaration. However the correct leader will be returned eventually when calling GET on the stream. --- deps/rabbit/src/rabbit_amqp_management.erl | 61 +++++++------------ .../test/management_SUITE.erl | 24 ++++++-- 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 0c4459678b..027821898c 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -444,55 +444,40 @@ encode_queue(Q, NumMsgs, NumConsumers) -> ShortName -> ShortName end}}, - {{utf8, <<"arguments">>}, QArgs} + {{utf8, <<"arguments">>}, QArgs}, + {{utf8, <<"replicas">>}, + {array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]} + } ], - KVList1 = if is_list(Replicas) -> - [{{utf8, <<"replicas">>}, - {array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]} - } | KVList0]; - Replicas =:= undefined -> - KVList0 - end, KVList = case Leader of - undefined -> - KVList1; + none -> + KVList0; _ -> [{{utf8, <<"leader">>}, {utf8, atom_to_binary(Leader)} - } | KVList1] + } | KVList0] end, {map, KVList}. %% The returned Replicas contain both online and offline replicas. -spec queue_topology(amqqueue:amqqueue()) -> - {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}. + {Leader :: node() | none, Replicas :: [node(),...]}. queue_topology(Q) -> - case amqqueue:get_type(Q) of - rabbit_quorum_queue -> - [{leader, Leader0}, - {members, Members}] = rabbit_queue_type:info(Q, [leader, members]), - Leader = case Leader0 of - '' -> undefined; - _ -> Leader0 - end, - {Leader, Members}; - rabbit_stream_queue -> - #{name := StreamId} = amqqueue:get_type_state(Q), - case rabbit_stream_coordinator:members(StreamId) of - {ok, Members} -> - maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) -> - {Node, [Node | Replicas]}; - (Node, {_Pid, replica}, {Writer, Replicas}) -> - {Writer, [Node | Replicas]} - end, {undefined, []}, Members); - {error, _} -> - {undefined, undefined} - end; - _ -> - Pid = amqqueue:get_pid(Q), - Node = node(Pid), - {Node, [Node]} - end. + Leader = case amqqueue:get_pid(Q) of + {_RaName, Node} -> + Node; + none -> + none; + Pid -> + node(Pid) + end, + Replicas = case amqqueue:get_type_state(Q) of + #{nodes := Nodes} -> + Nodes; + _ -> + [Leader] + end, + {Leader, Replicas}. decode_exchange({map, KVList}) -> M = lists:foldl( diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl index 42343270d5..952c659e97 100644 --- a/deps/rabbitmq_amqp_client/test/management_SUITE.erl +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -803,17 +803,29 @@ queue_topology(Config) -> {ok, QQInfo0} = rabbitmq_amqp_client:declare_queue(LinkPair0, QQName, QQProps), {ok, SQInfo0} = rabbitmq_amqp_client:declare_queue(LinkPair0, SQName, SQProps), - %% The default queue leader strategy is client-local. - ?assertEqual({ok, N0}, maps:find(leader, CQInfo0)), - ?assertEqual({ok, N0}, maps:find(leader, QQInfo0)), - ?assertEqual({ok, N0}, maps:find(leader, SQInfo0)), - ?assertEqual({ok, [N0]}, maps:find(replicas, CQInfo0)), {ok, QQReplicas0} = maps:find(replicas, QQInfo0), ?assertEqual(Nodes, lists:usort(QQReplicas0)), {ok, SQReplicas0} = maps:find(replicas, SQInfo0), ?assertEqual(Nodes, lists:usort(SQReplicas0)), + %% The default queue leader strategy is client-local. + ?assertEqual({ok, N0}, maps:find(leader, CQInfo0)), + eventually( + ?_assert( + begin + {ok, QQInfo1} = rabbitmq_amqp_client:get_queue(LinkPair0, QQName), + {ok, SQInfo1} = rabbitmq_amqp_client:get_queue(LinkPair0, SQName), + QQLeader = maps:get(leader, QQInfo1), + SQLeader = maps:get(leader, SQInfo1), + ct:pal("quorum queue leader: ~s~n" + "stream leader: ~s", + [QQLeader, SQLeader]), + QQLeader =:= N0 andalso + SQLeader =:= N0 + end + ), 2000, 5), + ok = cleanup(Init0), ok = rabbit_ct_broker_helpers:stop_node(Config, 0), @@ -841,7 +853,7 @@ queue_topology(Config) -> (QQLeader =:= N1 orelse QQLeader =:= N2) andalso (SQLeader =:= N1 orelse SQLeader =:= N2) end - ), 1000, 5), + ), 2000, 5), ok = rabbit_ct_broker_helpers:start_node(Config, 0), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair2, CQName),