parent
dc81d674f7
commit
4cbeaf0e73
|
|
@ -1,22 +1,22 @@
|
||||||
%% The contents of this file are subject to the Mozilla Public License
|
%% The contents of this file are subject to the Mozilla Public License
|
||||||
%% Version 1.1 (the "License"); you may not use this file except in
|
%% Version 1.1 (the "License"); you may not use this file except in
|
||||||
%% compliance with the License. You may obtain a copy of the License at
|
%% compliance with the License. You may obtain a copy of the License at
|
||||||
%% http://www.mozilla.org/MPL/
|
%% http://www.mozilla.org/MPL/
|
||||||
%%
|
%%
|
||||||
%% Software distributed under the License is distributed on an "AS IS"
|
%% Software distributed under the License is distributed on an "AS IS"
|
||||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
|
||||||
%% License for the specific language governing rights and limitations
|
%% License for the specific language governing rights and limitations
|
||||||
%% under the License.
|
%% under the License.
|
||||||
%%
|
%%
|
||||||
%% The Original Code is RabbitMQ Management Console.
|
%% The Original Code is RabbitMQ.
|
||||||
%%
|
%%
|
||||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||||
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
|
%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved.
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-module(rfc6455_client).
|
-module(rfc6455_client).
|
||||||
|
|
||||||
-export([new/2, new/3, new/4, new/5, open/1, recv/1, send/2, send_binary/2, close/1, close/2]).
|
-export([new/2, new/3, new/4, new/5, open/1, recv/1, recv/2, send/2, send_binary/2, close/1, close/2]).
|
||||||
|
|
||||||
-record(state, {host, port, addr, path, ppid, socket, data, phase, transport}).
|
-record(state, {host, port, addr, path, ppid, socket, data, phase, transport}).
|
||||||
|
|
||||||
|
|
@ -76,6 +76,18 @@ recv(WS) ->
|
||||||
{close, R}
|
{close, R}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
recv(WS, Timeout) ->
|
||||||
|
receive
|
||||||
|
{rfc6455, recv, WS, Payload} ->
|
||||||
|
{ok, Payload};
|
||||||
|
{rfc6455, recv_binary, WS, Payload} ->
|
||||||
|
{binary, Payload};
|
||||||
|
{rfc6455, close, WS, R} ->
|
||||||
|
{close, R}
|
||||||
|
after Timeout ->
|
||||||
|
{error, timeout}
|
||||||
|
end.
|
||||||
|
|
||||||
send(WS, IoData) ->
|
send(WS, IoData) ->
|
||||||
WS ! {send, IoData},
|
WS ! {send, IoData},
|
||||||
ok.
|
ok.
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,11 @@ groups() ->
|
||||||
[
|
[
|
||||||
{non_parallel_tests, [],
|
{non_parallel_tests, [],
|
||||||
[connection
|
[connection
|
||||||
,pubsub
|
, pubsub_shared_connection
|
||||||
,disconnect
|
, pubsub_separate_connections
|
||||||
|
, last_will_enabled
|
||||||
|
, last_will_disabled
|
||||||
|
, disconnect
|
||||||
]}
|
]}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
@ -61,6 +64,8 @@ init_per_testcase(Testcase, Config) ->
|
||||||
end_per_testcase(Testcase, Config) ->
|
end_per_testcase(Testcase, Config) ->
|
||||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||||
|
|
||||||
|
-define(DEFAULT_TIMEOUT, 15000).
|
||||||
|
|
||||||
|
|
||||||
connection(Config) ->
|
connection(Config) ->
|
||||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
||||||
|
|
@ -70,13 +75,14 @@ connection(Config) ->
|
||||||
{close, _} = rfc6455_client:close(WS),
|
{close, _} = rfc6455_client:close(WS),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
pubsub(Config) ->
|
pubsub_shared_connection(Config) ->
|
||||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
||||||
PortStr = integer_to_list(Port),
|
PortStr = integer_to_list(Port),
|
||||||
WS = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
WS = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||||
{ok, _} = rfc6455_client:open(WS),
|
{ok, _} = rfc6455_client:open(WS),
|
||||||
ok = raw_send(WS,
|
ok = raw_send(WS,
|
||||||
?CONNECT_PACKET(#mqtt_packet_connect{
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
clean_sess = true,
|
||||||
client_id = <<"web-mqtt-tests-pubsub">>,
|
client_id = <<"web-mqtt-tests-pubsub">>,
|
||||||
username = <<"guest">>,
|
username = <<"guest">>,
|
||||||
password = <<"guest">>})),
|
password = <<"guest">>})),
|
||||||
|
|
@ -96,6 +102,120 @@ pubsub(Config) ->
|
||||||
{close, _} = rfc6455_client:close(WS),
|
{close, _} = rfc6455_client:close(WS),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
pubsub_separate_connections(Config) ->
|
||||||
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
||||||
|
PortStr = integer_to_list(Port),
|
||||||
|
WS1 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS1),
|
||||||
|
ok = raw_send(WS1,
|
||||||
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
clean_sess = true,
|
||||||
|
client_id = <<"web-mqtt-tests-publisher">>,
|
||||||
|
username = <<"guest">>,
|
||||||
|
password = <<"guest">>})),
|
||||||
|
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS1),
|
||||||
|
|
||||||
|
WS2 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS2),
|
||||||
|
ok = raw_send(WS2,
|
||||||
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
clean_sess = true,
|
||||||
|
client_id = <<"web-mqtt-tests-consumer">>,
|
||||||
|
username = <<"guest">>,
|
||||||
|
password = <<"guest">>})),
|
||||||
|
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS2),
|
||||||
|
|
||||||
|
Dst = <<"/topic/test-web-mqtt">>,
|
||||||
|
ok = raw_send(WS2, ?SUBSCRIBE_PACKET(1, [{Dst, ?QOS_1}])),
|
||||||
|
{ok, ?SUBACK_PACKET(_, _), _} = raw_recv(WS2),
|
||||||
|
|
||||||
|
Payload = <<"a\x00a">>,
|
||||||
|
ok = raw_send(WS1, ?PUBLISH_PACKET(?QOS_1, Dst, 2, Payload)),
|
||||||
|
{ok, ?PUBLISH_PACKET(_, Dst, _, Payload), _} = raw_recv(WS2),
|
||||||
|
|
||||||
|
{close, _} = rfc6455_client:close(WS1),
|
||||||
|
{close, _} = rfc6455_client:close(WS2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
last_will_enabled(Config) ->
|
||||||
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
||||||
|
PortStr = integer_to_list(Port),
|
||||||
|
|
||||||
|
LastWillDst = <<"/topic/web-mqtt-tests-ws1-last-will">>,
|
||||||
|
LastWillMsg = <<"a last will and testament message">>,
|
||||||
|
|
||||||
|
WS1 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS1),
|
||||||
|
ok = raw_send(WS1,
|
||||||
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
clean_sess = true,
|
||||||
|
client_id = <<"web-mqtt-tests-last-will-ws1">>,
|
||||||
|
will_flag = true,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_topic = LastWillDst,
|
||||||
|
will_msg = LastWillMsg,
|
||||||
|
username = <<"guest">>,
|
||||||
|
password = <<"guest">>})),
|
||||||
|
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS1),
|
||||||
|
|
||||||
|
WS2 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS2),
|
||||||
|
ok = raw_send(WS2,
|
||||||
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
clean_sess = true,
|
||||||
|
client_id = <<"web-mqtt-tests-last-will-ws2">>,
|
||||||
|
username = <<"guest">>,
|
||||||
|
password = <<"guest">>})),
|
||||||
|
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS2),
|
||||||
|
|
||||||
|
ok = raw_send(WS2, ?SUBSCRIBE_PACKET(1, [{LastWillDst, ?QOS_1}])),
|
||||||
|
{ok, ?SUBACK_PACKET(_, _), _} = raw_recv(WS2),
|
||||||
|
|
||||||
|
{close, _} = rfc6455_client:close(WS1),
|
||||||
|
?assertMatch({ok, ?PUBLISH_PACKET(_, LastWillDst, _, LastWillMsg), _}, raw_recv(WS2, 5000)),
|
||||||
|
|
||||||
|
{close, _} = rfc6455_client:close(WS2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
last_will_disabled(Config) ->
|
||||||
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
||||||
|
PortStr = integer_to_list(Port),
|
||||||
|
|
||||||
|
LastWillDst = <<"/topic/web-mqtt-tests-ws1-last-will-disabled">>,
|
||||||
|
LastWillMsg = <<"a last will and testament message">>,
|
||||||
|
|
||||||
|
WS1 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS1),
|
||||||
|
ok = raw_send(WS1,
|
||||||
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
clean_sess = true,
|
||||||
|
client_id = <<"web-mqtt-tests-last-will-ws1-disabled">>,
|
||||||
|
will_flag = false,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_topic = LastWillDst,
|
||||||
|
will_msg = LastWillMsg,
|
||||||
|
username = <<"guest">>,
|
||||||
|
password = <<"guest">>})),
|
||||||
|
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS1),
|
||||||
|
|
||||||
|
WS2 = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS2),
|
||||||
|
ok = raw_send(WS2,
|
||||||
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
clean_sess = true,
|
||||||
|
client_id = <<"web-mqtt-tests-last-will-ws2-disabled">>,
|
||||||
|
username = <<"guest">>,
|
||||||
|
password = <<"guest">>})),
|
||||||
|
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS2),
|
||||||
|
|
||||||
|
ok = raw_send(WS2, ?SUBSCRIBE_PACKET(1, [{LastWillDst, ?QOS_1}])),
|
||||||
|
?assertMatch({ok, ?SUBACK_PACKET(_, _), _}, raw_recv(WS2)),
|
||||||
|
|
||||||
|
{close, _} = rfc6455_client:close(WS1),
|
||||||
|
?assertEqual({error, timeout}, raw_recv(WS2, 5000)),
|
||||||
|
|
||||||
|
{close, _} = rfc6455_client:close(WS2),
|
||||||
|
ok.
|
||||||
|
|
||||||
disconnect(Config) ->
|
disconnect(Config) ->
|
||||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
||||||
|
|
@ -104,9 +224,10 @@ disconnect(Config) ->
|
||||||
{ok, _} = rfc6455_client:open(WS),
|
{ok, _} = rfc6455_client:open(WS),
|
||||||
ok = raw_send(WS,
|
ok = raw_send(WS,
|
||||||
?CONNECT_PACKET(#mqtt_packet_connect{
|
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
client_id = <<"web-mqtt-tests-disconnect">>,
|
clean_sess = true,
|
||||||
username = <<"guest">>,
|
client_id = <<"web-mqtt-tests-disconnect">>,
|
||||||
password = <<"guest">>})),
|
username = <<"guest">>,
|
||||||
|
password = <<"guest">>})),
|
||||||
|
|
||||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS),
|
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS),
|
||||||
|
|
||||||
|
|
@ -121,5 +242,13 @@ raw_send(WS, Packet) ->
|
||||||
rfc6455_client:send_binary(WS, Frame).
|
rfc6455_client:send_binary(WS, Frame).
|
||||||
|
|
||||||
raw_recv(WS) ->
|
raw_recv(WS) ->
|
||||||
{binary, P} = rfc6455_client:recv(WS),
|
raw_recv(WS, ?DEFAULT_TIMEOUT).
|
||||||
emqttc_parser:parse(P, emqttc_parser:new()).
|
|
||||||
|
raw_recv(WS, Timeout) ->
|
||||||
|
case rfc6455_client:recv(WS, Timeout) of
|
||||||
|
{binary, P} ->
|
||||||
|
ct:pal("raw_recv parsed: ~p", [emqttc_parser:parse(P, emqttc_parser:new())]),
|
||||||
|
emqttc_parser:parse(P, emqttc_parser:new());
|
||||||
|
{error, timeout} ->
|
||||||
|
{error, timeout}
|
||||||
|
end.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue