Add advertised host/port settings

This allows to communicate whatever it's appropriate to clients when
they issue the metadata command.
This commit is contained in:
Arnaud Cogoluègnes 2020-05-28 10:47:30 +02:00
parent 2f8559ae41
commit efc6fbc089
4 changed files with 65 additions and 21 deletions

View File

@ -12,7 +12,9 @@ define PROJECT_ENV
{initial_credits, 50000},
{credits_required_for_unblocking, 12500},
{frame_max, 131072},
{heartbeat, 600}
{heartbeat, 600},
{advertised_host, undefined},
{advertised_port, undefined}
]
endef

View File

@ -17,7 +17,7 @@
-module(rabbit_stream).
-behaviour(application).
-export([start/2, port/0]).
-export([start/2, host/0, port/0]).
-export([stop/1]).
-include_lib("rabbit_common/include/rabbit.hrl").
@ -25,7 +25,24 @@
start(_Type, _Args) ->
rabbit_stream_sup:start_link().
host() ->
case application:get_env(rabbitmq_stream, advertised_host, undefined) of
undefined ->
{ok, Host} = inet:gethostname(),
list_to_binary(Host);
Host ->
rabbit_data_coercion:to_binary(Host)
end.
port() ->
case application:get_env(rabbitmq_stream, advertised_port, undefined) of
undefined ->
port_from_listener();
Port ->
Port
end.
port_from_listener() ->
Listeners = rabbit_networking:node_listeners(node()),
Port = lists:foldl(fun(#listener{port = Port, protocol = stream}, _Acc) ->
Port;
@ -34,6 +51,5 @@ port() ->
end, undefined, Listeners),
Port.
stop(_State) ->
ok.

View File

@ -21,7 +21,7 @@
%% API
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([start_link/1, create/4, register/0, delete/3, lookup/2, unregister/0]).
-export([start_link/1, create/4, register/0, delete/3, lookup/2, topology/2, unregister/0]).
-record(state, {
configuration, listeners, monitors
@ -48,6 +48,9 @@ unregister() ->
lookup(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup, VirtualHost, Stream}).
topology(VirtualHost, Stream) ->
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
stream_queue_arguments(Arguments) ->
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments).
@ -135,6 +138,20 @@ handle_call({lookup, VirtualHost, Stream}, _From, State) ->
_ ->
cluster_not_found
end,
{reply, Res, State};
handle_call({topology, VirtualHost, Stream}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
{ok, maps:with([leader_node, replica_nodes], amqqueue:get_type_state(Q))};
_ ->
{error, stream_not_found}
end;
_ ->
{error, stream_not_found}
end,
{reply, Res, State}.
handle_cast(_, State) ->

View File

@ -544,7 +544,7 @@ handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers} = St
end;
handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username}} = State,
<<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary,
_ArgumentsCount:32, ArgumentsBinary/binary>>, Rest) ->
_ArgumentsCount:32, ArgumentsBinary/binary>>, Rest) ->
Arguments = parse_arguments(ArgumentsBinary),
case rabbit_stream_manager:create(VirtualHost, Stream, Arguments, Username) of
{ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
@ -582,14 +582,26 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
response(Transport, State, ?COMMAND_DELETE_STREAM, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR),
{State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost} = State,
<<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, StreamCount:32, BinaryStreams/binary>>, Rest) ->
%% FIXME: rely only on rabbit_networking to discover the listeners
Nodes = rabbit_mnesia:cluster_nodes(all),
Streams = extract_stream_list(BinaryStreams, []),
%% get the nodes involved in the streams
NodesMap = lists:foldl(fun(Stream, Acc) ->
case rabbit_stream_manager:topology(VirtualHost, Stream) of
{ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} ->
Acc1 = maps:put(LeaderNode, ok, Acc),
lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc1, ReplicaNodes);
{error, _} ->
Acc
end
end, #{}, Streams),
Nodes = maps:keys(NodesMap),
{NodesInfo, _} = lists:foldl(fun(Node, {Acc, Index}) ->
{ok, Host} = rpc:call(Node, inet, gethostname, []),
Host = rpc:call(Node, rabbit_stream, host, []),
Port = rpc:call(Node, rabbit_stream, port, []),
{Acc#{Node => {{index, Index}, {host, list_to_binary(Host)}, {port, Port}}}, Index + 1}
{Acc#{Node => {{index, Index}, {host, Host}, {port, Port}}}, Index + 1}
end, {#{}, 0}, Nodes),
BrokersCount = length(Nodes),
@ -598,25 +610,22 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
<<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>>
end, <<BrokersCount:32>>, NodesInfo),
Streams = extract_stream_list(BinaryStreams, []),
MetadataBin = lists:foldl(fun(Stream, Acc) ->
StreamLength = byte_size(Stream),
case lookup_cluster(Stream, State) of
cluster_not_found ->
case rabbit_stream_manager:topology(VirtualHost, Stream) of
{error, stream_not_found} ->
<<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16,
-1:16, 0:32>>;
{Cluster, _} ->
%% FIXME get replicas and leader from amqqueue record
LeaderNode = node(Cluster),
{ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} ->
#{LeaderNode := NodeInfo} = NodesInfo,
{{index, LeaderIndex}, {host, _}, {port, _}} = NodeInfo,
Replicas = maps:without([LeaderNode], NodesInfo),
ReplicasBinary = lists:foldl(fun(NI, Bin) ->
ReplicasBinary = lists:foldl(fun(Replica, Bin) ->
#{Replica := NI} = NodesInfo,
{{index, ReplicaIndex}, {host, _}, {port, _}} = NI,
<<Bin/binary, ReplicaIndex:16>>
end, <<>>, maps:values(Replicas)),
ReplicasCount = maps:size(Replicas),
end, <<>>, Replicas),
ReplicasCount = length(Replicas),
<<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_OK:16,
LeaderIndex:16, ReplicasCount:32, ReplicasBinary/binary>>
@ -653,7 +662,7 @@ parse_arguments(Arguments) ->
parse_arguments(Acc, <<>>) ->
Acc;
parse_arguments(Acc, <<KeySize:16,Key:KeySize/binary,ValueSize:16,Value:ValueSize/binary, Rest/binary>>) ->
parse_arguments(Acc, <<KeySize:16, Key:KeySize/binary, ValueSize:16, Value:ValueSize/binary, Rest/binary>>) ->
parse_arguments(maps:put(Key, Value, Acc), Rest).