Make it possible to configure WebSocket options such as compression

While at it, correctly configure connection inactivity
timeout with modern Cowboy versions.

Closes #34.
References rabbitmq/rabbitmq-web-stomp#89.

[#161053821]
This commit is contained in:
Michael Klishin 2018-10-08 21:19:12 +03:00
parent 0707831427
commit 97a08e78a5
4 changed files with 92 additions and 12 deletions

View File

@ -67,5 +67,38 @@
[{datatype, integer}]}.
{mapping, "web_mqtt.cowboy_opts.max_request_line_length", "rabbitmq_web_mqtt.cowboy_opts.max_request_line_length",
[{datatype, integer}]}.
{mapping, "web_mqtt.cowboy_opts.timeout", "rabbitmq_web_mqtt.cowboy_opts.timeout",
{mapping, "web_mqtt.cowboy_opts.timeout", "rabbitmq_web_mqtt.cowboy_opts.idle_timeout",
[{datatype, integer}]}.
%% backwards compatibility
{mapping, "web_mqtt.cowboy_opts.timeout", "rabbitmq_web_mqtt.cowboy_opts.idle_timeout",
[{datatype, integer}, {validators, ["non_negative_integer"]}]
}.
%% recent Cowboy versions have several timeout settings
{mapping, "web_mqtt.cowboy_opts.idle_timeout", "rabbitmq_web_mqtt.cowboy_opts.idle_timeout",
[{datatype, integer}]
}.
{translation,
"rabbitmq_web_mqtt.cowboy_opts.idle_timeout",
fun(Conf) ->
case cuttlefish:conf_get("web_mqtt.cowboy_opts.timeout", Conf, undefined) of
Value when is_integer(Value) ->
Value;
undefined ->
case cuttlefish:conf_get("web_mqtt.cowboy_opts.idle_timeout", Conf, undefined) of
undefined -> cuttlefish:unset();
Value -> Value
end
end
end
}.
{mapping, "web_mqtt.ws_opts.compress", "rabbitmq_web_mqtt.cowboy_ws_opts.compress",
[{datatype, {enum, [true, false]}}]}.
{mapping, "web_mqtt.ws_opts.max_frame_size", "rabbitmq_web_mqtt.cowboy_ws_opts.max_frame_size",
[{datatype, integer}, {validators, ["non_negative_integer"]}]
}.
{mapping, "web_mqtt.ws_opts.idle_timeout", "rabbitmq_web_mqtt.cowboy_ws_opts.idle_timeout",
[{datatype, integer}, {validators, ["non_negative_integer"]}]
}.

View File

@ -45,21 +45,27 @@ init([]) -> {ok, {{one_for_one, 1, 5}, []}}.
%%----------------------------------------------------------------------------
mqtt_init() ->
CowboyOpts0 = get_env(cowboy_opts, []),
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
Routes = cowboy_router:compile([{'_', [
{get_env(ws_path, "/ws"), rabbit_web_mqtt_handler, []}
]}]),
CowboyOpts = maps:from_list([
{env, #{dispatch => Routes}},
{middlewares, [cowboy_router, rabbit_web_mqtt_middleware, cowboy_handler]}
|CowboyOpts0]),
CowboyOpts = CowboyOpts0#{env => #{dispatch => Routes},
middlewares => [cowboy_router, rabbit_web_mqtt_middleware, cowboy_handler]},
{TCPConf, IpStr, Port} = get_tcp_conf(),
{ok, _} = ranch:start_listener(web_mqtt, get_env(num_tcp_acceptors, 10),
case ranch:start_listener(web_mqtt, get_env(num_tcp_acceptors, 10),
ranch_tcp, TCPConf,
rabbit_web_mqtt_connection_sup, CowboyOpts),
rabbit_web_mqtt_connection_sup, CowboyOpts) of
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
{error, Err} ->
rabbit_log_connection:error(
"Failed to start a WebSocket (HTTP) listener. Error: ~p,"
" listener settings: ~p~n",
[Err, TCPConf]),
throw(Err)
end,
listener_started('http/web-mqtt', TCPConf),
rabbit_log:info("rabbit_web_mqtt: listening for HTTP connections on ~s:~w~n",
[IpStr, Port]),

View File

@ -46,13 +46,14 @@ init(Req, Opts) ->
SecWsProtocol ->
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, SecWsProtocol, Req)
end,
WsOpts = proplists:get_value(ws_opts, Opts, #{}),
{cowboy_websocket, Req2, #state{
conn_name = ConnStr,
keepalive = {none, none},
keepalive_sup = KeepaliveSup,
parse_state = rabbit_mqtt_frame:initial_state(),
socket = Sock
}};
}, WsOpts};
_ ->
{stop, Req}
end.

View File

@ -49,7 +49,47 @@
[{ws_path, "/rmq/ws"}]}],
[rabbitmq_web_mqtt]},
%%
%% Cowboy options
%%
{cowboy_max_keepalive,
"web_mqtt.cowboy_opts.max_keepalive = 10",
[{rabbitmq_web_mqtt,[{cowboy_opts,[{max_keepalive,10}]}]}],
[rabbitmq_web_mqtt]}].
[{rabbitmq_web_mqtt,[{cowboy_opts,[{max_keepalive, 10}]}]}],
[rabbitmq_web_mqtt]},
{cowboy_timeout,
"web_mqtt.cowboy_opts.timeout = 10000",
[{rabbitmq_web_mqtt,[{cowboy_opts,[{idle_timeout, 10000}]}]}],
[rabbitmq_web_mqtt]},
{cowboy_idle_timeout,
"web_mqtt.cowboy_opts.idle_timeout = 10000",
[{rabbitmq_web_mqtt,[{cowboy_opts,[{idle_timeout, 10000}]}]}],
[rabbitmq_web_mqtt]},
%%
%% Cowboy WebSocket options
%%
{ws_opts_compress_true,
"web_mqtt.ws_opts.compress = true",
[{rabbitmq_web_mqtt,[{cowboy_ws_opts,[{compress, true}]}]}],
[rabbitmq_web_mqtt]},
{ws_opts_compress_false,
"web_mqtt.ws_opts.compress = false",
[{rabbitmq_web_mqtt,[{cowboy_ws_opts,[{compress, false}]}]}],
[rabbitmq_web_mqtt]},
{ws_opts_max_frame_size,
"web_mqtt.ws_opts.max_frame_size = 8000",
[{rabbitmq_web_mqtt,[{cowboy_ws_opts,[{max_frame_size, 8000}]}]}],
[rabbitmq_web_mqtt]},
{ws_idle_timeout,
"web_mqtt.ws_opts.idle_timeout = 10000",
[{rabbitmq_web_mqtt,[{cowboy_ws_opts,[{idle_timeout, 10000}]}]}],
[rabbitmq_web_mqtt]}
].