Ignore non-responding brokers in metadata command
This commit is contained in:
parent
1d3978ae40
commit
4a05b6b62a
|
|
@ -745,10 +745,16 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
|
|||
{NodesInfo, _} = lists:foldl(fun(Node, {Acc, Index}) ->
|
||||
Host = rpc:call(Node, rabbit_stream, host, []),
|
||||
Port = rpc:call(Node, rabbit_stream, port, []),
|
||||
{Acc#{Node => {{index, Index}, {host, Host}, {port, Port}}}, Index + 1}
|
||||
case {is_binary(Host), is_integer(Port)} of
|
||||
{true, true} ->
|
||||
{Acc#{Node => {{index, Index}, {host, Host}, {port, Port}}}, Index + 1};
|
||||
_ ->
|
||||
rabbit_log:warning("Error when retrieving broker metadata: ~p ~p~n", [Host, Port]),
|
||||
{Acc, Index}
|
||||
end
|
||||
end, {#{}, 0}, Nodes),
|
||||
|
||||
BrokersCount = length(Nodes),
|
||||
BrokersCount = map_size(NodesInfo),
|
||||
BrokersBin = maps:fold(fun(_K, {{index, Index}, {host, Host}, {port, Port}}, Acc) ->
|
||||
HostLength = byte_size(Host),
|
||||
<<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>>
|
||||
|
|
@ -762,15 +768,24 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
|
|||
<<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16,
|
||||
-1:16, 0:32>>;
|
||||
{ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} ->
|
||||
#{LeaderNode := NodeInfo} = NodesInfo,
|
||||
{{index, LeaderIndex}, {host, _}, {port, _}} = NodeInfo,
|
||||
ReplicasBinary = lists:foldl(fun(Replica, Bin) ->
|
||||
#{Replica := NI} = NodesInfo,
|
||||
{{index, ReplicaIndex}, {host, _}, {port, _}} = NI,
|
||||
<<Bin/binary, ReplicaIndex:16>>
|
||||
end, <<>>, Replicas),
|
||||
ReplicasCount = length(Replicas),
|
||||
LeaderIndex = case NodesInfo of
|
||||
#{LeaderNode := NodeInfo} ->
|
||||
{{index, LeaderIdx}, {host, _}, {port, _}} = NodeInfo,
|
||||
LeaderIdx;
|
||||
_ ->
|
||||
-1
|
||||
end,
|
||||
{ReplicasBinary, ReplicasCount} = lists:foldl(fun(Replica, {Bin, Count}) ->
|
||||
case NodesInfo of
|
||||
#{Replica := NI} ->
|
||||
{{index, ReplicaIndex}, {host, _}, {port, _}} = NI,
|
||||
{<<Bin/binary, ReplicaIndex:16>>, Count + 1};
|
||||
_ ->
|
||||
{Bin, Count}
|
||||
end
|
||||
|
||||
|
||||
end, {<<>>, 0}, Replicas),
|
||||
<<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_OK:16,
|
||||
LeaderIndex:16, ReplicasCount:32, ReplicasBinary/binary>>
|
||||
end
|
||||
|
|
|
|||
Loading…
Reference in New Issue