Use advertised TLS host setting in metadata frame

The rabbitmq_stream.advertised_tls_host setting is not used in the
metadata frame of the stream protocol, even if it is set. This commit
makes sure the setting is used if set.

References rabbitmq/rabbitmq-stream-java-client#803
This commit is contained in:
Arnaud Cogoluègnes 2025-08-08 12:33:52 +00:00
parent 28839628f5
commit 22a959331b
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
4 changed files with 198 additions and 91 deletions

View File

@ -38,6 +38,7 @@
-include("rabbit_stream_metrics.hrl").
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl").
start(_Type, _Args) ->
rabbit_stream_metrics:init(),
@ -48,7 +49,7 @@ start(_Type, _Args) ->
rabbit_stream_sup:start_link().
tls_host() ->
case application:get_env(rabbitmq_stream, advertised_tls_host,
case application:get_env(rabbitmq_stream, ?K_AD_TLS_HOST,
undefined)
of
undefined ->
@ -58,7 +59,7 @@ tls_host() ->
end.
host() ->
case application:get_env(rabbitmq_stream, advertised_host, undefined)
case application:get_env(rabbitmq_stream, ?K_AD_HOST, undefined)
of
undefined ->
hostname_from_node();
@ -79,7 +80,7 @@ hostname_from_node() ->
end.
port() ->
case application:get_env(rabbitmq_stream, advertised_port, undefined)
case application:get_env(rabbitmq_stream, ?K_AD_PORT, undefined)
of
undefined ->
port_from_listener();
@ -103,7 +104,7 @@ port_from_listener() ->
end.
tls_port() ->
case application:get_env(rabbitmq_stream, advertised_tls_port,
case application:get_env(rabbitmq_stream, ?K_AD_TLS_PORT,
undefined)
of
undefined ->

View File

@ -1473,32 +1473,17 @@ handle_frame_pre_auth(Transport,
VirtualHost,
{socket, S},
#{}),
AdvertisedHost =
case TransportLayer of
tcp ->
rabbit_stream:host();
ssl ->
rabbit_stream:tls_host()
end,
AdvertisedPort =
case TransportLayer of
tcp ->
rabbit_data_coercion:to_binary(
rabbit_stream:port());
ssl ->
rabbit_data_coercion:to_binary(
rabbit_stream:tls_port())
end,
ConnectionProperties =
#{<<"advertised_host">> => AdvertisedHost,
<<"advertised_port">> => AdvertisedPort},
AdHost = advertised_host(TransportLayer),
AdPort = rabbit_data_coercion:to_binary(advertised_port(TransportLayer)),
ConnProps = #{<<"advertised_host">> => AdHost,
<<"advertised_port">> => AdPort},
?LOG_DEBUG("sending open response ok ~ts", [VirtualHost]),
Frame =
rabbit_stream_core:frame({response, CorrelationId,
{open, ?RESPONSE_CODE_OK,
ConnectionProperties}}),
ConnProps}}),
send(Transport, S, Frame),
%% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2)
@ -2337,13 +2322,10 @@ handle_frame_post_auth(Transport,
Nodes0),
NodeEndpoints =
lists:foldr(fun(Node, Acc) ->
PortFunction =
case TransportLayer of
tcp -> port;
ssl -> tls_port
end,
Host = rpc:call(Node, rabbit_stream, host, []),
Port = rpc:call(Node, rabbit_stream, PortFunction, []),
HostFun = advertised_host_fun(TransportLayer),
PortFun = advertised_port_fun(TransportLayer),
Host = rpc:call(Node, rabbit_stream, HostFun, []),
Port = rpc:call(Node, rabbit_stream, PortFun, []),
case {is_binary(Host), is_integer(Port)} of
{true, true} -> Acc#{Node => {Host, Port}};
_ ->
@ -4077,3 +4059,21 @@ retry_sac_call(Call, N) ->
R ->
R
end.
advertised_host(Transport) ->
F = advertised_host_fun(Transport),
rabbit_stream:F().
advertised_port(Transport) ->
F = advertised_port_fun(Transport),
rabbit_stream:F().
advertised_host_fun(tcp) ->
host;
advertised_host_fun(ssl) ->
tls_host.
advertised_port_fun(tcp) ->
port;
advertised_port_fun(ssl) ->
tls_port.

View File

@ -14,3 +14,7 @@
%%
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).
-define(K_AD_HOST, advertised_host).
-define(K_AD_PORT, advertised_port).
-define(K_AD_TLS_HOST, advertised_tls_host).
-define(K_AD_TLS_PORT, advertised_tls_port).

View File

