Commit Graph

116 Commits

Author SHA1 Message Date
David Ansari 14f59f1380 Handle soft limit exceeded as queue action
Instead of performing credit_flow within quorum queue and stream queue
clients, return new {block | unblock, QueueName} actions.

The queue client process can then decide what to do.

For example, the channel continues to use credit_flow such that the
channel gets blocked sending any more credits to rabbit_reader.

However, the MQTT connection process does not use credit_flow. It
instead blocks its reader directly.
2023-01-24 17:29:07 +00:00
David Ansari 816fedf080 Enable flow control to target classic queue 2023-01-24 17:29:07 +00:00
David Ansari 199238d76e Use pg to track MQTT client IDs
Instead of tracking {Vhost, ClientId} to ConnectionPid mappings in our
custom process registry, i.e. custom local ETS table with a custom
gen_server process managing that ETS table, this commit uses the pg module
because pg is better tested.

To save memory with millions of MQTT client connections, we want to save
the mappings only locally on the node where the connection resides and
therfore not be replicated across all nodes.

According to Maxim Fedorov:
"The easy way to provide per-node unique pg scope is to start it like
pg:start_link(node()). At least that's what we've been doing to have
node-local scopes. It will still try to discover scopes on nodeup from
nodes joining the cluster, but since you cannot have nodes with the
same name in one cluster, using node() for local-only scopes worked
well for us."

So that's what we're doing in this commit.
2023-01-24 17:29:07 +00:00
David Ansari ab8957ba9c Use best-effort client ID tracking
"Each Client connecting to the Server has a unique ClientId"

"If the ClientId represents a Client already connected to
the Server then the Server MUST disconnect the existing
Client [MQTT-3.1.4-2]."

Instead of tracking client IDs via Raft, we use local ETS tables in this
commit.

Previous tracking of client IDs via Raft:
(+) consistency (does the right thing)
(-) state of Ra process becomes large > 1GB with many (> 1 Million) MQTT clients
(-) Ra process becomes a bottleneck when many MQTT clients (e.g. 300k)
    disconnect at the same time because monitor (DOWN) Ra commands get
    written resulting in Ra machine timeout.
(-) if we need consistency, we ideally want a single source of truth,
    e.g. only Mnesia, or only Khepri (but not Mnesia + MQTT ra process)

While above downsides could be fixed (e.g. avoiding DOWN commands by
instead doing periodic cleanups of client ID entries using session interval
in MQTT 5 or using subscription_ttl parameter in current RabbitMQ MQTT config),
in this case we do not necessarily need the consistency guarantees Raft provides.

In this commit, we try to comply with [MQTT-3.1.4-2] on a best-effort
basis: If there are no network failures and no messages get lost,
existing clients with duplicate client IDs get disconnected.

In the presence of network failures / lost messages, two clients with
the same client ID can end up publishing or receiving from the same
queue. Arguably, that's acceptable and less worse than the scaling
issues we experience when we want stronger consistency.

Note that it is also the responsibility of the client to not connect
twice with the same client ID.

This commit also ensures that the client ID is a binary to save memory.

A new feature flag is introduced, which when enabled, deletes the Ra
cluster named 'mqtt_node'.

Independent of that feature flag, client IDs are tracked locally in ETS
tables.
If that feature flag is disabled, client IDs are additionally tracked in
Ra.

The feature flag is required such that clients can continue to connect
to all nodes except for the node being udpated in a rolling update.

This commit also fixes a bug where previously all MQTT connections were
cluster-wide closed when one RabbitMQ node was put into maintenance
mode.
2023-01-24 17:29:07 +00:00
David Ansari 5710a9474a Support MQTT Keepalive in WebMQTT
Share the same MQTT keepalive code between rabbit_mqtt_reader and
rabbit_web_mqtt_handler.

Add MQTT keepalive test in both plugins rabbitmq_mqtt and
rabbitmq_web_mqtt.
2023-01-24 17:29:07 +00:00
David Ansari 6f00ccb3ad Get all existing rabbitmq_web_mqtt tests green 2023-01-24 17:29:07 +00:00
David Ansari a02cbb73a1 Get all existing rabbitmq_mqtt tests green 2023-01-24 17:29:07 +00:00
David Ansari 23dac495ad Support QoS 1 for sending and receiving 2023-01-24 17:29:07 +00:00
David Ansari 99337b84d3 Emit stats
'connection' field is not needed anymore because it was
previously the internal AMQP connection PID
2023-01-24 17:29:07 +00:00
David Ansari 218ee196c4 Make proxy_protocol tests green 2023-01-24 17:29:07 +00:00
David Ansari 77da78f478 Get most auth_SUITE tests green
Some tests which require clean_start=false
or QoS1 are skipped for now.

Differentiate between v3 and v4:
v4 allows for an error code in SUBACK frame.
2023-01-24 17:29:07 +00:00
David Ansari f4d1f68212 Move authn / authz into rabbitmq_mqtt 2023-01-24 17:29:07 +00:00
David Ansari eac0622f37 Consume with QoS0 via queue_type interface 2023-01-24 17:29:07 +00:00
David Ansari 24b0a6bcb2 Publish with QoS0 via queue_type interface 2023-01-24 17:29:07 +00:00
David Ansari 8710565b2a Use 1 instead of 22 Erlang processes per MQTT connection
* Create MQTT connections without proxying via AMQP
* Do authn / authz in rabbitmq_mqtt instead of rabbit_direct:connect/5
* Remove rabbit_heartbeat process and per connection supervisors

Current status:

Creating 10k MQTT connections with clean session succeeds:
./emqtt_bench conn -V 4 -C true -c 10000 -R 500
2023-01-24 17:29:07 +00:00
Michael Klishin ec4f1dba7d
(c) year bump: 2022 => 2023 2023-01-01 23:17:36 -05:00
Michael Klishin c38a3d697d
Bump (c) year 2022-03-21 01:21:56 +04:00
Michael Klishin b11a79cccf
Bump (c) year in header files 2021-02-04 07:04:58 +03:00
kjnilsson 160e41687d MQTT machine versions 2020-12-22 10:21:21 +00:00
kjnilsson 067a42e066 Optimise MQTT state machine
It was particularly slow when processing down commands.
2020-12-21 15:58:32 +00:00
dcorbacho 119eb99e8d Switch to Mozilla Public License 2.0 (MPL 2.0) 2020-07-13 17:39:36 +01:00
Jean-Sébastien Pédron dcc5f7b553 Update copyright (year 2020) 2020-03-10 16:39:48 +01:00
kjnilsson eadf5f7094 Make interactions with Ra async
To avoid blocking when registering or unregistering a client id. This is
ok as informing the current connection holder of the client id is
already async. This should be more scalable and provide much better MQTT
connection setup latency.
2020-02-10 17:28:18 +00:00
Michael Klishin 2927f473ce (c) bump 2019-12-29 05:50:32 +03:00
Luke Bakken d0c0ec33ff Use new translation funs in library 2019-09-04 08:07:33 -07:00
Michael Klishin 1434eb991b Switch rabbit_mqtt_collector:list/0 to use a leader query
While at it, improve error handling around client ID
collector unavailability.
2019-06-04 13:40:26 +03:00
Michael Klishin 8ab9f8ff6d CLI commands: provide more information to the new help command 2019-03-26 20:29:56 +03:00
Spring Operator 852c61ea99 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# HTTP URLs that Could Not Be Fixed
These URLs were unable to be fixed. Please review them to see if they can be manually resolved.

