Add new option 'ws_frame' to allow using binary frames
This commit is contained in:
parent
67defca8de
commit
eb28fc068a
|
|
@ -29,12 +29,14 @@
|
||||||
-export([send/2]).
|
-export([send/2]).
|
||||||
-export([close/3]).
|
-export([close/3]).
|
||||||
|
|
||||||
|
-record(state, {pid, type}).
|
||||||
|
|
||||||
%% Websocket.
|
%% Websocket.
|
||||||
|
|
||||||
init(_, _Req, _Opts) ->
|
init(_, _Req, _Opts) ->
|
||||||
{upgrade, protocol, cowboy_websocket}.
|
{upgrade, protocol, cowboy_websocket}.
|
||||||
|
|
||||||
websocket_init(_TransportName, Req, _Opts) ->
|
websocket_init(_TransportName, Req, [{type, FrameType}]) ->
|
||||||
{Peername, _} = cowboy_req:peer(Req),
|
{Peername, _} = cowboy_req:peer(Req),
|
||||||
[Socket, Transport] = cowboy_req:get([socket, transport], Req),
|
[Socket, Transport] = cowboy_req:get([socket, transport], Req),
|
||||||
{ok, Sockname} = Transport:sockname(Socket),
|
{ok, Sockname} = Transport:sockname(Socket),
|
||||||
|
|
@ -42,23 +44,25 @@ websocket_init(_TransportName, Req, _Opts) ->
|
||||||
{peername, Peername},
|
{peername, Peername},
|
||||||
{sockname, Sockname}]},
|
{sockname, Sockname}]},
|
||||||
{ok, _Sup, Pid} = rabbit_ws_sup:start_client({Conn}),
|
{ok, _Sup, Pid} = rabbit_ws_sup:start_client({Conn}),
|
||||||
{ok, Req, Pid}.
|
{ok, Req, #state{pid=Pid, type=FrameType}}.
|
||||||
|
|
||||||
websocket_handle({text, Data}, Req, State = Pid) ->
|
websocket_handle({text, Data}, Req, State=#state{pid=Pid}) ->
|
||||||
rabbit_ws_client:sockjs_msg(Pid, Data),
|
rabbit_ws_client:sockjs_msg(Pid, Data),
|
||||||
{ok, Req, State};
|
{ok, Req, State};
|
||||||
websocket_handle({binary, Data}, Req, State = Pid) ->
|
websocket_handle({binary, Data}, Req, State=#state{pid=Pid}) ->
|
||||||
rabbit_ws_client:sockjs_msg(Pid, Data),
|
rabbit_ws_client:sockjs_msg(Pid, Data),
|
||||||
{ok, Req, State};
|
{ok, Req, State};
|
||||||
websocket_handle(_Frame, Req, State) ->
|
websocket_handle(_Frame, Req, State) ->
|
||||||
{ok, Req, State}.
|
{ok, Req, State}.
|
||||||
|
|
||||||
websocket_info({send, Frame}, Req, State) ->
|
websocket_info({send, Msg}, Req, State=#state{type=FrameType}) ->
|
||||||
|
{reply, {FrameType, Msg}, Req, State};
|
||||||
|
websocket_info(Frame = {close, _, _}, Req, State) ->
|
||||||
{reply, Frame, Req, State};
|
{reply, Frame, Req, State};
|
||||||
websocket_info(_Info, Req, State) ->
|
websocket_info(_Info, Req, State) ->
|
||||||
{ok, Req, State}.
|
{ok, Req, State}.
|
||||||
|
|
||||||
websocket_terminate(_Reason, _Req, _State = Pid) ->
|
websocket_terminate(_Reason, _Req, #state{pid=Pid}) ->
|
||||||
rabbit_ws_client:sockjs_closed(Pid),
|
rabbit_ws_client:sockjs_closed(Pid),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
@ -77,9 +81,9 @@ info({?MODULE, _, Info}) ->
|
||||||
Info.
|
Info.
|
||||||
|
|
||||||
send(Data, {?MODULE, Pid, _}) ->
|
send(Data, {?MODULE, Pid, _}) ->
|
||||||
Pid ! {send, {text, Data}},
|
Pid ! {send, Data},
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
close(Code, Reason, {?MODULE, Pid, _}) ->
|
close(Code, Reason, {?MODULE, Pid, _}) ->
|
||||||
Pid ! {send, {close, Code, Reason}},
|
Pid ! {close, Code, Reason},
|
||||||
ok.
|
ok.
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ init() ->
|
||||||
{[{port, Port0}|TCPConf0], Port0}
|
{[{port, Port0}|TCPConf0], Port0}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
WsFrame = get_env(ws_frame, text),
|
||||||
CowboyOpts = get_env(cowboy_opts, []),
|
CowboyOpts = get_env(cowboy_opts, []),
|
||||||
|
|
||||||
SockjsOpts = get_env(sockjs_opts, []) ++ [{logger, fun logger/3}],
|
SockjsOpts = get_env(sockjs_opts, []) ++ [{logger, fun logger/3}],
|
||||||
|
|
@ -44,7 +45,7 @@ init() ->
|
||||||
<<"/stomp">>, fun service_stomp/3, {}, SockjsOpts),
|
<<"/stomp">>, fun service_stomp/3, {}, SockjsOpts),
|
||||||
VhostRoutes = [
|
VhostRoutes = [
|
||||||
{"/stomp/[...]", sockjs_cowboy_handler, SockjsState},
|
{"/stomp/[...]", sockjs_cowboy_handler, SockjsState},
|
||||||
{"/ws", rabbit_ws_handler, undefined}
|
{"/ws", rabbit_ws_handler, [{type, WsFrame}]}
|
||||||
],
|
],
|
||||||
Routes = cowboy_router:compile([{'_', VhostRoutes}]), % any vhost
|
Routes = cowboy_router:compile([{'_', VhostRoutes}]), % any vhost
|
||||||
NbAcceptors = get_env(nb_acceptors, 100),
|
NbAcceptors = get_env(nb_acceptors, 100),
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@
|
||||||
{ssl_config, []},
|
{ssl_config, []},
|
||||||
{nb_acceptors, 100},
|
{nb_acceptors, 100},
|
||||||
{cowboy_opts, []},
|
{cowboy_opts, []},
|
||||||
{sockjs_opts, []}]},
|
{sockjs_opts, []},
|
||||||
|
{ws_frame, text}]},
|
||||||
{applications, [kernel, stdlib, rabbit, rabbitmq_stomp, cowboy, sockjs]}
|
{applications, [kernel, stdlib, rabbit, rabbitmq_stomp, cowboy, sockjs]}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,50 @@ pubsub_test() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
||||||
|
raw_send_binary(WS, Command, Headers) ->
|
||||||
|
raw_send_binary(WS, Command, Headers, <<>>).
|
||||||
|
raw_send_binary(WS, Command, Headers, Body) ->
|
||||||
|
Frame = stomp:marshal(Command, Headers, Body),
|
||||||
|
rfc6455_client:send_binary(WS, Frame).
|
||||||
|
|
||||||
|
raw_recv_binary(WS) ->
|
||||||
|
{binary, P} = rfc6455_client:recv(WS),
|
||||||
|
stomp:unmarshal(P).
|
||||||
|
|
||||||
|
|
||||||
|
pubsub_binary_test() ->
|
||||||
|
%% Set frame type to binary and restart the web stomp application.
|
||||||
|
ok = application:set_env(rabbitmq_web_stomp, ws_frame, binary),
|
||||||
|
ok = application:stop(rabbitmq_web_stomp),
|
||||||
|
ok = cowboy:stop_listener(http),
|
||||||
|
ok = application:start(rabbitmq_web_stomp),
|
||||||
|
|
||||||
|
WS = rfc6455_client:new("ws://127.0.0.1:15674/ws", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS),
|
||||||
|
ok = raw_send(WS, "CONNECT", [{"login","guest"}, {"passcode", "guest"}]),
|
||||||
|
|
||||||
|
{<<"CONNECTED">>, _, <<>>} = raw_recv_binary(WS),
|
||||||
|
|
||||||
|
Dst = "/topic/test-" ++ stomp:list_to_hex(binary_to_list(crypto:rand_bytes(8))),
|
||||||
|
|
||||||
|
ok = raw_send(WS, "SUBSCRIBE", [{"destination", Dst},
|
||||||
|
{"id", "s0"}]),
|
||||||
|
|
||||||
|
ok = raw_send(WS, "SEND", [{"destination", Dst},
|
||||||
|
{"content-length", "3"}], <<"a\x00a">>),
|
||||||
|
|
||||||
|
{<<"MESSAGE">>, H, <<"a\x00a">>} = raw_recv_binary(WS),
|
||||||
|
Dst = binary_to_list(proplists:get_value(<<"destination">>, H)),
|
||||||
|
|
||||||
|
{close, _} = rfc6455_client:close(WS),
|
||||||
|
|
||||||
|
%% Set frame type back to text and restart the web stomp application.
|
||||||
|
ok = application:set_env(rabbitmq_web_stomp, ws_frame, text),
|
||||||
|
ok = application:stop(rabbitmq_web_stomp),
|
||||||
|
ok = cowboy:stop_listener(http),
|
||||||
|
ok = application:start(rabbitmq_web_stomp).
|
||||||
|
|
||||||
|
|
||||||
disconnect_test() ->
|
disconnect_test() ->
|
||||||
WS = rfc6455_client:new("ws://127.0.0.1:15674/ws", self()),
|
WS = rfc6455_client:new("ws://127.0.0.1:15674/ws", self()),
|
||||||
{ok, _} = rfc6455_client:open(WS),
|
{ok, _} = rfc6455_client:open(WS),
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,8 @@ recv(WS) ->
|
||||||
receive
|
receive
|
||||||
{rfc6455, recv, WS, Payload} ->
|
{rfc6455, recv, WS, Payload} ->
|
||||||
{ok, Payload};
|
{ok, Payload};
|
||||||
|
{rfc6455, recv_binary, WS, Payload} ->
|
||||||
|
{binary, Payload};
|
||||||
{rfc6455, close, WS, R} ->
|
{rfc6455, close, WS, R} ->
|
||||||
{close, R}
|
{close, R}
|
||||||
end.
|
end.
|
||||||
|
|
@ -60,6 +62,10 @@ send(WS, IoData) ->
|
||||||
WS ! {send, IoData},
|
WS ! {send, IoData},
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
send_binary(WS, IoData) ->
|
||||||
|
WS ! {send_binary, IoData},
|
||||||
|
ok.
|
||||||
|
|
||||||
close(WS) ->
|
close(WS) ->
|
||||||
close(WS, {1000, ""}).
|
close(WS, {1000, ""}).
|
||||||
|
|
||||||
|
|
@ -130,6 +136,9 @@ do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid}, R) ->
|
||||||
{1, 1, Payload, Rest} ->
|
{1, 1, Payload, Rest} ->
|
||||||
PPid ! {rfc6455, recv, self(), Payload},
|
PPid ! {rfc6455, recv, self(), Payload},
|
||||||
State#state{data = Rest};
|
State#state{data = Rest};
|
||||||
|
{1, 2, Payload, Rest} ->
|
||||||
|
PPid ! {rfc6455, recv_binary, self(), Payload},
|
||||||
|
State#state{data = Rest};
|
||||||
{1, 8, Payload, _Rest} ->
|
{1, 8, Payload, _Rest} ->
|
||||||
WsReason = case Payload of
|
WsReason = case Payload of
|
||||||
<<WC:16, WR/binary>> -> {WC, WR};
|
<<WC:16, WR/binary>> -> {WC, WR};
|
||||||
|
|
@ -167,6 +176,10 @@ do_send(State = #state{socket = Socket}, Payload) ->
|
||||||
gen_tcp:send(Socket, encode_frame(1, 1, Payload)),
|
gen_tcp:send(Socket, encode_frame(1, 1, Payload)),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
do_send_binary(State = #state{socket = Socket}, Payload) ->
|
||||||
|
gen_tcp:send(Socket, encode_frame(1, 2, Payload)),
|
||||||
|
State.
|
||||||
|
|
||||||
do_close(State = #state{socket = Socket}, {Code, Reason}) ->
|
do_close(State = #state{socket = Socket}, {Code, Reason}) ->
|
||||||
Payload = iolist_to_binary([<<Code:16>>, Reason]),
|
Payload = iolist_to_binary([<<Code:16>>, Reason]),
|
||||||
gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
|
gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
|
||||||
|
|
@ -181,6 +194,8 @@ loop(State = #state{socket = Socket, ppid = PPid, data = Data,
|
||||||
loop(do_recv(State1));
|
loop(do_recv(State1));
|
||||||
{send, Payload} when Phase == open ->
|
{send, Payload} when Phase == open ->
|
||||||
loop(do_send(State, Payload));
|
loop(do_send(State, Payload));
|
||||||
|
{send_binary, Payload} when Phase == open ->
|
||||||
|
loop(do_send_binary(State, Payload));
|
||||||
{tcp_closed, Socket} ->
|
{tcp_closed, Socket} ->
|
||||||
die(Socket, PPid, {1006, "Connection closed abnormally"}, normal);
|
die(Socket, PPid, {1006, "Connection closed abnormally"}, normal);
|
||||||
{close, WsReason} when Phase == open ->
|
{close, WsReason} when Phase == open ->
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue