diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index d68e7ff144..e1baceb657 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -38,6 +38,7 @@ -include("rabbit_stream_metrics.hrl"). -include_lib("kernel/include/logger.hrl"). +-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl"). start(_Type, _Args) -> rabbit_stream_metrics:init(), @@ -48,7 +49,7 @@ start(_Type, _Args) -> rabbit_stream_sup:start_link(). tls_host() -> - case application:get_env(rabbitmq_stream, advertised_tls_host, + case application:get_env(rabbitmq_stream, ?K_AD_TLS_HOST, undefined) of undefined -> @@ -58,7 +59,7 @@ tls_host() -> end. host() -> - case application:get_env(rabbitmq_stream, advertised_host, undefined) + case application:get_env(rabbitmq_stream, ?K_AD_HOST, undefined) of undefined -> hostname_from_node(); @@ -79,7 +80,7 @@ hostname_from_node() -> end. port() -> - case application:get_env(rabbitmq_stream, advertised_port, undefined) + case application:get_env(rabbitmq_stream, ?K_AD_PORT, undefined) of undefined -> port_from_listener(); @@ -103,7 +104,7 @@ port_from_listener() -> end. tls_port() -> - case application:get_env(rabbitmq_stream, advertised_tls_port, + case application:get_env(rabbitmq_stream, ?K_AD_TLS_PORT, undefined) of undefined -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 21969915c3..3217409b3b 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1473,32 +1473,17 @@ handle_frame_pre_auth(Transport, VirtualHost, {socket, S}, #{}), - AdvertisedHost = - case TransportLayer of - tcp -> - rabbit_stream:host(); - ssl -> - rabbit_stream:tls_host() - end, - AdvertisedPort = - case TransportLayer of - tcp -> - rabbit_data_coercion:to_binary( - rabbit_stream:port()); - ssl -> - rabbit_data_coercion:to_binary( - rabbit_stream:tls_port()) - end, - ConnectionProperties = - #{<<"advertised_host">> => AdvertisedHost, - <<"advertised_port">> => AdvertisedPort}, + AdHost = advertised_host(TransportLayer), + AdPort = rabbit_data_coercion:to_binary(advertised_port(TransportLayer)), + ConnProps = #{<<"advertised_host">> => AdHost, + <<"advertised_port">> => AdPort}, ?LOG_DEBUG("sending open response ok ~ts", [VirtualHost]), Frame = rabbit_stream_core:frame({response, CorrelationId, {open, ?RESPONSE_CODE_OK, - ConnectionProperties}}), + ConnProps}}), send(Transport, S, Frame), %% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2) @@ -2337,13 +2322,10 @@ handle_frame_post_auth(Transport, Nodes0), NodeEndpoints = lists:foldr(fun(Node, Acc) -> - PortFunction = - case TransportLayer of - tcp -> port; - ssl -> tls_port - end, - Host = rpc:call(Node, rabbit_stream, host, []), - Port = rpc:call(Node, rabbit_stream, PortFunction, []), + HostFun = advertised_host_fun(TransportLayer), + PortFun = advertised_port_fun(TransportLayer), + Host = rpc:call(Node, rabbit_stream, HostFun, []), + Port = rpc:call(Node, rabbit_stream, PortFun, []), case {is_binary(Host), is_integer(Port)} of {true, true} -> Acc#{Node => {Host, Port}}; _ -> @@ -4077,3 +4059,21 @@ retry_sac_call(Call, N) -> R -> R end. + +advertised_host(Transport) -> + F = advertised_host_fun(Transport), + rabbit_stream:F(). + +advertised_port(Transport) -> + F = advertised_port_fun(Transport), + rabbit_stream:F(). + +advertised_host_fun(tcp) -> + host; +advertised_host_fun(ssl) -> + tls_host. + +advertised_port_fun(tcp) -> + port; +advertised_port_fun(ssl) -> + tls_port. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.hrl b/deps/rabbitmq_stream/src/rabbit_stream_utils.hrl index a957d06c41..f41d5d30ee 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.hrl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.hrl @@ -14,3 +14,7 @@ %% -define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255). +-define(K_AD_HOST, advertised_host). +-define(K_AD_PORT, advertised_port). +-define(K_AD_TLS_HOST, advertised_tls_host). +-define(K_AD_TLS_PORT, advertised_tls_port). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index df3d62b1c3..651fd7ec89 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -23,12 +23,14 @@ -include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). -include("rabbit_stream_metrics.hrl"). +-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl"). -compile(nowarn_export_all). -compile(export_all). -import(rabbit_stream_core, [frame/1]). -import(rabbit_ct_broker_helpers, [rpc/5]). +-import(rabbit_ct_helpers, [await_condition/1]). -define(WAIT, 5000). @@ -69,7 +71,9 @@ groups() -> test_consumer_with_too_long_reference_errors, subscribe_unsubscribe_should_create_events, test_stream_test_utils, - sac_subscription_with_partition_index_conflict_should_return_error + sac_subscription_with_partition_index_conflict_should_return_error, + test_metadata_with_advertised_hints, + test_connection_properties_with_advertised_hints ]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase @@ -216,6 +220,17 @@ end_per_testcase(store_offset_requires_read_access = TestCase, Config) -> end_per_testcase(unauthorized_vhost_access_should_close_with_delay = TestCase, Config) -> ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>), rabbit_ct_helpers:testcase_finished(Config, TestCase); +end_per_testcase(TestCase, Config) + when TestCase =:= test_metadata_with_advertised_hints orelse + TestCase =:= test_connection_properties_with_advertised_hints -> + lists:foreach(fun(K) -> + ok = rpc(Config, 0, + application, + set_env, + [rabbitmq_stream, K, undefined]) + end, [?K_AD_HOST, ?K_AD_PORT, + ?K_AD_TLS_HOST, ?K_AD_TLS_PORT]), + rabbit_ct_helpers:testcase_finished(Config, TestCase); end_per_testcase(TestCase, Config) -> rabbit_ct_helpers:testcase_finished(Config, TestCase). @@ -409,71 +424,61 @@ test_super_stream_duplicate_partitions(Config) -> ok. test_metadata(Config) -> - Stream = atom_to_binary(?FUNCTION_NAME, utf8), - Transport = gen_tcp, - Port = get_stream_port(Config), - FirstNode = get_node_name(Config, 0), - NodeInMaintenance = get_node_name(Config, 1), - {ok, S} = - Transport:connect("localhost", Port, - [{active, false}, {mode, binary}]), - C0 = rabbit_stream_core:init(0), - C1 = test_peer_properties(Transport, S, C0), - C2 = test_authenticate(Transport, S, C1), - C3 = test_create_stream(Transport, S, Stream, C2), - GetStreamNodes = - fun() -> - MetadataFrame = request({metadata, [Stream]}), - ok = Transport:send(S, MetadataFrame), - {CmdMetadata, _} = receive_commands(Transport, S, C3), - {response, 1, - {metadata, _Nodes, #{Stream := {Leader = {_H, _P}, Replicas}}}} = - CmdMetadata, - [Leader | Replicas] - end, - rabbit_ct_helpers:await_condition(fun() -> - length(GetStreamNodes()) == 3 - end), - rabbit_ct_broker_helpers:rpc(Config, - NodeInMaintenance, - rabbit_maintenance, - drain, - []), + Transports = [gen_tcp, ssl], + FunctionName = atom_to_binary(?FUNCTION_NAME, utf8), + lists:foreach( + fun(Transport) -> + TransportBin = atom_to_binary(Transport, utf8), + Stream = <>/binary, TransportBin/binary>>, + Port = get_port(Transport, Config), + Opts = get_opts(Transport), + FirstNode = get_node_name(Config, 0), + NodeInMaintenance = get_node_name(Config, 1), + {ok, S} = Transport:connect("localhost", Port, Opts), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(Transport, S, C0), + C2 = test_authenticate(Transport, S, C1), + C3 = test_create_stream(Transport, S, Stream, C2), + GetStreamNodes = + fun() -> + MetadataFrame = request({metadata, [Stream]}), + ok = Transport:send(S, MetadataFrame), + {CmdMetadata, _} = receive_commands(Transport, S, C3), + {response, 1, + {metadata, _Nodes, #{Stream := {Leader = {_H, _P}, Replicas}}}} = + CmdMetadata, + [Leader | Replicas] + end, - IsBeingDrained = - fun() -> - rabbit_ct_broker_helpers:rpc(Config, - FirstNode, - rabbit_maintenance, - is_being_drained_consistent_read, - [NodeInMaintenance]) - end, - rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() end), + await_condition(fun() -> length(GetStreamNodes()) == 3 end), - rabbit_ct_helpers:await_condition(fun() -> - length(GetStreamNodes()) == 2 - end), + rpc(Config, NodeInMaintenance, rabbit_maintenance, drain, []), - rabbit_ct_broker_helpers:rpc(Config, - NodeInMaintenance, - rabbit_maintenance, - revive, - []), + IsBeingDrained = + fun() -> + rpc(Config, FirstNode, + rabbit_maintenance, is_being_drained_consistent_read, + [NodeInMaintenance]) + end, + await_condition(fun() -> IsBeingDrained() end), - rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() =:= false - end), + await_condition(fun() -> length(GetStreamNodes()) == 2 end), - rabbit_ct_helpers:await_condition(fun() -> - length(GetStreamNodes()) == 3 - end), + rpc(Config, NodeInMaintenance, rabbit_maintenance, revive, []), + + await_condition(fun() -> IsBeingDrained() =:= false end), + + await_condition(fun() -> length(GetStreamNodes()) == 3 end), + + DeleteStreamFrame = request({delete_stream, Stream}), + ok = Transport:send(S, DeleteStreamFrame), + {CmdDelete, C4} = receive_commands(Transport, S, C3), + ?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, + CmdDelete), + _C5 = test_close(Transport, S, C4), + closed = wait_for_socket_close(Transport, S, 10) + end, Transports), - DeleteStreamFrame = request({delete_stream, Stream}), - ok = Transport:send(S, DeleteStreamFrame), - {CmdDelete, C4} = receive_commands(Transport, S, C3), - ?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, - CmdDelete), - _C5 = test_close(Transport, S, C4), - closed = wait_for_socket_close(Transport, S, 10), ok. test_gc_consumers(Config) -> @@ -1100,6 +1105,94 @@ sac_subscription_with_partition_index_conflict_should_return_error(Config) -> {ok, _} = stream_test_utils:close(S, C5), ok. +test_metadata_with_advertised_hints(Config) -> + Transports = [gen_tcp, ssl], + lists:foreach( + fun(Transport) -> + FunctionName = atom_to_binary(?FUNCTION_NAME, utf8), + TransportBin = atom_to_binary(Transport, utf8), + Stream = <>/binary, TransportBin/binary>>, + Port = get_port(Transport, Config), + Opts = get_opts(Transport), + {ok, S} = Transport:connect("localhost", Port, Opts), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(Transport, S, C0), + C2 = test_authenticate(Transport, S, C1), + C3 = test_create_stream(Transport, S, Stream, C2), + GetStreamNodes = + fun(Conn0) -> + MetadataFrame = request({metadata, [Stream]}), + ok = Transport:send(S, MetadataFrame), + {Cmd, Conn1} = receive_commands(Transport, S, Conn0), + {response, 1, + {metadata, _Nodes, + #{Stream := {Node = {_H, _P}, _Replicas}}}} = Cmd, + {Conn1, Node} + end, + + {C4, N1} = GetStreamNodes(C3), + ?assertEqual({<<"localhost">>, Port}, N1), + AdHost = rand:bytes(20), + AdPort = rand:uniform(65535), + {KH, KP} = case Transport of + gen_tcp -> + {?K_AD_HOST, ?K_AD_PORT}; + ssl -> + {?K_AD_TLS_HOST, ?K_AD_TLS_PORT} + end, + + rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, AdHost]), + rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, AdPort]), + {C5, N2} = GetStreamNodes(C4), + ?assertEqual({AdHost, AdPort}, N2), + + rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, undefined]), + rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, undefined]), + + _ = test_close(Transport, S, C5), + closed = wait_for_socket_close(Transport, S, 10) + end, Transports), + ok. + +test_connection_properties_with_advertised_hints(Config) -> + TestFun = + fun(Transport, ExpectedHost, ExpectedPort) -> + Port = get_port(Transport, Config), + Opts = get_opts(Transport), + {ok, S} = Transport:connect("localhost", Port, Opts), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(Transport, S, C0), + {CP, C2} = test_authenticate_with_conn_props(Transport, S, C1), + ExpectedPortBin = integer_to_binary(ExpectedPort), + ?assertMatch(#{<<"advertised_host">> := ExpectedHost, + <<"advertised_port">> := ExpectedPortBin}, + CP), + + _ = test_close(Transport, S, C2), + closed = wait_for_socket_close(Transport, S, 10) + end, + + TestFun(gen_tcp, <<"localhost">>, get_port(gen_tcp, Config)), + TestFun(ssl, <<"localhost">>, get_port(ssl, Config)), + + lists:foreach( + fun(Transport) -> + AdHost = rand:bytes(20), + AdPort = rand:uniform(65535), + {KH, KP} = case Transport of + gen_tcp -> + {?K_AD_HOST, ?K_AD_PORT}; + ssl -> + {?K_AD_TLS_HOST, ?K_AD_TLS_PORT} + end, + rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, AdHost]), + rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, AdPort]), + TestFun(Transport, AdHost, AdPort), + rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, undefined]), + rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, undefined]) + end, [gen_tcp, ssl]), + + ok. filtered_events(Config, EventType) -> Events = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -1318,6 +1411,11 @@ test_authenticate(Transport, S, C0) -> tune(Transport, S, test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)). +test_authenticate_with_conn_props(Transport, S, C0) -> + tune_with_conn_props( + Transport, S, + test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)). + test_authenticate(Transport, S, C0, Username) -> test_authenticate(Transport, S, C0, Username, Username). @@ -1371,6 +1469,10 @@ tune(Transport, S, C2) -> {{response, _, {open, ?RESPONSE_CODE_OK, _}}, C3} = do_tune(Transport, S, C2), C3. +tune_with_conn_props(Transport, S, C2) -> + {{response, _, {open, ?RESPONSE_CODE_OK, CP}}, C3} = do_tune(Transport, S, C2), + {CP, C3}. + do_tune(Transport, S, C2) -> {Tune, C3} = receive_commands(Transport, S, C2), {tune, ?DEFAULT_FRAME_MAX, ?DEFAULT_HEARTBEAT} = Tune,