diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl index ab36e6936b..aa67f3bb9c 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl @@ -29,12 +29,14 @@ -export([send/2]). -export([close/3]). +-record(state, {pid, type}). + %% Websocket. init(_, _Req, _Opts) -> {upgrade, protocol, cowboy_websocket}. -websocket_init(_TransportName, Req, _Opts) -> +websocket_init(_TransportName, Req, [{type, FrameType}]) -> {Peername, _} = cowboy_req:peer(Req), [Socket, Transport] = cowboy_req:get([socket, transport], Req), {ok, Sockname} = Transport:sockname(Socket), @@ -42,23 +44,25 @@ websocket_init(_TransportName, Req, _Opts) -> {peername, Peername}, {sockname, Sockname}]}, {ok, _Sup, Pid} = rabbit_ws_sup:start_client({Conn}), - {ok, Req, Pid}. + {ok, Req, #state{pid=Pid, type=FrameType}}. -websocket_handle({text, Data}, Req, State = Pid) -> +websocket_handle({text, Data}, Req, State=#state{pid=Pid}) -> rabbit_ws_client:sockjs_msg(Pid, Data), {ok, Req, State}; -websocket_handle({binary, Data}, Req, State = Pid) -> +websocket_handle({binary, Data}, Req, State=#state{pid=Pid}) -> rabbit_ws_client:sockjs_msg(Pid, Data), {ok, Req, State}; websocket_handle(_Frame, Req, State) -> {ok, Req, State}. -websocket_info({send, Frame}, Req, State) -> +websocket_info({send, Msg}, Req, State=#state{type=FrameType}) -> + {reply, {FrameType, Msg}, Req, State}; +websocket_info(Frame = {close, _, _}, Req, State) -> {reply, Frame, Req, State}; websocket_info(_Info, Req, State) -> {ok, Req, State}. -websocket_terminate(_Reason, _Req, _State = Pid) -> +websocket_terminate(_Reason, _Req, #state{pid=Pid}) -> rabbit_ws_client:sockjs_closed(Pid), ok. @@ -77,9 +81,9 @@ info({?MODULE, _, Info}) -> Info. send(Data, {?MODULE, Pid, _}) -> - Pid ! {send, {text, Data}}, + Pid ! {send, Data}, ok. close(Code, Reason, {?MODULE, Pid, _}) -> - Pid ! {send, {close, Code, Reason}}, + Pid ! {close, Code, Reason}, ok. diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_sockjs.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_sockjs.erl index faf2256af6..aac4ed0cd0 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_sockjs.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_sockjs.erl @@ -36,6 +36,7 @@ init() -> {[{port, Port0}|TCPConf0], Port0} end, + WsFrame = get_env(ws_frame, text), CowboyOpts = get_env(cowboy_opts, []), SockjsOpts = get_env(sockjs_opts, []) ++ [{logger, fun logger/3}], @@ -44,7 +45,7 @@ init() -> <<"/stomp">>, fun service_stomp/3, {}, SockjsOpts), VhostRoutes = [ {"/stomp/[...]", sockjs_cowboy_handler, SockjsState}, - {"/ws", rabbit_ws_handler, undefined} + {"/ws", rabbit_ws_handler, [{type, WsFrame}]} ], Routes = cowboy_router:compile([{'_', VhostRoutes}]), % any vhost NbAcceptors = get_env(nb_acceptors, 100), diff --git a/deps/rabbitmq_web_stomp/src/rabbitmq_web_stomp.app.src b/deps/rabbitmq_web_stomp/src/rabbitmq_web_stomp.app.src index 14341a8f6d..e63325c849 100644 --- a/deps/rabbitmq_web_stomp/src/rabbitmq_web_stomp.app.src +++ b/deps/rabbitmq_web_stomp/src/rabbitmq_web_stomp.app.src @@ -10,6 +10,7 @@ {ssl_config, []}, {nb_acceptors, 100}, {cowboy_opts, []}, - {sockjs_opts, []}]}, + {sockjs_opts, []}, + {ws_frame, text}]}, {applications, [kernel, stdlib, rabbit, rabbitmq_stomp, cowboy, sockjs]} ]}. diff --git a/deps/rabbitmq_web_stomp/test/src/rabbit_ws_test_cowboy_websocket.erl b/deps/rabbitmq_web_stomp/test/src/rabbit_ws_test_cowboy_websocket.erl index 1f742e3a1d..d55fb9a117 100644 --- a/deps/rabbitmq_web_stomp/test/src/rabbit_ws_test_cowboy_websocket.erl +++ b/deps/rabbitmq_web_stomp/test/src/rabbit_ws_test_cowboy_websocket.erl @@ -58,6 +58,50 @@ pubsub_test() -> ok. +raw_send_binary(WS, Command, Headers) -> + raw_send_binary(WS, Command, Headers, <<>>). +raw_send_binary(WS, Command, Headers, Body) -> + Frame = stomp:marshal(Command, Headers, Body), + rfc6455_client:send_binary(WS, Frame). + +raw_recv_binary(WS) -> + {binary, P} = rfc6455_client:recv(WS), + stomp:unmarshal(P). + + +pubsub_binary_test() -> + %% Set frame type to binary and restart the web stomp application. + ok = application:set_env(rabbitmq_web_stomp, ws_frame, binary), + ok = application:stop(rabbitmq_web_stomp), + ok = cowboy:stop_listener(http), + ok = application:start(rabbitmq_web_stomp), + + WS = rfc6455_client:new("ws://127.0.0.1:15674/ws", self()), + {ok, _} = rfc6455_client:open(WS), + ok = raw_send(WS, "CONNECT", [{"login","guest"}, {"passcode", "guest"}]), + + {<<"CONNECTED">>, _, <<>>} = raw_recv_binary(WS), + + Dst = "/topic/test-" ++ stomp:list_to_hex(binary_to_list(crypto:rand_bytes(8))), + + ok = raw_send(WS, "SUBSCRIBE", [{"destination", Dst}, + {"id", "s0"}]), + + ok = raw_send(WS, "SEND", [{"destination", Dst}, + {"content-length", "3"}], <<"a\x00a">>), + + {<<"MESSAGE">>, H, <<"a\x00a">>} = raw_recv_binary(WS), + Dst = binary_to_list(proplists:get_value(<<"destination">>, H)), + + {close, _} = rfc6455_client:close(WS), + + %% Set frame type back to text and restart the web stomp application. + ok = application:set_env(rabbitmq_web_stomp, ws_frame, text), + ok = application:stop(rabbitmq_web_stomp), + ok = cowboy:stop_listener(http), + ok = application:start(rabbitmq_web_stomp). + + disconnect_test() -> WS = rfc6455_client:new("ws://127.0.0.1:15674/ws", self()), {ok, _} = rfc6455_client:open(WS), diff --git a/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl b/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl index 6394f10e25..cda62327c5 100644 --- a/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl +++ b/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl @@ -52,6 +52,8 @@ recv(WS) -> receive {rfc6455, recv, WS, Payload} -> {ok, Payload}; + {rfc6455, recv_binary, WS, Payload} -> + {binary, Payload}; {rfc6455, close, WS, R} -> {close, R} end. @@ -60,6 +62,10 @@ send(WS, IoData) -> WS ! {send, IoData}, ok. +send_binary(WS, IoData) -> + WS ! {send_binary, IoData}, + ok. + close(WS) -> close(WS, {1000, ""}). @@ -130,6 +136,9 @@ do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid}, R) -> {1, 1, Payload, Rest} -> PPid ! {rfc6455, recv, self(), Payload}, State#state{data = Rest}; + {1, 2, Payload, Rest} -> + PPid ! {rfc6455, recv_binary, self(), Payload}, + State#state{data = Rest}; {1, 8, Payload, _Rest} -> WsReason = case Payload of <> -> {WC, WR}; @@ -167,6 +176,10 @@ do_send(State = #state{socket = Socket}, Payload) -> gen_tcp:send(Socket, encode_frame(1, 1, Payload)), State. +do_send_binary(State = #state{socket = Socket}, Payload) -> + gen_tcp:send(Socket, encode_frame(1, 2, Payload)), + State. + do_close(State = #state{socket = Socket}, {Code, Reason}) -> Payload = iolist_to_binary([<>, Reason]), gen_tcp:send(Socket, encode_frame(1, 8, Payload)), @@ -181,6 +194,8 @@ loop(State = #state{socket = Socket, ppid = PPid, data = Data, loop(do_recv(State1)); {send, Payload} when Phase == open -> loop(do_send(State, Payload)); + {send_binary, Payload} when Phase == open -> + loop(do_send_binary(State, Payload)); {tcp_closed, Socket} -> die(Socket, PPid, {1006, "Connection closed abnormally"}, normal); {close, WsReason} when Phase == open ->