337 lines
12 KiB
Plaintext
337 lines
12 KiB
Plaintext
%% ----------------------------------------------------------------------------
|
|
%% RabbitMQ MQTT Adapter
|
|
%%
|
|
%% See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
|
|
%% for details
|
|
%% ----------------------------------------------------------------------------
|
|
|
|
% {rabbitmq_mqtt,
|
|
% [%% Enable anonymous access. If this is set to false, clients MUST provide
|
|
%% login information in order to connect. See the anonymous_login_user/anonymous_login_pass
|
|
%% configuration elements for managing logins without authentication.
|
|
%%
|
|
%% {allow_anonymous, true},
|
|
|
|
%% Maps the `mqtt.allow_anonymous` key onto the `rabbitmq_mqtt.allow_anonymous`
%% application setting. Only `true` or `false` are accepted.
{mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous", [
    {datatype, {enum, [true, false]}}
]}.
|
|
|
|
%% If you have multiple vhosts, specify the one to which the
|
|
%% adapter connects.
|
|
%%
|
|
%% {vhost, <<"/">>},
|
|
|
|
%% `mqtt.vhost` is read from the configuration as a string ...
{mapping, "mqtt.vhost", "rabbitmq_mqtt.vhost", [
    {datatype, string}
]}.

%% ... and is converted to a binary before being stored under
%% `rabbitmq_mqtt.vhost`.
{translation, "rabbitmq_mqtt.vhost",
 fun(Conf) ->
     VHost = cuttlefish:conf_get("mqtt.vhost", Conf),
     list_to_binary(VHost)
 end}.
|
|
|
|
%% Specify the exchange to which messages from MQTT clients are published.
|
|
%%
|
|
%% {exchange, <<"amq.topic">>},
|
|
|
|
%% `mqtt.exchange` is read from the configuration as a string ...
{mapping, "mqtt.exchange", "rabbitmq_mqtt.exchange", [
    {datatype, string}
]}.

%% ... and is converted to a binary before being stored under
%% `rabbitmq_mqtt.exchange`.
{translation, "rabbitmq_mqtt.exchange",
 fun(Conf) ->
     Exchange = cuttlefish:conf_get("mqtt.exchange", Conf),
     list_to_binary(Exchange)
 end}.
|
|
|
|
%% `mqtt.subscription_ttl` accepts `undefined`, `infinity` or an integer at the
%% datatype level.
{mapping, "mqtt.subscription_ttl", "rabbitmq_mqtt.subscription_ttl",
 [{datatype, [{enum, [undefined, infinity]}, integer]}]}.

%% The translation never produces a value: it always reports the setting as
%% invalid and names the replacement key in the error message.
{translation, "rabbitmq_mqtt.subscription_ttl",
 fun(_Conf) ->
     Reason = "Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported. "
              "Use mqtt.max_session_expiry_interval_seconds (in seconds) instead.",
     cuttlefish:invalid(Reason)
 end}.
|
|
|
|
%% Defines the maximum Session Expiry Interval in seconds allowed by the server.
|
|
%% 'infinity' means the session does not expire.
|
|
%% An MQTT 5.0 client can choose a lower value.
|
|
|
|
%% The value is either an integer or the atom `infinity`.
{mapping, "mqtt.max_session_expiry_interval_seconds",
 "rabbitmq_mqtt.max_session_expiry_interval_seconds",
 [{datatype, [integer, {atom, infinity}]}]}.

%% Rejects negative integers; any other value is passed through unchanged.
{translation, "rabbitmq_mqtt.max_session_expiry_interval_seconds",
 fun(Conf) ->
     Interval = cuttlefish:conf_get("mqtt.max_session_expiry_interval_seconds", Conf),
     if
         is_integer(Interval), Interval < 0 ->
             cuttlefish:invalid("negative integer not allowed");
         true ->
             Interval
     end
 end}.
|
|
|
|
%% Set the prefetch count (governing the maximum number of unacknowledged
|
|
%% messages that will be delivered).
|
|
%%
|
|
%% {prefetch, 10},
|
|
%% Integer setting stored under `rabbitmq_mqtt.prefetch`.
{mapping, "mqtt.prefetch", "rabbitmq_mqtt.prefetch", [
    {datatype, integer}
]}.
|
|
|
|
%% Enable "Sparkplug B" namespace recognition so that the dot in the
|
|
%% namespace is not translated to a slash
|
|
%%
|
|
%% {sparkplug, true},
|
|
%% Boolean switch stored under `rabbitmq_mqtt.sparkplug`.
{mapping, "mqtt.sparkplug", "rabbitmq_mqtt.sparkplug", [
    {datatype, {enum, [true, false]}}
]}.
|
|
|
|
%% Retained message store, given as an atom.
{mapping, "mqtt.retained_message_store", "rabbitmq_mqtt.retained_message_store", [
    {datatype, atom}
]}.

%% Sync interval for the DETS-based retained message store, given as an integer.
{mapping, "mqtt.retained_message_store_dets_sync_interval",
 "rabbitmq_mqtt.retained_message_store_dets_sync_interval", [
    {datatype, integer}
]}.
|
|
|
|
%% Whether or not to enable proxy protocol support.
|
|
%%
|
|
%% {proxy_protocol, false}
|
|
|
|
%% Boolean switch stored under `rabbitmq_mqtt.proxy_protocol`.
{mapping, "mqtt.proxy_protocol", "rabbitmq_mqtt.proxy_protocol", [
    {datatype, {enum, [true, false]}}
]}.
|
|
|
|
%% Integer setting checked by the "non_negative_integer" validator.
{mapping, "mqtt.login_timeout", "rabbitmq_mqtt.login_timeout", [
    {datatype, integer},
    {validators, ["non_negative_integer"]}
]}.
|
|
|
|
%% TCP/SSL Configuration (as per the broker configuration).
|
|
%%
|
|
%% {tcp_listeners, [1883]},
|
|
%% {ssl_listeners, []},
|
|
|
|
%% `mqtt.listeners.tcp` itself may only be set to `none` ...
{mapping, "mqtt.listeners.tcp", "rabbitmq_mqtt.tcp_listeners", [
    {datatype, {enum, [none]}}
]}.

%% ... while each named entry below it is an integer or an IP value.
{mapping, "mqtt.listeners.tcp.$name", "rabbitmq_mqtt.tcp_listeners", [
    {datatype, [integer, ip]}
]}.

%% `none` yields an empty listener list. Otherwise the values of all settings
%% under the `mqtt.listeners.tcp` prefix are collected.
{translation, "rabbitmq_mqtt.tcp_listeners",
 fun(Conf) ->
     case cuttlefish:conf_get("mqtt.listeners.tcp", Conf, undefined) of
         none ->
             [];
         _ ->
             [Listener
              || {_Key, Listener}
                     <- cuttlefish_variable:filter_by_prefix("mqtt.listeners.tcp", Conf)]
     end
 end}.
|
|
|
|
%% `mqtt.listeners.ssl` itself may only be set to `none` ...
{mapping, "mqtt.listeners.ssl", "rabbitmq_mqtt.ssl_listeners", [
    {datatype, {enum, [none]}}
]}.

%% ... while each named entry below it is an integer or an IP value.
{mapping, "mqtt.listeners.ssl.$name", "rabbitmq_mqtt.ssl_listeners", [
    {datatype, [integer, ip]}
]}.

%% `none` yields an empty listener list. Otherwise the values of all settings
%% under the `mqtt.listeners.ssl` prefix are collected.
{translation, "rabbitmq_mqtt.ssl_listeners",
 fun(Conf) ->
     case cuttlefish:conf_get("mqtt.listeners.ssl", Conf, undefined) of
         none ->
             [];
         _ ->
             [Listener
              || {_Key, Listener}
                     <- cuttlefish_variable:filter_by_prefix("mqtt.listeners.ssl", Conf)]
     end
 end}.
|
|
|
|
%% Number of Erlang processes that will accept connections for the TCP
|
|
%% and SSL listeners.
|
|
%%
|
|
%% {num_tcp_acceptors, 10},
|
|
%% {num_ssl_acceptors, 10},
|
|
|
|
%% Acceptor count for SSL listeners, given as an integer.
{mapping, "mqtt.num_acceptors.ssl", "rabbitmq_mqtt.num_ssl_acceptors",
 [{datatype, integer}]}.

%% Acceptor count for TCP listeners, given as an integer.
{mapping, "mqtt.num_acceptors.tcp", "rabbitmq_mqtt.num_tcp_acceptors",
 [{datatype, integer}]}.
|
|
|
|
%% Boolean switch stored under `rabbitmq_mqtt.ssl_cert_login`.
{mapping, "mqtt.ssl_cert_login", "rabbitmq_mqtt.ssl_cert_login",
 [{datatype, {enum, [true, false]}}]}.

%% Certificate field the client ID is taken from; one of the two listed atoms.
{mapping, "mqtt.ssl_cert_client_id_from", "rabbitmq_mqtt.ssl_cert_client_id_from",
 [{datatype, {enum, [distinguished_name, subject_alternative_name]}}]}.

%% Subject Alternative Name type; one of the listed atoms.
{mapping, "mqtt.ssl_cert_login_san_type", "rabbitmq_mqtt.ssl_cert_login_san_type",
 [{datatype, {enum, [dns, ip, email, uri, other_name]}}]}.

%% Subject Alternative Name index; an integer checked by the
%% "non_negative_integer" validator.
{mapping, "mqtt.ssl_cert_login_san_index", "rabbitmq_mqtt.ssl_cert_login_san_index",
 [{datatype, integer},
  {validators, ["non_negative_integer"]}]}.
|
|
|
|
|
|
|
|
%% TCP/Socket options (as per the broker configuration).
|
|
%%
|
|
%% {tcp_listen_options, [{backlog, 128},
|
|
%% {nodelay, true}]}
|
|
% ]},
|
|
|
|
%% TCP listener section ======================================================
|
|
|
|
%% `mqtt.tcp_listen_options` itself may only be set to `none`; individual
%% socket options are configured through the `mqtt.tcp_listen_options.*` keys.
%% NOTE(review): the target key contains an extra `rabbit` segment
%% ("rabbitmq_mqtt.rabbit.tcp_listen_options"), unlike the per-option mappings
%% which target "rabbitmq_mqtt.tcp_listen_options.*". It is left unchanged
%% here; confirm whether it is intentional.
{mapping, "mqtt.tcp_listen_options", "rabbitmq_mqtt.rabbit.tcp_listen_options", [
    {datatype, {enum, [none]}}
]}.

%% `none` translates to an empty option list; anything else is rejected.
%% The configuration proplist must be passed to cuttlefish:conf_get: there is
%% no one-argument variant, so omitting `Conf` made this translation fail
%% every time it ran. The three-argument form with an `undefined` default
%% mirrors the listener translations in this schema.
{translation, "rabbitmq_mqtt.rabbit.tcp_listen_options",
 fun(Conf) ->
     case cuttlefish:conf_get("mqtt.tcp_listen_options", Conf, undefined) of
         none -> [];
         _    -> cuttlefish:invalid("Invalid mqtt.tcp_listen_options")
     end
 end}.
|
|
|
|
%% Individual TCP listener socket options. Each key maps one-to-one onto the
%% setting of the same name under `rabbitmq_mqtt.tcp_listen_options`; values
%% are either integers or `true`/`false` flags as declared below.

{mapping, "mqtt.tcp_listen_options.backlog", "rabbitmq_mqtt.tcp_listen_options.backlog",
 [{datatype, integer}]}.

{mapping, "mqtt.tcp_listen_options.nodelay", "rabbitmq_mqtt.tcp_listen_options.nodelay",
 [{datatype, {enum, [true, false]}}]}.

{mapping, "mqtt.tcp_listen_options.buffer", "rabbitmq_mqtt.tcp_listen_options.buffer", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.delay_send", "rabbitmq_mqtt.tcp_listen_options.delay_send", [
    {datatype, {enum, [true, false]}}
]}.

{mapping, "mqtt.tcp_listen_options.dontroute", "rabbitmq_mqtt.tcp_listen_options.dontroute", [
    {datatype, {enum, [true, false]}}
]}.

{mapping, "mqtt.tcp_listen_options.exit_on_close", "rabbitmq_mqtt.tcp_listen_options.exit_on_close", [
    {datatype, {enum, [true, false]}}
]}.

{mapping, "mqtt.tcp_listen_options.fd", "rabbitmq_mqtt.tcp_listen_options.fd", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.high_msgq_watermark", "rabbitmq_mqtt.tcp_listen_options.high_msgq_watermark", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.high_watermark", "rabbitmq_mqtt.tcp_listen_options.high_watermark", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.keepalive", "rabbitmq_mqtt.tcp_listen_options.keepalive", [
    {datatype, {enum, [true, false]}}
]}.

{mapping, "mqtt.tcp_listen_options.low_msgq_watermark", "rabbitmq_mqtt.tcp_listen_options.low_msgq_watermark", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.low_watermark", "rabbitmq_mqtt.tcp_listen_options.low_watermark", [
    {datatype, integer}
]}.

%% The port is additionally checked by the "port" validator.
{mapping, "mqtt.tcp_listen_options.port", "rabbitmq_mqtt.tcp_listen_options.port", [
    {datatype, integer},
    {validators, ["port"]}
]}.

{mapping, "mqtt.tcp_listen_options.priority", "rabbitmq_mqtt.tcp_listen_options.priority", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.recbuf", "rabbitmq_mqtt.tcp_listen_options.recbuf", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.send_timeout", "rabbitmq_mqtt.tcp_listen_options.send_timeout", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.send_timeout_close", "rabbitmq_mqtt.tcp_listen_options.send_timeout_close", [
    {datatype, {enum, [true, false]}}
]}.

{mapping, "mqtt.tcp_listen_options.sndbuf", "rabbitmq_mqtt.tcp_listen_options.sndbuf", [
    {datatype, integer}
]}.

{mapping, "mqtt.tcp_listen_options.tos", "rabbitmq_mqtt.tcp_listen_options.tos", [
    {datatype, integer}
]}.
|
|
|
|
%% The linger option is configured through two keys that share one target:
%% an on/off flag ...
{mapping, "mqtt.tcp_listen_options.linger.on", "rabbitmq_mqtt.tcp_listen_options.linger", [
    {datatype, {enum, [true, false]}}
]}.

%% ... and a timeout, checked by the "non_negative_integer" validator.
{mapping, "mqtt.tcp_listen_options.linger.timeout", "rabbitmq_mqtt.tcp_listen_options.linger", [
    {datatype, integer},
    {validators, ["non_negative_integer"]}
]}.

%% Combines both keys into a single `{OnOff, Timeout}` tuple, defaulting to
%% `false` and `0` for whichever key is not set.
{translation, "rabbitmq_mqtt.tcp_listen_options.linger",
 fun(Conf) ->
     Enabled = cuttlefish:conf_get("mqtt.tcp_listen_options.linger.on", Conf, false),
     Timeout = cuttlefish:conf_get("mqtt.tcp_listen_options.linger.timeout", Conf, 0),
     {Enabled, Timeout}
 end}.
|
|
|
|
%% Durable queue type to be used for QoS 1 subscriptions
|
|
%%
|
|
|
|
%% Accepts `classic` or `quorum`.
{mapping, "mqtt.durable_queue_type", "rabbitmq_mqtt.durable_queue_type", [
    {datatype, {enum, [classic, quorum]}}
]}.
|
|
|
|
%% If feature flag rabbit_mqtt_qos0_queue is enabled, a pseudo queue of type rabbit_mqtt_qos0_queue is
|
|
%% created for an MQTT "Clean Session" that subscribes with QoS 0. This queue type holds MQTT messages
|
|
%% directly in the MQTT connection process mailbox. This queue type has no flow control which
|
|
%% means MQTT messages might arrive faster in the Erlang mailbox than being sent to the subscribing MQTT
|
|
%% client. To protect against high memory usage, RabbitMQ intentionally drops QoS 0 messages if both
|
|
%% 1.) the number of messages in the MQTT connection process mailbox exceeds mailbox_soft_limit, and
|
|
%% 2.) the socket sending to the MQTT client is busy (i.e. not sending fast enough).
|
|
%% Note that there can be other messages in the mailbox (e.g. MQTT messages sent from the MQTT client to
|
|
%% RabbitMQ or confirmations from another queue type to the MQTT connection process) which are
|
|
%% obviously not dropped. However, these other messages also contribute to the `mailbox_soft_limit`.
|
|
%% Setting mailbox_soft_limit to 0 disables the overload protection mechanism to drop QoS 0 messages.
|
|
%% Setting mailbox_soft_limit to a very high value decreases the likelihood of intentionally dropping
|
|
%% QoS 0 messages while increasing the risk of causing a cluster wide memory alarm.
|
|
%%
|
|
%% {mailbox_soft_limit, 30},
|
|
%% Integer setting checked by the "non_negative_integer" validator.
{mapping, "mqtt.mailbox_soft_limit", "rabbitmq_mqtt.mailbox_soft_limit", [
    {datatype, integer},
    {validators, ["non_negative_integer"]}
]}.
|
|
|
|
%% Maximum packet size
|
|
|
|
%% Both packet size limits are integers checked by the
%% "non_zero_positive_integer" validator.
{mapping, "mqtt.max_packet_size_unauthenticated", "rabbitmq_mqtt.max_packet_size_unauthenticated",
 [{datatype, integer},
  {validators, ["non_zero_positive_integer"]}]}.

{mapping, "mqtt.max_packet_size_authenticated", "rabbitmq_mqtt.max_packet_size_authenticated",
 [{datatype, integer},
  {validators, ["non_zero_positive_integer"]}]}.
|
|
|
|
%% Topic Alias Maximum
|
|
|
|
%% Integer setting checked by the "non_negative_integer" validator.
{mapping, "mqtt.topic_alias_maximum", "rabbitmq_mqtt.topic_alias_maximum",
 [{datatype, integer},
  {validators, ["non_negative_integer"]}]}.
|
|
|
|
%%
|
|
%% Message interceptors
|
|
%%
|
|
|
|
%% Every key of the form `mqtt.message_interceptors.<stage>.<name>.<key>` is a
%% `true`/`false` flag at the datatype level.
{mapping, "mqtt.message_interceptors.$stage.$name.$key", "rabbitmq_mqtt.message_interceptors",
 [{datatype, {enum, [true, false]}}]}.

%% Builds the list of `{Module, Config}` interceptors from the configured keys.
%% With no keys under the prefix the setting is left unset. The only key
%% recognised is `incoming.set_client_id_annotation.enabled`; any other key
%% under the prefix is reported as invalid.
{translation, "rabbitmq_mqtt.message_interceptors",
 fun(Conf) ->
     case cuttlefish_variable:filter_by_prefix("mqtt.message_interceptors", Conf) of
         [] ->
             cuttlefish:unset();
         Settings ->
             Collect =
                 fun({["mqtt", "message_interceptors", "incoming",
                       "set_client_id_annotation", "enabled"], Enabled}, Acc) ->
                         case Enabled of
                             true ->
                                 [{rabbit_mqtt_msg_interceptor_client_id, #{}} | Acc];
                             false ->
                                 Acc
                         end;
                    (Other, _Acc) ->
                         cuttlefish:invalid(io_lib:format("~p is invalid", [Other]))
                 end,
             %% foldr keeps the interceptors in the order of the settings list.
             lists:foldr(Collect, [], Settings)
     end
 end}.
|