Commit Graph

528 Commits

Author SHA1 Message Date
David Ansari fb6c8da2fc Block Web MQTT connection if memory or disk alarm
Previously (until RabbitMQ v3.11.x), a memory or disk alarm did
not block the Web MQTT connection because this feature was only
implemented half way through: The part that registers the Web MQTT
connection with rabbit_alarm was missing.
2023-01-24 17:32:59 +00:00
David Ansari a8b69b43c1 Fix dialyzer issues and add function specs
Fix all dialyzer warnings in rabbitmq_mqtt and rabbitmq_web_mqtt.

Add more function specs.
2023-01-24 17:32:58 +00:00
David Ansari 1720aa0e75 Allow CLI listing rabbit_mqtt_qos0_queue queues 2023-01-24 17:30:10 +00:00
David Ansari 56e97a9142 Fix MQTT in management plugin
1. Allow to inspect an (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
   after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow an (web) MQTT connection to be closed via management_plugin.

For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
2023-01-24 17:30:10 +00:00
Chunyi Lyu cb68e4866e Resolve some dialyzer issues
- mqtt processor publish_to_queue/2 is called in
process_request(?PUBLISH,_, _) and maybe_send_will/3. In
both places #mqtt_msg{} is initialized with value so it will
never be 'undefined'.
- all possible value are already matched in mqtt_processor
human_readable_vhost_lookup_strategy/1; deleted the unneeded
catch all function clause.
- Removed a unnecessary case matching in mqtt_reader init/1.
Return values for 'rabbit_net:connection_string' are {ok, _} or
{error, _}. {'network_error', Reason} will never match.
- Fix function spec for mqtt_util gen_client_id/1. Return type of
rabbit_misc:base64url is string(), not binary().
2023-01-24 17:30:10 +00:00
Chunyi Lyu 4fa8e830ad Allow undefined in some mqtt record type fields
- to get rid of dialyzer warnings like "Record construction...
violates the declared type of field XYZ"
2023-01-24 17:30:10 +00:00
Chunyi Lyu 340e930d28 Web mqtt returns 1002 with mqtt parsing error
- it is a mqtt protocol error
2023-01-24 17:30:10 +00:00
Chunyi Lyu 4ca12b767a Fix func spec for mqtt process_request
- it also returns {stop, disconnect, state()} when receiving
a disconnect packet
- remove match for a {timeout, _} return when calling register_client.
register_client only returns {ok, _} and {error, _} according to its
function spec
2023-01-24 17:30:10 +00:00
Chunyi Lyu fb913009c4 Add func specs for mqtt process_packet and process_request
- removed return matching for {error, Error} when calling process_packet
because that's not the return type
2023-01-24 17:30:10 +00:00
David Ansari 97fefff0fe Add overflow drop-head to rabbit_mqtt_qos_queue type
Traditionally, queue types implement flow control by keeping state in
both sending and receiving Erlang processes (for example credit based flow
control keeps the number of credits within the process dictionary).

The rabbit_mqtt_qos0_queue cannot keep such state in sending or receiving
Erlang process because such state would consume a large amount of memory
in case of large fan-ins or large fan-outs.
The whole idea of the rabbit_mqtt_qos_queue type is to not keep any
state in the rabbit_queue_type client. This makes this new queue
type scale so well.

Therefore the new queue type cannot (easily) implement flow control
throttling individual senders.

In this commit, we take a different approach:
Instead of implementing flow control throttling individual senders,
the receiving MQTT connection process drops QoS 0 messages from the
rabbit_mqtt_qos_queue if it is overflowed with messages AND its MQTT
client is not capable of receiving messages fast enough.

This is a simple and sufficient solution because it's better to drop QoS
0 (at most once) messages instead of causing cluster-wide memory alarms.

The user can opt out of dropping messages by setting the new env setting
mailbox_soft_limit to 0.

Additionally, we reduce the send_timeout from 30 seconds default in
Ranch to 15 seconds default in MQTT. This will detect hanging MQTT
clients faster causing the MQTT connection to be closed.
2023-01-24 17:30:10 +00:00
David Ansari 61f6ca7b66 Support iodata() when sending message to MQTT client
When the MQTT connection receives an AMQP 0.9.1 message, it will contain
a list of payload fragments.

This commit avoids the expensive operation of turning that list into a binary.

All I/O methods accept iodata():
* erlang:port_command/2
* ssl:send/2
* In Web MQTT, cowboy websockets accept iodata():
0d04cfffa3/src/cow_ws.erl (L58)
2023-01-24 17:30:10 +00:00
David Ansari 15636fdb90 Rename frame to packet
The MQTT protocol specs define the term "MQTT Control Packet".
The MQTT specs never talk about "frame".

Let's reflect this naming in the source code since things get confusing
otherwise:
Packets belong to MQTT.
Frames belong to AMQP 0.9.1 or web sockets.
2023-01-24 17:30:10 +00:00
David Ansari 86de0a1557 Reduce memory usage of MQTT connection process
by removing the two fields referencing a function:
mqtt2amqp_fun and amqp2mqtt_fun

Each field required 1 words + 11 words for the function reference.

Therefore, for 1 million MQTT connections this commit saves:
    (1+11) * 2 * 1,000,000 words
    = 192 MB of memory

In addition, the code is now simpler to understand.

There is no change in behaviour except for the sparkplug environment
variable being read upon application start.

We put the compiled regexes into persistent term because they are the
same for all MQTT connections.
2023-01-24 17:30:10 +00:00
David Ansari 7782142020 Reduce memory usage of reader processes
Reduce memory usage of (Web) MQTT connection process and STOMP reader
process by storing the connection name as binary instead of string.

Previously:
82 = erts_debug:size("192.168.2.104:52497 -> 192.168.2.175:1883").

The binary <<"192.168.2.104:52497 -> 192.168.2.175:1883">>
requires 8 words.

So, for 1 million MQTT connections, this commit should save
    (82 - 8) words * 1,000,000
    = 592 MB
of memory.
2023-01-24 17:30:10 +00:00
Chunyi Lyu 46e8a65d96 Check if state.stats_timer is undefined to avoid crashing
- if #state.stats_timer is undefined, rabbit_event:if_enabled crashes
- remove compression related TODO from web_mqtt. It's a intentional
default behavior set in: https://github.com/rabbitmq/rabbitmq-web-mqtt/pull/35
2023-01-24 17:30:10 +00:00
David Ansari f842ffd250 Add feature flag rabbit_mqtt_qos0_queue
Routing to a queue of type rabbit_mqtt_qos0_queue hosted on
a remote node requires knowledge of that queue type on the
local node.
2023-01-24 17:30:10 +00:00
David Ansari 1493cbe13d Rename message_id to packet_id
MQTT spec only talks about "Packet Identifier",
but never about "Message Identitier".

RabbitMQ has message identifiers (for example the classic queue store
uses message identifiers to uniquely identify internal messages).

So, let's not confuse these two terms and be specific.
2023-01-24 17:30:10 +00:00
David Ansari 7bc8208a1b Remove local record definitions from header files
Record #state{} is purely local to rabbit_mqtt_reader.
Record #proc_state{} is purely local to rabbit_mqtt_processor.

Therefore, move these record definitions to the defining module.
This avoids unnecessarily exposing internal information.

Now, that #proc_state{} is defined in rabbit_mqtt_processor,
rename #proc_state to #state{}.
2023-01-24 17:30:10 +00:00
David Ansari 76f4598d92 Send last will if client did not DISCONNECT
"The Will Message MUST be published when the Network Connection is subsequently
closed unless the Will Message has been deleted by the Server on receipt of a
DISCONNECT Packet [MQTT-3.1.2-8].
Situations in which the Will Message is published include, but are not limited to:
•	An I/O error or network failure detected by the Server.
•	The Client fails to communicate within the Keep Alive time.
•	The Client closes the Network Connection without first sending a DISCONNECT Packet.
•	The Server closes the Network Connection because of a protocol error.
"

Prior to this commit, the will message was not sent in all scenarios
where it should have been sent.

In this commit, the will message is always sent unless the client sent a
DISCONNECT packet to the server.

We achieve this by sending the will message in the terminate callback.

Note that the Reason passed into the terminate callback of
rabbit_web_mqtt_handler is the atom 'stop' (that is, we cannot pass a custom
reason here).

Therefore, in order to know within the terminate callback whether the client
sent a DISCONNECT packet, we have to modify the process state.
Rather than including a new field into the process state record which requires
1 additional word per MQTT connection (i.e. expensive with millions of
MQTT connection processes - we want to keep the process state small),
we intead modify the state just before stopping the process to
{SendWill, State}.
2023-01-24 17:30:10 +00:00
David Ansari 65bc0c395b Fix global counters
Prior to this commit messages_delivered for queue_type_qos0 is
wrongfully incremented if clean session is false

Also, delete duplicate code.
2023-01-24 17:30:10 +00:00
Chunyi Lyu 075bc06623 Handle messages_delivered_consume_*_ack counters at delivery
- increment messages_delivered_consume_auto_ack if subscribe queue
is type mqtt_qos0 queue or if publising QoS is 0
- increment messages_delivered_consume_manual_ack if both publising
and subcribe are QoS 1
- increment messages_acknowledged at queue_type:settle()
2023-01-24 17:30:10 +00:00
David Ansari 16fa12244e Avoid exceptions in mixed version cluster
1. Avoid following exceptions in mixed version clusters when new MQTT
   connections are created:
```
{{exception,{undef,[{rabbit_mqtt_util,remove_duplicate_clientid_connections,
                                      [{<<"/">>,
                                        <<"publish_to_all_queue_types">>},
                                       <0.1447.0>],
                                      []}]}},
 [{erpc,execute_cast,3,[{file,"erpc.erl"},{line,621}]}]}
```
If feature flag delete_ra_cluster_mqtt_node is disabled, let's still
populate pg with MQTT client IDs such that we don't have to migrate them
from the Ra cluster to pg when we enable the feature flag.
However, for actually closing duplicate MQTT client ID connections, if
that feature flag is disabled, let's rely on the Ra cluster to take care
of it.

2. Write a test ensuring the QoS responses are in the right order when a
   single SUBSCRIBE packet contains multiple subscriptions.
2023-01-24 17:30:10 +00:00
David Ansari 573934259a Consume from queue once
Each MQTT connection consumes from its queue at most once
(unless when failing over).
2023-01-24 17:30:10 +00:00
Chunyi Lyu c3779d9996 Implement message consuming counters in mqtt 2023-01-24 17:30:10 +00:00
David Ansari 6e527fb940 Replace existing subscription
"If a Server receives a SUBSCRIBE Packet containing a Topic Filter
that is identical to an existing Subscription’s Topic Filter then
it MUST completely replace that existing Subscription with a new
Subscription. The Topic Filter in the new Subscription will be
identical to that in the previous Subscription, although its
maximum QoS value could be different. Any existing retained
messages matching the Topic Filter MUST be re-sent, but the flow
of publications MUST NOT be interrupted [MQTT-3.8.4-3]."
2023-01-24 17:30:10 +00:00
David Ansari 0ba0a6e8f8 Several small improvements
1. The mqtt_qos0 queue type uses now QName in the delivery.
This makes the code simpler although it might be a bit less efficient
because the tuple containing binaries is sent around and a hash is
computed within rabbit_queue_type:module/2

2. Do not construct a new binary on every PUBACK. This is expensive with
   many PUBACKs per second. Instead, we store the QoS1 queue name in the
   process state (but only if the connection also consumes from that
   queue).

3. To make the code more readable, and less specialised, always handle
   queue actions when we call rabbit_queue_type:settle/5.
   This method only returns an action (delivery) when settling to the stream
   queue, which the MQTT plugin never does because an MQTT connection
   does not consume from a stream. It's not expensive at all to handle
   an empty list of queue actions.
2023-01-24 17:30:10 +00:00
David Ansari b2c87c59a0 Minor reformatting and renaming 2023-01-24 17:30:10 +00:00
David Ansari e06d3e7069 Unblock queue when it gets deleted
When a quorum queue or stream gets deleted while the MQTT connection
process (or channel) is blocked by that deleted queue due to soft limit
being exceeded, unblock that queue.

In this commit, an unblock action is also returned with the eol.
2023-01-24 17:30:09 +00:00
Chunyi Lyu 80f8e0754f Implement consumer global counter for clean sess false
- remove has_subs from proc state; query datebase to check
if a connection has subscription or not
2023-01-24 17:29:08 +00:00
Chunyi Lyu 0b43f002f5 Remove subscriptions map from proc state in mqtt
- subscriptions information can be retrieved directly from mnesia
- when unsubscribe, we check if there is binding between topic name
and queue (check for both qos0 queue name and qos1 queue name) to
unbind
- added a boolean value has_subs in proc state which will indicate
if connection has any active subscriptions. Used for setting consumer
global counter
2023-01-24 17:29:07 +00:00
David Ansari aad7e1cdf6 Add test for consuming MQTT classic queue going down 2023-01-24 17:29:07 +00:00
David Ansari bda52dbf64 Support consuming classic mirrored queue failover
Some users use classic mirrored queues for MQTT queues by
applying a policy.

Given that classic mirrored queues are deprecated, but still supported
in RabbitMQ 3.x, native MQTT must support classic mirrored queues.
2023-01-24 17:29:07 +00:00
David Ansari b97006c4b9 Output username in connection closed event 2023-01-24 17:29:07 +00:00
David Ansari 7fc2234117 Test ETS and NOOP retained message stores 2023-01-24 17:29:07 +00:00
David Ansari 6533532039 Simplify counters
by storing mqtt310 and mqtt311 atoms directly in the processor state.
2023-01-24 17:29:07 +00:00
David Ansari ab5007a53b Handle queue deletion
Handle deletion of queues correctly that an MQTT connection is
publishing to with QoS 1.
2023-01-24 17:29:07 +00:00
Chunyi Lyu de984d026b Subs from 1 connection counts as 1 consumer in global counter
- rename proc state isPublisher to has_published
- create macro for v3 and v4 mqtt protocol name for global
counters
- sub groups in integration suite
2023-01-24 17:29:07 +00:00
David Ansari 38e5e20bb8 Add tests 2023-01-24 17:29:07 +00:00
Chunyi Lyu 17c5dffe7a Set common global counters for mqtt
- protocols are set to mqtt301 or mqtt311 depends
on protocal version set by client connection
- added boolean isPublisher in proc state to track
if connection was ever used to publish. This is used to
set publisher_create and publisher_delete global counters.
- tests added in integration_SUITE
2023-01-24 17:29:07 +00:00
David Ansari 9fd5704e30 Fix mixed version Web MQTT system tests
In mixed verion tests all non-required feature flags are disabled.
Therefore, MQTT client ID tracking happens in Ra.
The Ra client sends commands asynchronously when registering and
deregistering the client ID.

Also, add more tests.
2023-01-24 17:29:07 +00:00
David Ansari 319af3872e Handle duplicate packet IDs
"If a Client re-sends a particular Control Packet, then it MUST use the
same Packet Identifier in subsequent re-sends of that packet."

A client can re-send a PUBLISH packet with the same packet ID.
If the MQTT connection process already received the original packet and
sent it to destination queues, it will ignore this re-send.

The packet ID will be acked to the publisher once a confirmation from
all target queues is received.

There should be no risk of "stuck" messages within the MQTT connection
process because quorum and stream queue clients re-send the message and
classic queues will send a monitor DOWN message in case they are down.
2023-01-24 17:29:07 +00:00
Chunyi Lyu 645531bc95 Register mqtt connections in case event refresh 2023-01-24 17:29:07 +00:00
David Ansari 14f59f1380 Handle soft limit exceeded as queue action
Instead of performing credit_flow within quorum queue and stream queue
clients, return new {block | unblock, QueueName} actions.

The queue client process can then decide what to do.

For example, the channel continues to use credit_flow such that the
channel gets blocked sending any more credits to rabbit_reader.

However, the MQTT connection process does not use credit_flow. It
instead blocks its reader directly.
2023-01-24 17:29:07 +00:00
David Ansari 816fedf080 Enable flow control to target classic queue 2023-01-24 17:29:07 +00:00
David Ansari 33bf2150a5 Add test for publishing via MQTT to different queue types 2023-01-24 17:29:07 +00:00
Chunyi Lyu 8126925617 Implement format_status for mqtt reader
- truncate queue type state from mqtt proc_state, which
could be huge with many destination queues. Instead, format_status
now returns number of destination queues.
2023-01-24 17:29:07 +00:00
David Ansari 627ea8588a Add rabbit_event tests for MQTT
Add tests that MQTT plugin sends correct events to rabbit_event.

Add event connection_closed.
2023-01-24 17:29:07 +00:00
Chunyi Lyu b74dea4435 Send rabbit event declaring mqtt_qos0 queue 2023-01-24 17:29:07 +00:00
David Ansari 07ad410d81 Skip queue when MQTT QoS 0
This commit allows for huge fanouts if the MQTT subscriber connects with
clean_session = true and QoS 0. Messages are not sent to a conventional queue.
Instead, messages are forwarded directly from MQTT publisher connection
process or channel to MQTT subscriber connection process.
So, the queue process is skipped.

The MQTT subscriber connection process acts as the queue process.
Its mailbox is a superset of the queue. This new queue type is called
rabbit_mqtt_qos0_queue.
Given that the only current use case is MQTT, this queue type is
currently defined in the MQTT plugin.
The rabbit app is not aware that this new queue type exists.

The new queue gets persisted as any other queue such that routing via
the topic exchange contineues to work as usual. This allows routing
across different protocols without any additional changes, e.g. huge
fanout from AMQP client (or management UI) to all MQTT devices.

The main benefit is that memory usage of the publishing process is kept at
0 MB once garbage collection kicked in (when hibernating the gen_server).
This is achieved by having this queue type's client not maintain any
state. Previously, without this new queue type, the publisher process
maintained state of 500MB to all the 1 million destination queues even
long after stopping sending messages to these queues.

Another big benefit is that no queue process need to be created.
Prior to this commit, with 1 million MQTT subscribers, 3 million Erlang
processes got created: 1 million MQTT connection processes, 1 million classic
queue processes, and 1 million classic queue supervisor processes.
After this commit, only the 1 million MQTT connection processes get
created. Hence, a few GBs of process memory will be saved.

Yet another big benefit is that because the new queue type's client
auto-settles the delivery when sending, the publishing process only
awaits confirmation from queues who potentially have at-least-once
consumers. So, the publishing process is not blocked on sending the
confirm back to the publisher if 1 message is let's say routed to 1
million MQTT QoS 0 subscribers while 1 copy is routed to an important
quorum queue or stream and while a single out of the million MQTT
connection processes is down.

Other benefits include:
* Lower publisher confirm latency
* Reduced inter-node network traffic

In a certain sense, this commit allows RabbitMQ to act as a high scale
and high throughput MQTT router (that obviously can lose messages at any
time given the QoS is 0).
For example, it allows use cases as using RabbitMQ to send messages cheaply
and quickly to 1 million devices that happen to be online at the given
time: e.g. send a notification to any online mobile device.
2023-01-24 17:29:07 +00:00
David Ansari af68fb4484 Decrease memory usage of queue_type state
Prior to this commit, 1 MQTT publisher publishing to 1 Million target
classic queues requires around 680 MB of process memory.

After this commit, it requires around 290 MB of process memory.

This commit requires feature flag classic_queue_type_delivery_support
and introduces a new one called no_queue_name_in_classic_queue_client.

Instead of storing the binary queue name 4 times, this commit now stores
it only 1 time.

The monitor_registry is removed since only classic queue clients monitor
their classic queue server processes.

The classic queue client does not store the queue name anymore. Instead
the queue name is included in messages handled by the classic queue
client.

Storing the queue name in the record ctx was unnecessary.

More potential future memory optimisations:
* When routing to destination queues, looking up the queue record,
  delivering to queue: Use streaming / batching instead of fetching all
  at once
* Only fetch ETS columns that are necessary instead of whole queue
  records
* Do not hold the same vhost binary in memory many times. Instead,
  maintain a mapping.
* Remove unnecessary tuple fields.
2023-01-24 17:29:07 +00:00
David Ansari 4b1c2c870b Emit cluster-wide MQTT connection infos
When listing MQTT connections with the CLI, whether feature flag
delete_ra_cluster_mqtt_node is enabled or disabled, in both cases
return cluster wide MQTT connections.

If connection tracking is done in Ra, the CLI target node returns all
connection infos because Ra is aware of all MQTT connections.

If connection tracking is done in (local-only) pg, all nodes return
their local MQTT connection infos.
2023-01-24 17:29:07 +00:00
David Ansari 3e28a52066 Convert rabbit_mqtt_reader from gen_server2 to gen_server
There is no need to use gen_server2.

gen_server2 requires lots of memory with millions of MQTT connections
because it creates 1 entry per connection into ETS table
'gen_server2_metrics'.

Instead of using handle_pre_hibernate, erasing the permission cache
is now done by using a timeout.

We do not need a hibernate backoff feature, simply hibernate after 1
second.

It's okay for MQTT connection processes to hibernate because they
usually send data rather rarely (compared to AMQP connections).
2023-01-24 17:29:07 +00:00
David Ansari 199238d76e Use pg to track MQTT client IDs
Instead of tracking {Vhost, ClientId} to ConnectionPid mappings in our
custom process registry, i.e. custom local ETS table with a custom
gen_server process managing that ETS table, this commit uses the pg module
because pg is better tested.

To save memory with millions of MQTT client connections, we want to save
the mappings only locally on the node where the connection resides and
therfore not be replicated across all nodes.

According to Maxim Fedorov:
"The easy way to provide per-node unique pg scope is to start it like
pg:start_link(node()). At least that's what we've been doing to have
node-local scopes. It will still try to discover scopes on nodeup from
nodes joining the cluster, but since you cannot have nodes with the
same name in one cluster, using node() for local-only scopes worked
well for us."

So that's what we're doing in this commit.
2023-01-24 17:29:07 +00:00
David Ansari ab8957ba9c Use best-effort client ID tracking
"Each Client connecting to the Server has a unique ClientId"

"If the ClientId represents a Client already connected to
the Server then the Server MUST disconnect the existing
Client [MQTT-3.1.4-2]."

Instead of tracking client IDs via Raft, we use local ETS tables in this
commit.

Previous tracking of client IDs via Raft:
(+) consistency (does the right thing)
(-) state of Ra process becomes large > 1GB with many (> 1 Million) MQTT clients
(-) Ra process becomes a bottleneck when many MQTT clients (e.g. 300k)
    disconnect at the same time because monitor (DOWN) Ra commands get
    written resulting in Ra machine timeout.
(-) if we need consistency, we ideally want a single source of truth,
    e.g. only Mnesia, or only Khepri (but not Mnesia + MQTT ra process)

While above downsides could be fixed (e.g. avoiding DOWN commands by
instead doing periodic cleanups of client ID entries using session interval
in MQTT 5 or using subscription_ttl parameter in current RabbitMQ MQTT config),
in this case we do not necessarily need the consistency guarantees Raft provides.

In this commit, we try to comply with [MQTT-3.1.4-2] on a best-effort
basis: If there are no network failures and no messages get lost,
existing clients with duplicate client IDs get disconnected.

In the presence of network failures / lost messages, two clients with
the same client ID can end up publishing or receiving from the same
queue. Arguably, that's acceptable and less worse than the scaling
issues we experience when we want stronger consistency.

Note that it is also the responsibility of the client to not connect
twice with the same client ID.

This commit also ensures that the client ID is a binary to save memory.

A new feature flag is introduced, which when enabled, deletes the Ra
cluster named 'mqtt_node'.

Independent of that feature flag, client IDs are tracked locally in ETS
tables.
If that feature flag is disabled, client IDs are additionally tracked in
Ra.

The feature flag is required such that clients can continue to connect
to all nodes except for the node being udpated in a rolling update.

This commit also fixes a bug where previously all MQTT connections were
cluster-wide closed when one RabbitMQ node was put into maintenance
mode.
2023-01-24 17:29:07 +00:00
David Ansari 43bd548dcc Handle deprecated classic queue delivery
when feature flag classic_queue_type_delivery_support is disabled.
2023-01-24 17:29:07 +00:00
David Ansari 5710a9474a Support MQTT Keepalive in WebMQTT
Share the same MQTT keepalive code between rabbit_mqtt_reader and
rabbit_web_mqtt_handler.

Add MQTT keepalive test in both plugins rabbitmq_mqtt and
rabbitmq_web_mqtt.
2023-01-24 17:29:07 +00:00
David Ansari 6f00ccb3ad Get all existing rabbitmq_web_mqtt tests green 2023-01-24 17:29:07 +00:00
David Ansari a02cbb73a1 Get all existing rabbitmq_mqtt tests green 2023-01-24 17:29:07 +00:00
David Ansari 23dac495ad Support QoS 1 for sending and receiving 2023-01-24 17:29:07 +00:00
David Ansari cdd253ee87 Receive many messages from classic queue
Before this commit, a consumer from a classic queue was receiving max
200 messages:
bb5d6263c9/deps/rabbit/src/rabbit_queue_consumers.erl (L24)

MQTT consumer process must give credit to classic queue process
due to internal flow control.
2023-01-24 17:29:07 +00:00
David Ansari 99337b84d3 Emit stats
'connection' field is not needed anymore because it was
previously the internal AMQP connection PID
2023-01-24 17:29:07 +00:00
David Ansari 218ee196c4 Make proxy_protocol tests green 2023-01-24 17:29:07 +00:00
David Ansari 77da78f478 Get most auth_SUITE tests green
Some tests which require clean_start=false
or QoS1 are skipped for now.

Differentiate between v3 and v4:
v4 allows for an error code in SUBACK frame.
2023-01-24 17:29:07 +00:00
David Ansari 73ad3bafe7 Revert maybe expression
rabbit_misc:pipeline looks better and doesn't require experimental
feature
2023-01-24 17:29:07 +00:00
David Ansari f4d1f68212 Move authn / authz into rabbitmq_mqtt 2023-01-24 17:29:07 +00:00
David Ansari eac0622f37 Consume with QoS0 via queue_type interface 2023-01-24 17:29:07 +00:00
David Ansari 24b0a6bcb2 Publish with QoS0 via queue_type interface 2023-01-24 17:29:07 +00:00
David Ansari 8710565b2a Use 1 instead of 22 Erlang processes per MQTT connection
* Create MQTT connections without proxying via AMQP
* Do authn / authz in rabbitmq_mqtt instead of rabbit_direct:connect/5
* Remove rabbit_heartbeat process and per connection supervisors

Current status:

Creating 10k MQTT connections with clean session succeeds:
./emqtt_bench conn -V 4 -C true -c 10000 -R 500
2023-01-24 17:29:07 +00:00
Alexey Lebedeff b6cd708a08 Fix all dialyzer warnings in rabbitmq_web_mqtt 2023-01-19 17:23:23 +01:00
Michael Klishin ec4f1dba7d
(c) year bump: 2022 => 2023 2023-01-01 23:17:36 -05:00
Jean-Sébastien Pédron 15d9cdea61
Call `rabbit:data_dir/0` instead of `rabbit_mnesia:dir/0`
This is a follow-up commit to the parent commit. To quote part of the
parent commit's message:

> Historically, this was the Mnesia directory. But semantically, this
> should be the reverse: RabbitMQ owns the data directory and Mnesia is
> configured to put its files there too.

Now all subsystems call `rabbit:data_dir/0`. They are not tied to Mnesia
anymore.
2022-11-30 14:41:32 +01:00
David Ansari 694501b923 Close local MQTT connections when draining node
When a node gets drained (i.e. goes into maintenance mode), only local
connections should be terminated.

However, prior to this commit, all MQTT connections got terminated
cluster-wide when a single node was drained.
2022-10-13 11:14:01 +00:00
Luke Bakken 7fe159edef
Yolo-replace format strings
Replaces `~s` and `~p` with their unicode-friendly counterparts.

```
git ls-files *.erl | xargs sed -i.ORIG -e s/~s>/~ts/g -e s/~p>/~tp/g
```
2022-10-10 10:32:03 +04:00
Michal Kuratczyk 2855278034
Migrate from supervisor2 to supervisor 2022-09-27 13:53:06 +02:00
Péter Gömöri c4b7cd98bf Add login_timeout to mqtt and stomp reader
Similarly to handshake_timeout in amqp reader.
2022-09-12 17:48:48 +02:00
David Ansari b953b0f10e Stop sending stats to rabbit_event
Stop sending connection_stats from protocol readers to rabbit_event.
Stop sending queue_stats from queues to rabbit_event.
Sending these stats every 5 seconds to the event manager process is
superfluous because noone handles these events.

They seem to be a relict from before rabbit_core_metrics ETS tables got
introduced in 2016.

Delete test head_message_timestamp_statistics because it tests that
head_message_timestamp is set correctly in queue_stats events
although queue_stats events are used nowhere.
The functionality of head_message_timestamp itself is still tested in
deps/rabbit/test/priority_queue_SUITE.erl and
deps/rabbit/test/temp/head_message_timestamp_tests.py
2022-09-09 10:52:38 +00:00
David Ansari 49ed70900e Fix failing proxy_protocol test
Prior to this commit, test
```
make -C deps/rabbitmq_web_mqtt ct-proxy_protocol t=http_tests:proxy_protocol
```

was failing with reason
```
exception error: no function clause matching
                 rabbit_net:sockname({rabbit_proxy_socket,#Port<0.96>,
```
2022-08-25 20:00:49 +02:00
David Ansari 28db862d56 Avoid crash when client disconnects before server handles MQTT CONNECT
In case of a resource alarm, the server accepts incoming TCP
connections, but does not read from the socket.
When a client connects during a resource alarm, the MQTT CONNECT frame
is therefore not processed.

While the resource alarm is ongoing, the client might time out waiting
on a CONNACK MQTT packet.

When the resource alarm clears on the server, the MQTT CONNECT frame
gets processed.

Prior to this commit, this results in the following crash on the server:
```
** Reason for termination ==
** {{badmatch,{error,einval}},
    [{rabbit_mqtt_processor,process_login,4,
                            [{file,"rabbit_mqtt_processor.erl"},{line,585}]},
     {rabbit_mqtt_processor,process_request,3,
                            [{file,"rabbit_mqtt_processor.erl"},{line,143}]},
     {rabbit_mqtt_processor,process_frame,2,
                            [{file,"rabbit_mqtt_processor.erl"},{line,69}]},
     {rabbit_mqtt_reader,process_received_bytes,2,
                         [{file,"src/rabbit_mqtt_reader.erl"},{line,307}]},
```

After this commit, the server just logs:
```
[error] <0.887.0> MQTT protocol error on connection 127.0.0.1:55725 -> 127.0.0.1:1883: peername_not_known
```

In case the client already disconnected, we want the server to bail out
early, i.e. not authenticating and registering the client at all
since that can be expensive when many clients connected while the
resource alarm was ongoing.

To detect whether the client disconnected, we rely on inet:peername/1
which will return an error when the peer is not connected anymore.

Ideally we could use some better mechanism for detecting whether the
client disconnected.

The MQTT reader does receive a {tcp_closed, Socket} message once the
socket becomes active. However, we don't really want to read frames
ahead (i.e. ahead of the received CONNECT frame), one reason being that:
"Clients are allowed to send further Control Packets immediately
after sending a CONNECT Packet; Clients need not wait for a CONNACK Packet
to arrive from the Server."

Setting socket option `show_econnreset` does not help either because the client
closes the connection normally.

Co-authored-by: Péter Gömöri @gomoripeti
2022-08-25 18:42:37 +02:00
Philip Kuryloski a250a533a4 Remove elixir related -ignore_xref calls
As they are no longer necessary with xref2 and the erlang.mk updates
2022-06-09 23:18:40 +02:00
David Ansari 20677395cd Check queue and exchange existence with ets:member/2
This reduces memory usage and improves code readability.
2022-05-10 10:16:40 +00:00
Péter Gömöri 52cb5796a3 Remove leftover compiler option for get_stacktrace 2022-05-03 18:40:49 +02:00
Michael Klishin 0ae3f19698
mqtt.queue_type => mqtt.durable_queue_type 2022-03-31 19:48:00 +04:00
Gabriele Santomaggio 2c49748c70
Add quorum queues support for MQTT
Enable the quorum queue for MQTT only if CleanSession is False.
QQs don't support auto-delete flag so in case Clean session is True
the queue will be a classic queue.

Add another group test non_parallel_tests_quorum.
For Mixed test the quorum_queue feature flag must be enabled.

Add log message
2022-03-30 08:49:17 -07:00
Michael Klishin c38a3d697d
Bump (c) year 2022-03-21 01:21:56 +04:00
Alexey Lebedeff e0723d5e66 Prevent crash logs when mqtt user is missing permissions
Fixes #2941

This adds proper exception handlers in the right places. And tests
ensure that it indeed provides nice neat logs without large
stacktraces for every amqp operation.

Unnecessary checking for subscribe permissions on topic was dropped,
as `queue.bind` does exactly the same check. Topic permissions tests
were also added, and they indeed confirm that there was no change in
behaviour.

Ideally the same explicit topic permission check should be dropped for
publishing, but it's more complicated - so for now there only a
detailed comment in the source code explaining it.

A few other things were also optimized away:
- Using amqp client to test for queue existence
- Creating queues/starting consumptions too eagerly, even if not yet
  requested by client
2021-11-12 18:03:05 +01:00
Michael Klishin 0f6a9dac27
Introduce rabbit_nodes:all/0 2021-09-20 22:24:25 +03:00
Michal Kuratczyk 41922b96cf
Change a log line from INFO to DEBUG
This line is printed on every new MQTT connection which leads to very chatty logs when there is a lot of connections. Given that the way MQTT uses vhosts is generally static (once set up, always the same for all connections), I think this can be a debug message instead.
2021-07-12 16:50:25 +02:00
Philip Kuryloski 98e71c45d8 Perform xref checks on many tier-1 plugins 2021-05-21 12:03:22 +02:00
Michael Klishin a755dca8e9
MQTT: use consistent Ra operation timeout values
of more than the default 5s which is really low.
2021-05-18 14:35:48 +03:00
kjnilsson 62677cbacf
MQTT ra systems changes 2021-03-22 21:44:19 +03:00
dcorbacho a41ece3950 Make ranch parameter `num_conns_sups` configurable
Defaults to 1
rabbit - num_conns_sup
rabbitmq_mqtt - num_conns_sup
rabbitmq_stomp - num_conns_sup
2021-03-18 21:38:13 +01:00
Michael Klishin 91964db0e6
MQTT: correct a typo in mqtt_machine
Introduced in #2861
2021-03-12 05:33:03 +03:00
Michael Klishin 97ff62d3b2
Drop trailing newlines from logged messages where possible
Lager strips trailing newline characters but OTP logger with the default
formatter adds a newline at the end. To avoid unintentional multi-line log
messages we have to revisit most messages logged.

Some log entries are intentionally multiline, others
are printed to stdout directly: newlines are required there
for sensible formatting.
2021-03-11 15:17:37 +01:00
dcorbacho 61f7b2a723 Update to ranch 2.0 2021-03-08 23:11:05 +01:00
Michael Klishin 52479099ec
Bump (c) year 2021-01-22 09:00:14 +03:00
kjnilsson 04a55e0ee6 bug fixes 2020-12-22 15:16:17 +00:00
kjnilsson 160e41687d MQTT machine versions 2020-12-22 10:21:21 +00:00
kjnilsson 067a42e066 Optimise MQTT state machine
It was particularly slow when processing down commands.
2020-12-21 15:58:32 +00:00
Michael Klishin 79a02256f1 Merge pull request #238 from rabbitmq/auth-attempt-metrics
Add auth attempt metrics
2020-10-14 23:56:29 +03:00
dcorbacho d80e8e1bec Add protocol to auth attempt metrics 2020-09-23 11:16:13 +01:00