rabbitmq-server/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema

337 lines
12 KiB
Plaintext

%% ----------------------------------------------------------------------------
%% RabbitMQ MQTT Adapter
%%
%% See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
%% for details
%% ----------------------------------------------------------------------------
% {rabbitmq_mqtt,
% [%% Enable anonymous access. If this is set to false, clients MUST provide
%% login information in order to connect. See the anonymous_login_user/anonymous_login_pass
%% configuration elements for managing logins without authentication.
%%
%% {allow_anonymous, true},
{mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous",
[{datatype, {enum, [true, false]}}]}.
%% If you have multiple chosts, specify the one to which the
%% adapter connects.
%%
%% {vhost, <<"/">>},
{mapping, "mqtt.vhost", "rabbitmq_mqtt.vhost", [{datatype, string}]}.
{translation, "rabbitmq_mqtt.vhost",
fun(Conf) ->
list_to_binary(cuttlefish:conf_get("mqtt.vhost", Conf))
end}.
%% Specify the exchange to which messages from MQTT clients are published.
%%
%% {exchange, <<"amq.topic">>},
{mapping, "mqtt.exchange", "rabbitmq_mqtt.exchange", [{datatype, string}]}.
{translation, "rabbitmq_mqtt.exchange",
fun(Conf) ->
list_to_binary(cuttlefish:conf_get("mqtt.exchange", Conf))
end}.
{mapping, "mqtt.subscription_ttl", "rabbitmq_mqtt.subscription_ttl", [
{datatype, [{enum, [undefined, infinity]}, integer]}
]}.
{translation, "rabbitmq_mqtt.subscription_ttl",
fun(Conf) ->
cuttlefish:invalid(
"Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported. "
"Use mqtt.max_session_expiry_interval_seconds (in seconds) instead.")
end}.
%% Defines the maximum Session Expiry Interval in seconds allowed by the server.
%% 'infinity' means the session does not expire.
%% An MQTT 5.0 client can choose a lower value.
{mapping, "mqtt.max_session_expiry_interval_seconds", "rabbitmq_mqtt.max_session_expiry_interval_seconds", [
{datatype, [integer, {atom, infinity}]}
]}.
{translation, "rabbitmq_mqtt.max_session_expiry_interval_seconds",
fun(Conf) ->
case cuttlefish:conf_get("mqtt.max_session_expiry_interval_seconds", Conf) of
N when is_integer(N) andalso N < 0 ->
cuttlefish:invalid("negative integer not allowed");
Valid ->
Valid
end
end}.
%% Set the prefetch count (governing the maximum number of unacknowledged
%% messages that will be delivered).
%%
%% {prefetch, 10},
{mapping, "mqtt.prefetch", "rabbitmq_mqtt.prefetch",
[{datatype, integer}]}.
%% Enable "Sparkplug B" namespace recognition so that the dot in the
%% namespace is not translated to a slash
%%
%% {sparkplug, true},
{mapping, "mqtt.sparkplug", "rabbitmq_mqtt.sparkplug",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.retained_message_store", "rabbitmq_mqtt.retained_message_store",
[{datatype, atom}]}.
{mapping, "mqtt.retained_message_store_dets_sync_interval", "rabbitmq_mqtt.retained_message_store_dets_sync_interval",
[{datatype, integer}]}.
%% Whether or not to enable proxy protocol support.
%%
%% {proxy_protocol, false}
{mapping, "mqtt.proxy_protocol", "rabbitmq_mqtt.proxy_protocol",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.login_timeout", "rabbitmq_mqtt.login_timeout",
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.
%% TCP/SSL Configuration (as per the broker configuration).
%%
%% {tcp_listeners, [1883]},
%% {ssl_listeners, []},
{mapping, "mqtt.listeners.tcp", "rabbitmq_mqtt.tcp_listeners",[
{datatype, {enum, [none]}}
]}.
{mapping, "mqtt.listeners.tcp.$name", "rabbitmq_mqtt.tcp_listeners",[
{datatype, [integer, ip]}
]}.
{translation, "rabbitmq_mqtt.tcp_listeners",
fun(Conf) ->
case cuttlefish:conf_get("mqtt.listeners.tcp", Conf, undefined) of
none -> [];
_ ->
Settings = cuttlefish_variable:filter_by_prefix("mqtt.listeners.tcp", Conf),
[ V || {_, V} <- Settings ]
end
end}.
{mapping, "mqtt.listeners.ssl", "rabbitmq_mqtt.ssl_listeners",[
{datatype, {enum, [none]}}
]}.
{mapping, "mqtt.listeners.ssl.$name", "rabbitmq_mqtt.ssl_listeners",[
{datatype, [integer, ip]}
]}.
{translation, "rabbitmq_mqtt.ssl_listeners",
fun(Conf) ->
case cuttlefish:conf_get("mqtt.listeners.ssl", Conf, undefined) of
none -> [];
_ ->
Settings = cuttlefish_variable:filter_by_prefix("mqtt.listeners.ssl", Conf),
[ V || {_, V} <- Settings ]
end
end}.
%% Number of Erlang processes that will accept connections for the TCP
%% and SSL listeners.
%%
%% {num_tcp_acceptors, 10},
%% {num_ssl_acceptors, 10},
{mapping, "mqtt.num_acceptors.ssl", "rabbitmq_mqtt.num_ssl_acceptors", [
{datatype, integer}
]}.
{mapping, "mqtt.num_acceptors.tcp", "rabbitmq_mqtt.num_tcp_acceptors", [
{datatype, integer}
]}.
{mapping, "mqtt.ssl_cert_login", "rabbitmq_mqtt.ssl_cert_login", [
{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.ssl_cert_client_id_from", "rabbitmq_mqtt.ssl_cert_client_id_from", [
{datatype, {enum, [distinguished_name, subject_alternative_name]}}
]}.
{mapping, "mqtt.ssl_cert_login_san_type", "rabbitmq_mqtt.ssl_cert_login_san_type", [
{datatype, {enum, [dns, ip, email, uri, other_name]}}
]}.
{mapping, "mqtt.ssl_cert_login_san_index", "rabbitmq_mqtt.ssl_cert_login_san_index", [
{datatype, integer}, {validators, ["non_negative_integer"]}
]}.
%% TCP/Socket options (as per the broker configuration).
%%
%% {tcp_listen_options, [{backlog, 128},
%% {nodelay, true}]}
% ]},
%% TCP listener section ======================================================
{mapping, "mqtt.tcp_listen_options", "rabbitmq_mqtt.rabbit.tcp_listen_options", [
{datatype, {enum, [none]}}]}.
{translation, "rabbitmq_mqtt.rabbit.tcp_listen_options",
fun(Conf) ->
case cuttlefish:conf_get("mqtt.tcp_listen_options") of
none -> [];
_ -> cuttlefish:invalid("Invalid mqtt.tcp_listen_options")
end
end}.
{mapping, "mqtt.tcp_listen_options.backlog", "rabbitmq_mqtt.tcp_listen_options.backlog", [
{datatype, integer}
]}.
{mapping, "mqtt.tcp_listen_options.nodelay", "rabbitmq_mqtt.tcp_listen_options.nodelay", [
{datatype, {enum, [true, false]}}
]}.
{mapping, "mqtt.tcp_listen_options.buffer", "rabbitmq_mqtt.tcp_listen_options.buffer",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.delay_send", "rabbitmq_mqtt.tcp_listen_options.delay_send",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.tcp_listen_options.dontroute", "rabbitmq_mqtt.tcp_listen_options.dontroute",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.tcp_listen_options.exit_on_close", "rabbitmq_mqtt.tcp_listen_options.exit_on_close",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.tcp_listen_options.fd", "rabbitmq_mqtt.tcp_listen_options.fd",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.high_msgq_watermark", "rabbitmq_mqtt.tcp_listen_options.high_msgq_watermark",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.high_watermark", "rabbitmq_mqtt.tcp_listen_options.high_watermark",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.keepalive", "rabbitmq_mqtt.tcp_listen_options.keepalive",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.tcp_listen_options.low_msgq_watermark", "rabbitmq_mqtt.tcp_listen_options.low_msgq_watermark",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.low_watermark", "rabbitmq_mqtt.tcp_listen_options.low_watermark",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.port", "rabbitmq_mqtt.tcp_listen_options.port",
[{datatype, integer}, {validators, ["port"]}]}.
{mapping, "mqtt.tcp_listen_options.priority", "rabbitmq_mqtt.tcp_listen_options.priority",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.recbuf", "rabbitmq_mqtt.tcp_listen_options.recbuf",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.send_timeout", "rabbitmq_mqtt.tcp_listen_options.send_timeout",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.send_timeout_close", "rabbitmq_mqtt.tcp_listen_options.send_timeout_close",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.tcp_listen_options.sndbuf", "rabbitmq_mqtt.tcp_listen_options.sndbuf",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.tos", "rabbitmq_mqtt.tcp_listen_options.tos",
[{datatype, integer}]}.
{mapping, "mqtt.tcp_listen_options.linger.on", "rabbitmq_mqtt.tcp_listen_options.linger",
[{datatype, {enum, [true, false]}}]}.
{mapping, "mqtt.tcp_listen_options.linger.timeout", "rabbitmq_mqtt.tcp_listen_options.linger",
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.
{translation, "rabbitmq_mqtt.tcp_listen_options.linger",
fun(Conf) ->
LingerOn = cuttlefish:conf_get("mqtt.tcp_listen_options.linger.on", Conf, false),
LingerTimeout = cuttlefish:conf_get("mqtt.tcp_listen_options.linger.timeout", Conf, 0),
{LingerOn, LingerTimeout}
end}.
%% Durable queue type to be used for QoS 1 subscriptions
%%
{mapping, "mqtt.durable_queue_type", "rabbitmq_mqtt.durable_queue_type",
[{datatype, {enum, [classic, quorum]}}]}.
%% If feature flag rabbit_mqtt_qos0_queue is enabled, a pseudo queue of type rabbit_mqtt_qos0_queue is
%% created for an MQTT "Clean Session" that subscribes with QoS 0. This queue type holds MQTT messages
%% directly in the MQTT connection process mailbox. This queue type has no flow control which
%% means MQTT messages might arrive faster in the Erlang mailbox than being sent to the subscribing MQTT
%% client. To protect against high memory usage, RabbitMQ intentionally drops QoS 0 messages if both
%% 1.) the number of messages in the MQTT connection process mailbox exceeds mailbox_soft_limit, and
%% 2.) the socket sending to the MQTT client is busy (i.e. not sending fast enough).
%% Note that there can be other messages in the mailbox (e.g. MQTT messages sent from the MQTT client to
%% RabbitMQ or confirmations from another queue type to the MQTT connection process) which are
%% obviously not dropped. However, these other messages also contribute to the `mailbox_soft_limit`.
%% Setting mailbox_soft_limit to 0 disables the overload protection mechanism to drop QoS 0 messages.
%% Setting mailbox_soft_limit to a very high value decreases the likelihood of intentionally dropping
%% QoS 0 messages while increasing the risk of causing a cluster wide memory alarm.
%%
%% {mailbox_soft_limit, 30},
{mapping, "mqtt.mailbox_soft_limit", "rabbitmq_mqtt.mailbox_soft_limit",
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.
%% Maximum packet size
{mapping, "mqtt.max_packet_size_unauthenticated", "rabbitmq_mqtt.max_packet_size_unauthenticated", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.
{mapping, "mqtt.max_packet_size_authenticated", "rabbitmq_mqtt.max_packet_size_authenticated", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.
%% Topic Aliax Maximum
{mapping, "mqtt.topic_alias_maximum", "rabbitmq_mqtt.topic_alias_maximum", [
{datatype, integer},
{validators, ["non_negative_integer"]}
]}.
%%
%% Message interceptors
%%
{mapping, "mqtt.message_interceptors.$stage.$name.$key", "rabbitmq_mqtt.message_interceptors", [
{datatype, {enum, [true, false]}}
]}.
{translation, "rabbitmq_mqtt.message_interceptors",
fun(Conf) ->
case cuttlefish_variable:filter_by_prefix("mqtt.message_interceptors", Conf) of
[] ->
cuttlefish:unset();
L ->
lists:foldr(
fun({["mqtt", "message_interceptors", "incoming", "set_client_id_annotation", "enabled"], Enabled}, Acc) ->
case Enabled of
true ->
Mod = rabbit_mqtt_msg_interceptor_client_id,
Cfg = #{},
[{Mod, Cfg} | Acc];
false ->
Acc
end;
(Other, _Acc) ->
cuttlefish:invalid(io_lib:format("~p is invalid", [Other]))
end, [], L)
end
end
}.