Add stream TLS test

This commit is contained in:
dcorbacho 2021-05-13 17:01:00 +02:00 committed by Karl Nilsson
parent b2a7884a45
commit 8f54150867
1 changed files with 76 additions and 58 deletions

View File

@ -31,7 +31,7 @@ all() ->
groups() ->
[{single_node, [],
[test_stream, test_gc_consumers, test_gc_publishers]},
[test_stream, test_stream_tls, test_gc_consumers, test_gc_publishers]},
{cluster, [], [test_stream, java]}].
init_per_suite(Config) ->
@ -44,7 +44,8 @@ end_per_suite(Config) ->
init_per_group(single_node, Config) ->
Config1 =
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
rabbit_ct_helpers:run_setup_steps(Config1,
Config2 = rabbit_ct_helpers:set_config(Config1, {rabbitmq_ct_tls_verify, verify_none}),
rabbit_ct_helpers:run_setup_steps(Config2,
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit,
@ -85,7 +86,13 @@ end_per_testcase(_Test, _Config) ->
test_stream(Config) ->
Port = get_stream_port(Config),
test_server(Port),
test_server(gen_tcp, Port),
ok.
test_stream_tls(Config) ->
Port = get_stream_port_tls(Config),
application:ensure_all_started(ssl),
test_server(ssl, Port),
ok.
test_gc_consumers(Config) ->
@ -135,6 +142,8 @@ ets_count(Config, Table) ->
java(Config) ->
StreamPortNode1 = get_stream_port(Config, 0),
StreamPortNode2 = get_stream_port(Config, 1),
StreamPortTlsNode1 = get_stream_port_tls(Config, 0),
StreamPortTlsNode2 = get_stream_port_tls(Config, 1),
Node1Name = get_node_name(Config, 0),
Node2Name = get_node_name(Config, 1),
RabbitMqCtl = get_rabbitmqctl(Config),
@ -143,9 +152,11 @@ java(Config) ->
rabbit_ct_helpers:make(Config, DataDir,
["tests",
{"NODE1_STREAM_PORT=~b", [StreamPortNode1]},
{"NODE1_STREAM_PORT_TLS=~b", [StreamPortTlsNode1]},
{"NODE1_NAME=~p", [Node1Name]},
{"NODE2_NAME=~p", [Node2Name]},
{"NODE2_STREAM_PORT=~b", [StreamPortNode2]},
{"NODE2_STREAM_PORT_TLS=~b", [StreamPortTlsNode2]},
{"RABBITMQCTL=~p", [RabbitMqCtl]}]),
{ok, _} = MakeResult.
@ -159,49 +170,56 @@ get_stream_port(Config, Node) ->
rabbit_ct_broker_helpers:get_node_config(Config, Node,
tcp_port_stream).
get_stream_port_tls(Config) ->
get_stream_port_tls(Config, 0).
get_stream_port_tls(Config, Node) ->
rabbit_ct_broker_helpers:get_node_config(Config, Node,
tcp_port_stream_tls).
get_node_name(Config) ->
get_node_name(Config, 0).
get_node_name(Config, Node) ->
rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename).
test_server(Port) ->
test_server(Transport, Port) ->
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
Transport:connect("localhost", Port, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(S, C0),
C2 = test_authenticate(S, C1),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
Stream = <<"stream1">>,
C3 = test_create_stream(S, Stream, C2),
C3 = test_create_stream(Transport, S, Stream, C2),
PublisherId = 42,
C4 = test_declare_publisher(S, PublisherId, Stream, C3),
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
Body = <<"hello">>,
C5 = test_publish_confirm(S, PublisherId, Body, C4),
C5 = test_publish_confirm(Transport, S, PublisherId, Body, C4),
SubscriptionId = 42,
C6 = test_subscribe(S, SubscriptionId, Stream, C5),
C7 = test_deliver(S, SubscriptionId, Body, C6),
C8 = test_delete_stream(S, Stream, C7),
C6 = test_subscribe(Transport, S, SubscriptionId, Stream, C5),
C7 = test_deliver(Transport, S, SubscriptionId, Body, C6),
C8 = test_delete_stream(Transport, S, Stream, C7),
% test_metadata_update_stream_deleted(S, Stream),
_C9 = test_close(S, C8),
closed = wait_for_socket_close(S, 10),
_C9 = test_close(Transport, S, C8),
closed = wait_for_socket_close(Transport, S, 10),
ok.
test_peer_properties(S, C0) ->
test_peer_properties(Transport, S, C0) ->
PeerPropertiesFrame =
rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
ok = gen_tcp:send(S, PeerPropertiesFrame),
{C, [Cmd]} = receive_commands(S, C0),
ok = Transport:send(S, PeerPropertiesFrame),
{C, [Cmd]} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {peer_properties, ?RESPONSE_CODE_OK, _}},
Cmd),
C.
test_authenticate(S, C0) ->
test_authenticate(Transport, S, C0) ->
SaslHandshakeFrame =
rabbit_stream_core:frame({request, 1, sasl_handshake}),
ok = gen_tcp:send(S, SaslHandshakeFrame),
ok = Transport:send(S, SaslHandshakeFrame),
Plain = <<"PLAIN">>,
AmqPlain = <<"AMQPLAIN">>,
{C1, [Cmd]} = receive_commands(S, C0),
{C1, [Cmd]} = receive_commands(Transport, S, C0),
case Cmd of
{response, _, {sasl_handshake, ?RESPONSE_CODE_OK, Mechanisms}} ->
?assertEqual([AmqPlain, Plain], lists:sort(Mechanisms));
@ -217,13 +235,13 @@ test_authenticate(S, C0) ->
SaslAuthenticateFrame =
rabbit_stream_core:frame({request, 2,
{sasl_authenticate, Plain, PlainSasl}}),
ok = gen_tcp:send(S, SaslAuthenticateFrame),
{C2, [SaslAuth | Cmds2]} = receive_commands(S, C1),
ok = Transport:send(S, SaslAuthenticateFrame),
{C2, [SaslAuth | Cmds2]} = receive_commands(Transport, S, C1),
{response, 2, {sasl_authenticate, ?RESPONSE_CODE_OK}} = SaslAuth,
{C3, Tune} =
case Cmds2 of
[] ->
{C2b, [X]} = receive_commands(S, C2),
{C2b, [X]} = receive_commands(Transport, S, C2),
{C2b, X};
[T] ->
{C2, T}
@ -234,30 +252,30 @@ test_authenticate(S, C0) ->
TuneFrame =
rabbit_stream_core:frame({response, 0,
{tune, ?DEFAULT_FRAME_MAX, 0}}),
ok = gen_tcp:send(S, TuneFrame),
ok = Transport:send(S, TuneFrame),
VirtualHost = <<"/">>,
OpenFrame =
rabbit_stream_core:frame({request, 3, {open, VirtualHost}}),
ok = gen_tcp:send(S, OpenFrame),
ok = Transport:send(S, OpenFrame),
{C4,
[{response, 3, {open, ?RESPONSE_CODE_OK, _ConnectionProperties}}]} =
receive_commands(S, C3),
receive_commands(Transport, S, C3),
C4.
test_create_stream(S, Stream, C0) ->
test_create_stream(Transport, S, Stream, C0) ->
CreateStreamFrame =
rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
ok = gen_tcp:send(S, CreateStreamFrame),
{C, [Cmd]} = receive_commands(S, C0),
ok = Transport:send(S, CreateStreamFrame),
{C, [Cmd]} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_OK}}, Cmd),
C.
test_delete_stream(S, Stream, C0) ->
test_delete_stream(Transport, S, Stream, C0) ->
DeleteStreamFrame =
rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
ok = gen_tcp:send(S, DeleteStreamFrame),
{C1, [Cmd | MaybeMetaData]} = receive_commands(S, C0),
ok = Transport:send(S, DeleteStreamFrame),
{C1, [Cmd | MaybeMetaData]} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, Cmd),
case MaybeMetaData of
[] ->
@ -268,44 +286,44 @@ test_delete_stream(S, Stream, C0) ->
C1
end.
test_metadata_update_stream_deleted(S, Stream, C0) ->
{C1, [Meta]} = receive_commands(S, C0),
test_metadata_update_stream_deleted(Transport, S, Stream, C0) ->
{C1, [Meta]} = receive_commands(Transport, S, C0),
{metadata_update, Stream, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE} = Meta,
C1.
test_declare_publisher(S, PublisherId, Stream, C0) ->
test_declare_publisher(Transport, S, PublisherId, Stream, C0) ->
DeclarePublisherFrame =
rabbit_stream_core:frame({request, 1,
{declare_publisher,
PublisherId,
<<>>,
Stream}}),
ok = gen_tcp:send(S, DeclarePublisherFrame),
{C, [Cmd]} = receive_commands(S, C0),
ok = Transport:send(S, DeclarePublisherFrame),
{C, [Cmd]} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {declare_publisher, ?RESPONSE_CODE_OK}},
Cmd),
C.
test_publish_confirm(S, PublisherId, Body, C0) ->
test_publish_confirm(Transport, S, PublisherId, Body, C0) ->
BodySize = byte_size(Body),
Messages = [<<1:64, 0:1, BodySize:31, Body:BodySize/binary>>],
PublishFrame =
rabbit_stream_core:frame({publish, PublisherId, 1, Messages}),
ok = gen_tcp:send(S, PublishFrame),
{C, [Cmd]} = receive_commands(S, C0),
ok = Transport:send(S, PublishFrame),
{C, [Cmd]} = receive_commands(Transport, S, C0),
?assertMatch({publish_confirm, PublisherId, [1]}, Cmd),
C.
test_subscribe(S, SubscriptionId, Stream, C0) ->
test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
SubCmd = {request, 1, {subscribe, SubscriptionId, Stream, 0, 10}},
SubscribeFrame = rabbit_stream_core:frame(SubCmd),
ok = gen_tcp:send(S, SubscribeFrame),
{C, [Cmd]} = receive_commands(S, C0),
ok = Transport:send(S, SubscribeFrame),
{C, [Cmd]} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}}, Cmd),
C.
test_deliver(S, SubscriptionId, Body, C0) ->
{C, [{deliver, SubscriptionId, Chunk}]} = receive_commands(S, C0),
test_deliver(Transport, S, SubscriptionId, Body, C0) ->
{C, [{deliver, SubscriptionId, Chunk}]} = receive_commands(Transport, S, C0),
<<5:4/unsigned,
0:4/unsigned,
0:8,
@ -324,7 +342,7 @@ test_deliver(S, SubscriptionId, Body, C0) ->
Chunk,
C.
test_metadata_update_stream_deleted(S, Stream) ->
test_metadata_update_stream_deleted(Transport, S, Stream) ->
StreamSize = byte_size(Stream),
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
{ok,
@ -335,35 +353,35 @@ test_metadata_update_stream_deleted(S, Stream) ->
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
Stream/binary>>} =
gen_tcp:recv(S, 0, 5000).
Transport:recv(S, 0, 5000).
test_close(S, C0) ->
test_close(Transport, S, C0) ->
CloseReason = <<"OK">>,
CloseFrame =
rabbit_stream_core:frame({request, 1,
{close, ?RESPONSE_CODE_OK, CloseReason}}),
ok = gen_tcp:send(S, CloseFrame),
ok = Transport:send(S, CloseFrame),
{C, [{response, 1, {close, ?RESPONSE_CODE_OK}}]} =
receive_commands(S, C0),
receive_commands(Transport, S, C0),
C.
wait_for_socket_close(_S, 0) ->
wait_for_socket_close(_Transport, _S, 0) ->
not_closed;
wait_for_socket_close(S, Attempt) ->
case gen_tcp:recv(S, 0, 1000) of
wait_for_socket_close(Transport, S, Attempt) ->
case Transport:recv(S, 0, 1000) of
{error, timeout} ->
wait_for_socket_close(S, Attempt - 1);
wait_for_socket_close(Transport, S, Attempt - 1);
{error, closed} ->
closed
end.
receive_commands(S, C0) ->
case gen_tcp:recv(S, 0, 5000) of
receive_commands(Transport, S, C0) ->
case Transport:recv(S, 0, 5000) of
{ok, Data} ->
case rabbit_stream_core:incoming_data(Data, C0) of
{C1, []} ->
%% no command received, try once more
{ok, Data2} = gen_tcp:recv(S, 0, 5000),
{ok, Data2} = Transport:recv(S, 0, 5000),
rabbit_stream_core:incoming_data(Data2, C1);
{_C, _Cmds} = Ret ->
Ret