* http://blog.listincomprehension.com/search/label/procket (200) with 1 occurrences could not be migrated:
   ([https](https://blog.listincomprehension.com/search/label/procket) result ClosedChannelException).
* http://dozzie.jarowit.net/trac/wiki/TOML (200) with 1 occurrences could not be migrated:
   ([https](https://dozzie.jarowit.net/trac/wiki/TOML) result SSLHandshakeException).
* http://dozzie.jarowit.net/trac/wiki/subproc (200) with 1 occurrences could not be migrated:
   ([https](https://dozzie.jarowit.net/trac/wiki/subproc) result SSLHandshakeException).
* http://e2project.org (200) with 1 occurrences could not be migrated:
   ([https](https://e2project.org) result AnnotatedConnectException).
* http://nitrogenproject.com/ (200) with 2 occurrences could not be migrated:
   ([https](https://nitrogenproject.com/) result ConnectTimeoutException).
* http://proper.softlab.ntua.gr (200) with 1 occurrences could not be migrated:
   ([https](https://proper.softlab.ntua.gr) result SSLHandshakeException).
* http://yaws.hyber.org (200) with 1 occurrences could not be migrated:
   ([https](https://yaws.hyber.org) result AnnotatedConnectException).
* http://choven.ca (503) with 1 occurrences could not be migrated:
   ([https](https://choven.ca) result ConnectTimeoutException).

# Fixed URLs

## Fixed But Review Recommended
These URLs were fixed, but the https status was not OK. However, the https status was the same as the http request or http redirected to an https URL, so they were migrated. Your review is recommended.

* http://fixprotocol.org/ (301) with 1 occurrences migrated to:
  https://fixtrading.org ([https](https://fixprotocol.org/) result SSLHandshakeException).
* http://erldb.org (UnknownHostException) with 1 occurrences migrated to:
  https://erldb.org ([https](https://erldb.org) result UnknownHostException).

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://cloudi.org/ with 27 occurrences migrated to:
  https://cloudi.org/ ([https](https://cloudi.org/) result 200).
* http://erlware.org/ with 1 occurrences migrated to:
  https://erlware.org/ ([https](https://erlware.org/) result 200).
* http://inaka.github.io/cowboy-trails/ with 1 occurrences migrated to:
  https://inaka.github.io/cowboy-trails/ ([https](https://inaka.github.io/cowboy-trails/) result 200).
* http://ninenines.eu with 6 occurrences migrated to:
  https://ninenines.eu ([https](https://ninenines.eu) result 200).
* http://www.actordb.com/ with 2 occurrences migrated to:
  https://www.actordb.com/ ([https](https://www.actordb.com/) result 200).
* http://www.cs.kent.ac.uk/projects/wrangler/Home.html with 1 occurrences migrated to:
  https://www.cs.kent.ac.uk/projects/wrangler/Home.html ([https](https://www.cs.kent.ac.uk/projects/wrangler/Home.html) result 200).
* http://www.rabbitmq.com/mpl.html with 1 occurrences migrated to:
  https://www.rabbitmq.com/mpl.html ([https](https://www.rabbitmq.com/mpl.html) result 200).
* http://www.rabbitmq.com/mqtt.html with 1 occurrences migrated to:
  https://www.rabbitmq.com/mqtt.html ([https](https://www.rabbitmq.com/mqtt.html) result 200).
* http://www.rebar3.org with 1 occurrences migrated to:
  https://www.rebar3.org ([https](https://www.rebar3.org) result 200).
* http://contributor-covenant.org with 1 occurrences migrated to:
  https://contributor-covenant.org ([https](https://contributor-covenant.org) result 301).
* http://contributor-covenant.org/version/1/3/0/ with 1 occurrences migrated to:
  https://contributor-covenant.org/version/1/3/0/ ([https](https://contributor-covenant.org/version/1/3/0/) result 301).
* http://inaka.github.com/apns4erl with 1 occurrences migrated to:
  https://inaka.github.com/apns4erl ([https](https://inaka.github.com/apns4erl) result 301).
* http://inaka.github.com/edis/ with 1 occurrences migrated to:
  https://inaka.github.com/edis/ ([https](https://inaka.github.com/edis/) result 301).
* http://lasp-lang.org/ with 1 occurrences migrated to:
  https://lasp-lang.org/ ([https](https://lasp-lang.org/) result 301).
* http://saleyn.github.com/erlexec with 1 occurrences migrated to:
  https://saleyn.github.com/erlexec ([https](https://saleyn.github.com/erlexec) result 301).
* http://www.mozilla.org/MPL/ with 27 occurrences migrated to:
  https://www.mozilla.org/MPL/ ([https](https://www.mozilla.org/MPL/) result 301).
* http://www.rabbitmq.com/man/rabbitmq-plugins.1.man.html with 1 occurrences migrated to:
  https://www.rabbitmq.com/man/rabbitmq-plugins.1.man.html ([https](https://www.rabbitmq.com/man/rabbitmq-plugins.1.man.html) result 301).
* http://zhongwencool.github.io/observer_cli with 1 occurrences migrated to:
  https://zhongwencool.github.io/observer_cli ([https](https://zhongwencool.github.io/observer_cli) result 301).
2019-03-20 03:18:59 -05:00
Loïc Hoguin 7e09b85426 Allow giving the peer address directly for initial_state
Changes initial_state/4 to initial_state/5 to add the peer
address that needs to be provided by Web MQTT. This function
was only used locally and by Web MQTT.
2018-12-04 14:50:32 +01:00
Daniil Fedotov 82d1cb23e9 remove duplicate info item for mqtt connections 2017-06-09 13:23:37 +01:00
Daniil Fedotov 98c44e10f9 Backported part of: A CLI command to list mqtt connections 2017-01-26 15:43:11 +00:00
Jean-Sébastien Pédron 39251c5fca Use the new -spec format
The old format is removed in Erlang 19.0, leading to build errors.

Also, get rid of the `use_specs` macro and thus always define -spec() &
friends.

While here, unnify the style of -type and -spec.

References rabbitmq/rabbitmq-server#860.
[#118562897]
[#122335241]
2016-06-29 16:53:00 +02:00
Daniil Fedotov 0cae4fc7fe Set session_present flag 2016-04-22 12:28:08 +01:00
Michael Klishin f6d613871a defered => deferred 2016-03-05 10:56:53 +03:00
Daniil Fedotov 98cf125e39 Defer last receive after blocking 2016-03-04 20:33:01 +00:00
Loïc Hoguin 439d8521aa Emit stats for management UI 2016-03-03 14:59:53 +01:00
Loïc Hoguin 7b42c20b7e Allow passing the adapter info on processor init 2016-03-03 14:59:53 +01:00
Daniil Fedotov be1555b469 rename flag to received_connect_frame 2016-01-09 17:13:41 +03:00
Daniil Fedotov 17e4c707c6 Log MQTT connection. 2016-01-09 17:13:41 +03:00
Daniil Fedotov 954016c69f Log MQTT connection start and close with debug if no data received 2016-01-09 17:13:41 +03:00
Loïc Hoguin 1f1cb7ab9b Make send method configurable and export useful functions
Needed for the new Web-MQTT plugin.
2016-01-08 02:26:32 +03:00
Michael Klishin 9ba1da7434 Update (c) info 2016-01-01 12:59:18 +03:00
ash-lshift 8fb4904fdc whitespace changes 2015-07-14 10:56:02 +01:00
ash-lshift f293972f3c check authorization at mqtt topic level 2015-06-30 09:08:26 +01:00
Michael Klishin 8712428268 Merge branch 'stable' 2015-05-24 04:55:07 +03:00
Michael Klishin 692c6a7060 (c) year 2015-05-24 04:54:58 +03:00
Michael Klishin 0b6e7d5714 put/get/delete operations for retained message store
Yet to be done: dump and restore to disk on shutdown
and boot.
2015-04-21 14:26:46 +03:00
Michael Klishin 1c935cfb21 Process bits for retained message stores 2015-04-18 03:55:34 +03:00
Michael Klishin cee8eb2fd8 Client MQTT authentication using x509 certificates
Original patch by Eric Rauer.
2014-11-21 14:54:33 +00:00
Michael Klishin dfabf68e2e Explain 2014-05-21 10:40:12 +04:00
Emile Joubert 35dc34bfff Better 3.1.1 compatibility 2014-03-19 12:05:42 +00:00
Simon MacMullen 4e07b1d640 Update copyright for 2014 2014-03-17 17:25:23 +00:00
Emile Joubert c2da97cf65 Renames and codedocs 2014-01-20 10:33:27 +00:00
Emile Joubert 8d950b1f26 Detection of absent keepalives 2014-01-06 17:50:02 +00:00
Simon MacMullen bb95f34e47 s/VMware/GoPivotal/g 2013-07-01 10:49:14 +01:00
Emile Joubert f973d137c3 Update copyright 2013 2013-01-23 13:06:40 +00:00
Emile Joubert cfcdf7a59b More consistent naming of credit flow state 2012-08-21 16:45:14 +01:00
Emile Joubert 2a716fac2b Clearer separation between reader and processor 2012-08-20 17:30:13 +01:00
Emile Joubert 8d52799853 Removed adapter info from state 2012-08-16 17:56:50 +01:00
Emile Joubert d3f4a2dc24 Updates 2012-08-05 23:52:54 +01:00
Emile Joubert ec215b34bd Clean sessions, and anonymous connections 2012-07-16 14:57:31 +01:00
Emile Joubert c553880ac7 Avoid a separate erlang process for processor 2012-07-13 16:32:13 +01:00
Emile Joubert 9783db3f5d Rudimentary unsubscribe support 2012-07-04 17:47:07 +01:00
Emile Joubert 78b41c8fc0 Basic subscription and heartbeat response 2012-07-04 16:31:30 +01:00
Emile Joubert 1603079761 Basic authentication and publishing 2012-07-03 17:35:18 +01:00
Emile Joubert f195facf1c Sufficient parsing to accept connection 2012-07-03 09:55:31 +01:00