Add integration test MQTT 5.0 -> Stream

Add an integration test that sends via MQTT 5.0,
converts the MQTT message to an AMQP 1.0 message via mc_mqtt,
and consumes via the Stream protocol.
This commit is contained in:
David Ansari 2023-09-05 11:04:44 +02:00
parent 37833887ae
commit 62710f576e
2 changed files with 135 additions and 2 deletions

View File

@ -281,6 +281,7 @@ rabbitmq_integration_suite(
runtime_deps = [
"//deps/amqp10_client:erlang_app",
"//deps/rabbitmq_stomp:erlang_app",
"//deps/rabbitmq_stream_common:erlang_app",
"@emqtt//:erlang_app",
],
)

View File

@ -326,8 +326,119 @@ stomp(Config) ->
ok = emqtt:disconnect(C).
stream(_Config) ->
{skip, "TODO write test"}.
%% The stream test case is one-way because an MQTT client can publish to a stream,
%% but not consume (directly) from a stream.
stream(Config) ->
Q = ClientId = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
%% Bind a stream to the MQTT topic exchange.
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = Q,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
exchange = <<"amq.topic">>,
routing_key = <<"my.topic">>}),
%% MQTT 5.0 to Stream
C = connect(ClientId, Config),
ContentType = <<"text/plain">>,
Correlation = <<"some correlation ID">>,
Payload = <<"my payload">>,
UserProperty = [{<<"rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>},
%% We expect that this message annotation will be dropped
%% since AMQP 1.0 annoations must be symbols, i.e encoded as ASCII.
{<<"x-rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>},
{<<"key">>, <<"val">>},
%% We expect that this application property will be dropped
%% since AMQP 1.0 application properties are maps and maps disallow duplicate keys.
{<<"key">>, <<"val">>},
{<<"x-key">>, <<"val">>},
%% We expect that this message annotation will be dropped
%% since AMQP 1.0 annoations are maps and maps disallow duplicate keys.
{<<"x-key">>, <<"val">>}],
{ok, _} = emqtt:publish(C, <<"my/topic">>,
#{'Content-Type' => ContentType,
'Correlation-Data' => Correlation,
'Response-Topic' => <<"response/topic">>,
'User-Property' => UserProperty,
'Payload-Format-Indicator' => 1},
Payload, [{qos, 1}]),
ok = emqtt:disconnect(C),
%% There is no open source Erlang RabbitMQ Stream client.
%% Therefore, we have to build the commands for the Stream protocol handshake manually.
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream),
{ok, S} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
ok = gen_tcp:send(S, PeerPropertiesFrame),
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(S, C0),
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 1, sasl_handshake})),
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(S, C1),
Username = <<"guest">>,
Password = <<"guest">>,
Null = 0,
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(S, C2),
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(S, C3),
ok = gen_tcp:send(S, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(S, C4),
SubscriptionId = 99,
SubCmd = {request, 1, {subscribe, SubscriptionId, Q, 0, 10, #{}}},
SubscribeFrame = rabbit_stream_core:frame(SubCmd),
ok = gen_tcp:send(S, SubscribeFrame),
{{response, 1, {subscribe, _}}, C6} = receive_stream_commands(S, C5),
{{deliver, SubscriptionId, Chunk}, _C7} = receive_stream_commands(S, C6),
<<5:4/unsigned,
0:4/unsigned,
0:8,
1:16,
1:32,
_Timestamp:64,
_Epoch:64,
_COffset:64,
_Crc:32,
_DataLength:32,
_TrailerLength:32,
_ReservedBytes:32,
0:1,
BodySize:31/unsigned,
Sections0:BodySize/binary>> = Chunk,
Sections = amqp10_framing:decode_bin(Sections0),
ct:pal("Stream client received AMQP 1.0 sections:~n~p", [Sections]),
U = undefined,
FakeTransfer = {'v1_0.transfer', U, U, U, U, U, U, U, U, U, U, U},
Msg = amqp10_msg:from_amqp_records([FakeTransfer | Sections]),
?assert(amqp10_msg:header(durable, Msg)),
?assertEqual(#{<<"x-exchange">> => <<"amq.topic">>,
<<"x-routing-key">> => <<"my.topic">>,
<<"x-key">> => <<"val">>},
amqp10_msg:message_annotations(Msg)),
?assertEqual(#{correlation_id => Correlation,
content_type => ContentType,
%% We expect that reply_to contains a valid address,
%% and that the topic format got translated from MQTT to AMQP 0.9.1.
reply_to => <<"/topic/response.topic">>},
amqp10_msg:properties(Msg)),
?assertEqual(#{<<"rabbit🐇"/utf8>> => <<"carrot🥕"/utf8>>,
<<"key">> => <<"val">>},
amqp10_msg:application_properties(Msg)),
%% We excpet the body to be a single AMQP 1.0 value section where the value is a string
%% because we set the MQTT 5.0 Payload-Format-Indicator.
?assertEqual({'v1_0.amqp_value', {utf8, Payload}},
amqp10_msg:body(Msg)).
%% -------------------------------------------------------------------
%% Helpers
@ -336,6 +447,27 @@ stream(_Config) ->
delete_queues() ->
[{ok, 0} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()].
receive_stream_commands(Sock, C0) ->
case rabbit_stream_core:next_command(C0) of
empty ->
case gen_tcp:recv(Sock, 0, 5000) of
{ok, Data} ->
C1 = rabbit_stream_core:incoming_data(Data, C0),
case rabbit_stream_core:next_command(C1) of
empty ->
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
rabbit_stream_core:next_command(
rabbit_stream_core:incoming_data(Data2, C1));
Res ->
Res
end;
{error, Err} ->
ct:fail("error receiving stream data ~w", [Err])
end;
Res ->
Res
end.
%% -------------------------------------------------------------------
%% STOMP client BEGIN
%% -------------------------------------------------------------------