Queues with plugins - sync with queue topologies updates

This commit is contained in:
Iliia Khaprov 2025-05-07 12:55:24 +02:00
parent 34f0d12dab
commit c12c76ae45
No known key found for this signature in database
GPG Key ID: 4DCFF8F358E49AED
3 changed files with 18 additions and 14 deletions

View File

@ -702,7 +702,7 @@ send_queue_event(Pid, QName, Event) ->
gen_server:cast(Pid, {queue_event, QName, Event}). gen_server:cast(Pid, {queue_event, QName, Event}).
-spec queue_topology(amqqueue:amqqueue()) -> -spec queue_topology(amqqueue:amqqueue()) ->
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}. {Leader :: node() | none, Replicas :: [node(),...]}.
queue_topology(Q) -> queue_topology(Q) ->
Pid = amqqueue:get_pid(Q), Pid = amqqueue:get_pid(Q),
Node = node(Pid), Node = node(Pid),

View File

@ -2253,7 +2253,7 @@ maybe_log_leader_health_check_result(Result) ->
rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]). rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]).
-spec queue_topology(amqqueue:amqqueue()) -> -spec queue_topology(amqqueue:amqqueue()) ->
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}. {Leader :: node() | none, Replicas :: [node(),...]}.
queue_topology(Q) -> queue_topology(Q) ->
Leader = case amqqueue:get_pid(Q) of Leader = case amqqueue:get_pid(Q) of
{_RaName, Node} -> {_RaName, Node} ->

View File

@ -1430,19 +1430,23 @@ delivery_count_add(Count, N) ->
serial_number:add(Count, N). serial_number:add(Count, N).
-spec queue_topology(amqqueue:amqqueue()) -> -spec queue_topology(amqqueue:amqqueue()) ->
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}. {Leader :: node() | none, Replicas :: [node(),...]}.
queue_topology(Q) -> queue_topology(Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q), Leader = case amqqueue:get_pid(Q) of
case rabbit_stream_coordinator:members(StreamId) of {_RaName, Node} ->
{ok, Members} -> Node;
maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) -> none ->
{Node, [Node | Replicas]}; none;
(Node, {_Pid, replica}, {Writer, Replicas}) -> Pid ->
{Writer, [Node | Replicas]} node(Pid)
end, {undefined, []}, Members); end,
{error, _} -> Replicas = case amqqueue:get_type_state(Q) of
{undefined, undefined} #{nodes := Nodes} ->
end. Nodes;
_ ->
[Leader]
end,
{Leader, Replicas}.
policy_apply_to_name() -> policy_apply_to_name() ->
<<"streams">>. <<"streams">>.