diff --git a/deps/rabbitmq_web_mqtt/test/src/rfc6455_client.erl b/deps/rabbitmq_web_mqtt/test/src/rfc6455_client.erl index 8ec73f5594..6845c81627 100644 --- a/deps/rabbitmq_web_mqtt/test/src/rfc6455_client.erl +++ b/deps/rabbitmq_web_mqtt/test/src/rfc6455_client.erl @@ -1,22 +1,22 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. %% -%% The Original Code is RabbitMQ Management Console. +%% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved. %% -module(rfc6455_client). --export([new/2, new/3, new/4, new/5, open/1, recv/1, send/2, send_binary/2, close/1, close/2]). +-export([new/2, new/3, new/4, new/5, open/1, recv/1, recv/2, send/2, send_binary/2, close/1, close/2]). -record(state, {host, port, addr, path, ppid, socket, data, phase, transport}). @@ -76,6 +76,18 @@ recv(WS) -> {close, R} end. +recv(WS, Timeout) -> + receive + {rfc6455, recv, WS, Payload} -> + {ok, Payload}; + {rfc6455, recv_binary, WS, Payload} -> + {binary, Payload}; + {rfc6455, close, WS, R} -> + {close, R} + after Timeout -> + {error, timeout} + end. + send(WS, IoData) -> WS ! {send, IoData}, ok. diff --git a/deps/rabbitmq_web_mqtt/test/src/system_SUITE.erl b/deps/rabbitmq_web_mqtt/test/src/system_SUITE.erl index 1fb39c5be6..1d0b464c29 100644 --- a/deps/rabbitmq_web_mqtt/test/src/system_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/src/system_SUITE.erl @@ -30,8 +30,11 @@ groups() -> [ {non_parallel_tests, [], [connection - ,pubsub - ,disconnect + , pubsub_shared_connection + , pubsub_separate_connections + , last_will_enabled + , last_will_disabled + , disconnect ]} ]. @@ -61,6 +64,8 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). +-define(DEFAULT_TIMEOUT, 15000). + connection(Config) -> Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt), @@ -70,13 +75,14 @@ connection(Config) -> {close, _} = rfc6455_client:close(WS), ok. -pubsub(Config) -> +pubsub_shared_connection(Config) -> Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt), PortStr = integer_to_list(Port), WS = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()), {ok, _} = rfc6455_client:open(WS), ok = raw_send(WS, ?CONNECT_PACKET(#mqtt_packet_connect{ + clean_sess = true, client_id = <<"web-mqtt-tests-pubsub">>, username = <<"guest">>, password = <<"guest">>})), @@ -96,6 +102,120 @@ pubsub(Config) -> {close, _} = rfc6455_client:close(WS), ok. +pubsub_separate_connections(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt), + PortStr = integer_to_list(Port), + WS1 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()), + {ok, _} = rfc6455_client:open(WS1), + ok = raw_send(WS1, + ?CONNECT_PACKET(#mqtt_packet_connect{ + clean_sess = true, + client_id = <<"web-mqtt-tests-publisher">>, + username = <<"guest">>, + password = <<"guest">>})), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS1), + + WS2 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()), + {ok, _} = rfc6455_client:open(WS2), + ok = raw_send(WS2, + ?CONNECT_PACKET(#mqtt_packet_connect{ + clean_sess = true, + client_id = <<"web-mqtt-tests-consumer">>, + username = <<"guest">>, + password = <<"guest">>})), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS2), + + Dst = <<"/topic/test-web-mqtt">>, + ok = raw_send(WS2, ?SUBSCRIBE_PACKET(1, [{Dst, ?QOS_1}])), + {ok, ?SUBACK_PACKET(_, _), _} = raw_recv(WS2), + + Payload = <<"a\x00a">>, + ok = raw_send(WS1, ?PUBLISH_PACKET(?QOS_1, Dst, 2, Payload)), + {ok, ?PUBLISH_PACKET(_, Dst, _, Payload), _} = raw_recv(WS2), + + {close, _} = rfc6455_client:close(WS1), + {close, _} = rfc6455_client:close(WS2), + ok. + +last_will_enabled(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt), + PortStr = integer_to_list(Port), + + LastWillDst = <<"/topic/web-mqtt-tests-ws1-last-will">>, + LastWillMsg = <<"a last will and testament message">>, + + WS1 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()), + {ok, _} = rfc6455_client:open(WS1), + ok = raw_send(WS1, + ?CONNECT_PACKET(#mqtt_packet_connect{ + clean_sess = true, + client_id = <<"web-mqtt-tests-last-will-ws1">>, + will_flag = true, + will_qos = ?QOS_1, + will_topic = LastWillDst, + will_msg = LastWillMsg, + username = <<"guest">>, + password = <<"guest">>})), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS1), + + WS2 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()), + {ok, _} = rfc6455_client:open(WS2), + ok = raw_send(WS2, + ?CONNECT_PACKET(#mqtt_packet_connect{ + clean_sess = true, + client_id = <<"web-mqtt-tests-last-will-ws2">>, + username = <<"guest">>, + password = <<"guest">>})), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS2), + + ok = raw_send(WS2, ?SUBSCRIBE_PACKET(1, [{LastWillDst, ?QOS_1}])), + {ok, ?SUBACK_PACKET(_, _), _} = raw_recv(WS2), + + {close, _} = rfc6455_client:close(WS1), + ?assertMatch({ok, ?PUBLISH_PACKET(_, LastWillDst, _, LastWillMsg), _}, raw_recv(WS2, 5000)), + + {close, _} = rfc6455_client:close(WS2), + ok. + +last_will_disabled(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt), + PortStr = integer_to_list(Port), + + LastWillDst = <<"/topic/web-mqtt-tests-ws1-last-will-disabled">>, + LastWillMsg = <<"a last will and testament message">>, + + WS1 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()), + {ok, _} = rfc6455_client:open(WS1), + ok = raw_send(WS1, + ?CONNECT_PACKET(#mqtt_packet_connect{ + clean_sess = true, + client_id = <<"web-mqtt-tests-last-will-ws1-disabled">>, + will_flag = false, + will_qos = ?QOS_1, + will_topic = LastWillDst, + will_msg = LastWillMsg, + username = <<"guest">>, + password = <<"guest">>})), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS1), + + WS2 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()), + {ok, _} = rfc6455_client:open(WS2), + ok = raw_send(WS2, + ?CONNECT_PACKET(#mqtt_packet_connect{ + clean_sess = true, + client_id = <<"web-mqtt-tests-last-will-ws2-disabled">>, + username = <<"guest">>, + password = <<"guest">>})), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS2), + + ok = raw_send(WS2, ?SUBSCRIBE_PACKET(1, [{LastWillDst, ?QOS_1}])), + ?assertMatch({ok, ?SUBACK_PACKET(_, _), _}, raw_recv(WS2)), + + {close, _} = rfc6455_client:close(WS1), + ?assertEqual({error, timeout}, raw_recv(WS2, 5000)), + + {close, _} = rfc6455_client:close(WS2), + ok. disconnect(Config) -> Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt), @@ -104,9 +224,10 @@ disconnect(Config) -> {ok, _} = rfc6455_client:open(WS), ok = raw_send(WS, ?CONNECT_PACKET(#mqtt_packet_connect{ - client_id = <<"web-mqtt-tests-disconnect">>, - username = <<"guest">>, - password = <<"guest">>})), + clean_sess = true, + client_id = <<"web-mqtt-tests-disconnect">>, + username = <<"guest">>, + password = <<"guest">>})), {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS), @@ -121,5 +242,13 @@ raw_send(WS, Packet) -> rfc6455_client:send_binary(WS, Frame). raw_recv(WS) -> - {binary, P} = rfc6455_client:recv(WS), - emqttc_parser:parse(P, emqttc_parser:new()). + raw_recv(WS, ?DEFAULT_TIMEOUT). + +raw_recv(WS, Timeout) -> + case rfc6455_client:recv(WS, Timeout) of + {binary, P} -> + ct:pal("raw_recv parsed: ~p", [emqttc_parser:parse(P, emqttc_parser:new())]), + emqttc_parser:parse(P, emqttc_parser:new()); + {error, timeout} -> + {error, timeout} + end.