@ -23,12 +23,14 @@
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-include("rabbit_stream_metrics.hrl").
-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl").
-compile(nowarn_export_all).
-compile(export_all).
-import(rabbit_stream_core, [frame/1]).
-import(rabbit_ct_broker_helpers, [rpc/5]).
-import(rabbit_ct_helpers, [await_condition/1]).
-define(WAIT, 5000).
@ -69,7 +71,9 @@ groups() ->
test_consumer_with_too_long_reference_errors,
subscribe_unsubscribe_should_create_events,
test_stream_test_utils,
sac_subscription_with_partition_index_conflict_should_return_error
sac_subscription_with_partition_index_conflict_should_return_error,
test_metadata_with_advertised_hints,
test_connection_properties_with_advertised_hints
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
@ -216,6 +220,17 @@ end_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
end_per_testcase(unauthorized_vhost_access_should_close_with_delay = TestCase, Config) ->
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(TestCase, Config)
when TestCase =:= test_metadata_with_advertised_hints orelse
TestCase =:= test_connection_properties_with_advertised_hints ->
lists:foreach(fun(K) ->
ok = rpc(Config, 0,
application,
set_env,
[rabbitmq_stream, K, undefined])
end, [?K_AD_HOST, ?K_AD_PORT,
?K_AD_TLS_HOST, ?K_AD_TLS_PORT]),
rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, TestCase).
@ -409,71 +424,61 @@ test_super_stream_duplicate_partitions(Config) ->
ok.
test_metadata(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
Transport = gen_tcp,
Port = get_stream_port(Config),
FirstNode = get_node_name(Config, 0),
NodeInMaintenance = get_node_name(Config, 1),
{ok, S} =
Transport:connect("localhost", Port,
[{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
GetStreamNodes =
fun() ->
MetadataFrame = request({metadata, [Stream]}),
ok = Transport:send(S, MetadataFrame),
{CmdMetadata, _} = receive_commands(Transport, S, C3),
{response, 1,
{metadata, _Nodes, #{Stream := {Leader = {_H, _P}, Replicas}}}} =
CmdMetadata,
[Leader | Replicas]
end,
rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == 3
end),
rabbit_ct_broker_helpers:rpc(Config,
NodeInMaintenance,
rabbit_maintenance,
drain,
[]),
Transports = [gen_tcp, ssl],
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
lists:foreach(
fun(Transport) ->
TransportBin = atom_to_binary(Transport, utf8),
Stream = <<FunctionName/binary, <<"_">>/binary, TransportBin/binary>>,
Port = get_port(Transport, Config),
Opts = get_opts(Transport),
FirstNode = get_node_name(Config, 0),
NodeInMaintenance = get_node_name(Config, 1),
{ok, S} = Transport:connect("localhost", Port, Opts),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
GetStreamNodes =
fun() ->
MetadataFrame = request({metadata, [Stream]}),
ok = Transport:send(S, MetadataFrame),
{CmdMetadata, _} = receive_commands(Transport, S, C3),
{response, 1,
{metadata, _Nodes, #{Stream := {Leader = {_H, _P}, Replicas}}}} =
CmdMetadata,
[Leader | Replicas]
end,
IsBeingDrained =
fun() ->
rabbit_ct_broker_helpers:rpc(Config,
FirstNode,
rabbit_maintenance,
is_being_drained_consistent_read,
[NodeInMaintenance])
end,
rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() end),
await_condition(fun() -> length(GetStreamNodes()) == 3 end),
rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == 2
end),
rpc(Config, NodeInMaintenance, rabbit_maintenance, drain, []),
rabbit_ct_broker_helpers:rpc(Config,
NodeInMaintenance,
rabbit_maintenance,
revive,
[]),
IsBeingDrained =
fun() ->
rpc(Config, FirstNode,
rabbit_maintenance, is_being_drained_consistent_read,
[NodeInMaintenance])
end,
await_condition(fun() -> IsBeingDrained() end),
rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() =:= false
end),
await_condition(fun() -> length(GetStreamNodes()) == 2 end),
rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == 3
end),
rpc(Config, NodeInMaintenance, rabbit_maintenance, revive, []),
await_condition(fun() -> IsBeingDrained() =:= false end),
await_condition(fun() -> length(GetStreamNodes()) == 3 end),
DeleteStreamFrame = request({delete_stream, Stream}),
ok = Transport:send(S, DeleteStreamFrame),
{CmdDelete, C4} = receive_commands(Transport, S, C3),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}},
CmdDelete),
_C5 = test_close(Transport, S, C4),
closed = wait_for_socket_close(Transport, S, 10)
end, Transports),
DeleteStreamFrame = request({delete_stream, Stream}),
ok = Transport:send(S, DeleteStreamFrame),
{CmdDelete, C4} = receive_commands(Transport, S, C3),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}},
CmdDelete),
_C5 = test_close(Transport, S, C4),
closed = wait_for_socket_close(Transport, S, 10),
ok.
test_gc_consumers(Config) ->
@ -1100,6 +1105,94 @@ sac_subscription_with_partition_index_conflict_should_return_error(Config) ->
{ok, _} = stream_test_utils:close(S, C5),
ok.
test_metadata_with_advertised_hints(Config) ->
Transports = [gen_tcp, ssl],
lists:foreach(
fun(Transport) ->
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
TransportBin = atom_to_binary(Transport, utf8),
Stream = <<FunctionName/binary, <<"_">>/binary, TransportBin/binary>>,
Port = get_port(Transport, Config),
Opts = get_opts(Transport),
{ok, S} = Transport:connect("localhost", Port, Opts),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
GetStreamNodes =
fun(Conn0) ->
MetadataFrame = request({metadata, [Stream]}),
ok = Transport:send(S, MetadataFrame),
{Cmd, Conn1} = receive_commands(Transport, S, Conn0),
{response, 1,
{metadata, _Nodes,
#{Stream := {Node = {_H, _P}, _Replicas}}}} = Cmd,
{Conn1, Node}
end,
{C4, N1} = GetStreamNodes(C3),
?assertEqual({<<"localhost">>, Port}, N1),
AdHost = rand:bytes(20),
AdPort = rand:uniform(65535),
{KH, KP} = case Transport of
gen_tcp ->
{?K_AD_HOST, ?K_AD_PORT};
ssl ->
{?K_AD_TLS_HOST, ?K_AD_TLS_PORT}
end,
rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, AdHost]),
rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, AdPort]),
{C5, N2} = GetStreamNodes(C4),
?assertEqual({AdHost, AdPort}, N2),
rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, undefined]),
rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, undefined]),
_ = test_close(Transport, S, C5),
closed = wait_for_socket_close(Transport, S, 10)
end, Transports),
ok.
test_connection_properties_with_advertised_hints(Config) ->
TestFun =
fun(Transport, ExpectedHost, ExpectedPort) ->
Port = get_port(Transport, Config),
Opts = get_opts(Transport),
{ok, S} = Transport:connect("localhost", Port, Opts),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
{CP, C2} = test_authenticate_with_conn_props(Transport, S, C1),
ExpectedPortBin = integer_to_binary(ExpectedPort),
?assertMatch(#{<<"advertised_host">> := ExpectedHost,
<<"advertised_port">> := ExpectedPortBin},
CP),
_ = test_close(Transport, S, C2),
closed = wait_for_socket_close(Transport, S, 10)
end,
TestFun(gen_tcp, <<"localhost">>, get_port(gen_tcp, Config)),
TestFun(ssl, <<"localhost">>, get_port(ssl, Config)),
lists:foreach(
fun(Transport) ->
AdHost = rand:bytes(20),
AdPort = rand:uniform(65535),
{KH, KP} = case Transport of
gen_tcp ->
{?K_AD_HOST, ?K_AD_PORT};
ssl ->
{?K_AD_TLS_HOST, ?K_AD_TLS_PORT}
end,
rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, AdHost]),
rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, AdPort]),
TestFun(Transport, AdHost, AdPort),
rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, undefined]),
rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, undefined])
end, [gen_tcp, ssl]),
ok.
filtered_events(Config, EventType) ->
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
@ -1318,6 +1411,11 @@ test_authenticate(Transport, S, C0) ->
tune(Transport, S,
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)).
test_authenticate_with_conn_props(Transport, S, C0) ->
tune_with_conn_props(
Transport, S,
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)).
test_authenticate(Transport, S, C0, Username) ->
test_authenticate(Transport, S, C0, Username, Username).
@ -1371,6 +1469,10 @@ tune(Transport, S, C2) ->
{{response, _, {open, ?RESPONSE_CODE_OK, _}}, C3} = do_tune(Transport, S, C2),
C3.
tune_with_conn_props(Transport, S, C2) ->
{{response, _, {open, ?RESPONSE_CODE_OK, CP}}, C3} = do_tune(Transport, S, C2),
{CP, C3}.
do_tune(Transport, S, C2) ->
{Tune, C3} = receive_commands(Transport, S, C2),
{tune, ?DEFAULT_FRAME_MAX, ?DEFAULT_HEARTBEAT} = Tune,