Make mqtt.subscription_ttl unsupported
Starting with RabbitMQ 3.13 mqtt.max_session_expiry_interval_seconds (set in seconds) will replace the previous setting mqtt.subscription_ttl. MQTT 5.0 introduces the Session Expiry Interval feature which does not only apply to subscribers, but also to publishers. The new config name mqtt.max_session_expiry_interval_seconds makes it clear that it also applies to publishers. Prior to this commit, when mqtt.subscription_ttl was set, a warning got logged and the value was ignored. This is dangerous if an operator does not see the warning but relies for example on `mqtt.subscription = infinity` to not expire non clean session. It's safer to make the boot fail if that unsupported config name is still set. A clear error message will be logged: ``` [error] <0.142.0> Error preparing configuration in phase apply_translations: [error] <0.142.0> - Translation for 'rabbitmq_mqtt.subscription_ttl' found invalid configuration: Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported. Use mqtt.max_session_expiry_interval_seconds (in seconds) instead. ``` Alternatively, RabbitMQ could translate mqtt.subscription_ttl to mqtt.max_session_expiry_interval_seconds. However, forcing the new config option sounds the better way to go. Once we write MQTT 5.0 docs, this change must go into the 3.13 release notes. This commit also renames max_session_expiry_interval_secs to max_session_expiry_interval_seconds. The latter is clearer to users.
This commit is contained in:
parent
50e96345b6
commit
ed31a818c3
|
@ -913,7 +913,7 @@
|
|||
## 'infinity' means the session does not expire.
|
||||
## An MQTT 5.0 client can choose a lower value.
|
||||
##
|
||||
# mqtt.max_session_expiry_interval_secs = 1800
|
||||
# mqtt.max_session_expiry_interval_seconds = 1800
|
||||
|
||||
## Set the prefetch count (governing the maximum number of unacknowledged
|
||||
## messages that will be delivered).
|
||||
|
|
|
@ -34,7 +34,7 @@ APP_ENV = """[
|
|||
{allow_anonymous, true},
|
||||
{vhost, <<"/">>},
|
||||
{exchange, <<"amq.topic">>},
|
||||
{max_session_expiry_interval_secs, 86400}, %% 1 day
|
||||
{max_session_expiry_interval_seconds, 86400}, %% 1 day
|
||||
{retained_message_store, rabbit_mqtt_retained_msg_store_dets},
|
||||
%% only used by DETS store
|
||||
{retained_message_store_dets_sync_interval, 2000},
|
||||
|
|
|
@ -11,7 +11,7 @@ define PROJECT_ENV
|
|||
{allow_anonymous, true},
|
||||
{vhost, <<"/">>},
|
||||
{exchange, <<"amq.topic">>},
|
||||
{max_session_expiry_interval_secs, 86400}, %% 1 day
|
||||
{max_session_expiry_interval_seconds, 86400}, %% 1 day
|
||||
{retained_message_store, rabbit_mqtt_retained_msg_store_dets},
|
||||
%% only used by DETS store
|
||||
{retained_message_store_dets_sync_interval, 2000},
|
||||
|
|
|
@ -71,24 +71,22 @@ end}.
|
|||
|
||||
{translation, "rabbitmq_mqtt.subscription_ttl",
|
||||
fun(Conf) ->
|
||||
cuttlefish:warn(
|
||||
"Since 3.13 mqtt.subscription_ttl (in milliseconds) is deprecated and "
|
||||
"has no effect anymore. Use mqtt.max_session_expiry_interval_secs (in "
|
||||
"seconds) instead."),
|
||||
cuttlefish:unset()
|
||||
cuttlefish:invalid(
|
||||
"Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported. "
|
||||
"Use mqtt.max_session_expiry_interval_seconds (in seconds) instead.")
|
||||
end}.
|
||||
|
||||
%% Defines the maximum Session Expiry Interval in seconds allowed by the server.
|
||||
%% 'infinity' means the session does not expire.
|
||||
%% An MQTT 5.0 client can choose a lower value.
|
||||
|
||||
{mapping, "mqtt.max_session_expiry_interval_secs", "rabbitmq_mqtt.max_session_expiry_interval_secs", [
|
||||
{mapping, "mqtt.max_session_expiry_interval_seconds", "rabbitmq_mqtt.max_session_expiry_interval_seconds", [
|
||||
{datatype, [integer, {atom, infinity}]}
|
||||
]}.
|
||||
|
||||
{translation, "rabbitmq_mqtt.max_session_expiry_interval_secs",
|
||||
{translation, "rabbitmq_mqtt.max_session_expiry_interval_seconds",
|
||||
fun(Conf) ->
|
||||
case cuttlefish:conf_get("mqtt.max_session_expiry_interval_secs", Conf) of
|
||||
case cuttlefish:conf_get("mqtt.max_session_expiry_interval_seconds", Conf) of
|
||||
N when is_integer(N) andalso N < 0 ->
|
||||
cuttlefish:invalid("negative integer not allowed");
|
||||
Valid ->
|
||||
|
|
|
@ -148,7 +148,7 @@ process_connect(
|
|||
MaxPacketSize = maps:get('Maximum-Packet-Size', ConnectProps, ?MAX_PACKET_SIZE),
|
||||
TopicAliasMax = persistent_term:get(?PERSISTENT_TERM_TOPIC_ALIAS_MAXIMUM),
|
||||
TopicAliasMaxOutbound = min(maps:get('Topic-Alias-Maximum', ConnectProps, 0), TopicAliasMax),
|
||||
{ok, MaxSessionExpiry} = application:get_env(?APP_NAME, max_session_expiry_interval_secs),
|
||||
{ok, MaxSessionExpiry} = application:get_env(?APP_NAME, max_session_expiry_interval_seconds),
|
||||
SessionExpiry =
|
||||
case {ProtoVer, CleanStart} of
|
||||
{5, _} ->
|
||||
|
@ -157,7 +157,7 @@ process_connect(
|
|||
?UINT_MAX ->
|
||||
%% "If the Session Expiry Interval is 0xFFFFFFFF (UINT_MAX),
|
||||
%% the Session does not expire."
|
||||
min(infinity, MaxSessionExpiry);
|
||||
MaxSessionExpiry;
|
||||
Seconds ->
|
||||
min(Seconds, MaxSessionExpiry)
|
||||
end;
|
||||
|
@ -569,7 +569,7 @@ process_request(?DISCONNECT,
|
|||
State0;
|
||||
_ ->
|
||||
%% "The session expiry interval can be modified at disconnect."
|
||||
{ok, MaxSEI} = application:get_env(?APP_NAME, max_session_expiry_interval_secs),
|
||||
{ok, MaxSEI} = application:get_env(?APP_NAME, max_session_expiry_interval_seconds),
|
||||
NewSEI = min(RequestedSEI, MaxSEI),
|
||||
lists:foreach(fun(QName) ->
|
||||
update_session_expiry_interval(QName, NewSEI)
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
mqtt.allow_anonymous = true
|
||||
mqtt.vhost = /
|
||||
mqtt.exchange = amq.topic
|
||||
mqtt.max_session_expiry_interval_secs = 1800
|
||||
mqtt.max_session_expiry_interval_seconds = 86400
|
||||
mqtt.prefetch = 10
|
||||
mqtt.sparkplug = true
|
||||
mqtt.listeners.ssl = none
|
||||
|
@ -25,7 +25,7 @@
|
|||
{allow_anonymous,true},
|
||||
{vhost,<<"/">>},
|
||||
{exchange,<<"amq.topic">>},
|
||||
{max_session_expiry_interval_secs,1800},
|
||||
{max_session_expiry_interval_seconds,86400},
|
||||
{prefetch,10},
|
||||
{sparkplug,true},
|
||||
{ssl_listeners,[]},
|
||||
|
@ -106,7 +106,7 @@
|
|||
mqtt.allow_anonymous = true
|
||||
mqtt.vhost = /
|
||||
mqtt.exchange = amq.topic
|
||||
mqtt.max_session_expiry_interval_secs = infinity
|
||||
mqtt.max_session_expiry_interval_seconds = infinity
|
||||
mqtt.prefetch = 10
|
||||
mqtt.proxy_protocol = true",
|
||||
[{rabbit,[{tcp_listeners,[5672]}]},
|
||||
|
@ -116,7 +116,7 @@
|
|||
{allow_anonymous,true},
|
||||
{vhost,<<"/">>},
|
||||
{exchange,<<"amq.topic">>},
|
||||
{max_session_expiry_interval_secs,infinity},
|
||||
{max_session_expiry_interval_seconds,infinity},
|
||||
{prefetch,10},
|
||||
{proxy_protocol,true}]}],
|
||||
[rabbitmq_mqtt]},
|
||||
|
@ -126,7 +126,7 @@
|
|||
mqtt.allow_anonymous = true
|
||||
mqtt.vhost = /
|
||||
mqtt.exchange = amq.topic
|
||||
mqtt.max_session_expiry_interval_secs = 1800
|
||||
mqtt.max_session_expiry_interval_seconds = 1800
|
||||
mqtt.prefetch = 10
|
||||
## use DETS (disk-based) store for retained messages
|
||||
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
|
||||
|
@ -141,7 +141,7 @@
|
|||
{allow_anonymous,true},
|
||||
{vhost,<<"/">>},
|
||||
{exchange,<<"amq.topic">>},
|
||||
{max_session_expiry_interval_secs,1800},
|
||||
{max_session_expiry_interval_seconds,1800},
|
||||
{prefetch,10},
|
||||
{retained_message_store,rabbit_mqtt_retained_msg_store_dets},
|
||||
{retained_message_store_dets_sync_interval,2000},
|
||||
|
|
|
@ -824,7 +824,7 @@ delete_create_queue(Config) ->
|
|||
|
||||
session_expiry(Config) ->
|
||||
App = rabbitmq_mqtt,
|
||||
Par = max_session_expiry_interval_secs,
|
||||
Par = max_session_expiry_interval_seconds,
|
||||
Seconds = 1,
|
||||
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
|
||||
ok = rpc(Config, application, set_env, [App, Par, Seconds]),
|
||||
|
|
|
@ -198,7 +198,7 @@ init_per_testcase(T, Config)
|
|||
when T =:= session_expiry_disconnect_infinity_to_zero;
|
||||
T =:= session_expiry_disconnect_to_infinity;
|
||||
T =:= session_expiry_reconnect_infinity_to_zero ->
|
||||
Par = max_session_expiry_interval_secs,
|
||||
Par = max_session_expiry_interval_seconds,
|
||||
{ok, Default} = rpc(Config, application, get_env, [?APP, Par]),
|
||||
ok = rpc(Config, application, set_env, [?APP, Par, infinity]),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, {Par, Default}),
|
||||
|
@ -215,7 +215,7 @@ end_per_testcase(T, Config)
|
|||
when T =:= session_expiry_disconnect_infinity_to_zero;
|
||||
T =:= session_expiry_disconnect_to_infinity;
|
||||
T =:= session_expiry_reconnect_infinity_to_zero ->
|
||||
Par = max_session_expiry_interval_secs,
|
||||
Par = max_session_expiry_interval_seconds,
|
||||
Default = ?config(Par, Config),
|
||||
ok = rpc(Config, application, set_env, [?APP, Par, Default]),
|
||||
end_per_testcase0(T, Config);
|
||||
|
|
Loading…
Reference in New Issue