Instead of performing credit_flow within the quorum queue and stream queue
clients, return new {block | unblock, QueueName} actions.
The queue client process can then decide what to do.
For example, the channel continues to use credit_flow such that the
channel is blocked from sending any more credits to rabbit_reader.
The MQTT connection process, however, does not use credit_flow; it
blocks its reader directly.
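A minimal sketch of how a queue client could apply these actions; the
module and function names are hypothetical, not the actual API:

    -module(queue_action_sketch).
    -export([handle_queue_actions/2]).

    %% The caller keeps a set of queue names that currently block it.
    %% A channel would enter credit_flow in the block clause instead;
    %% an MQTT connection process would pause its reader.
    handle_queue_actions(Actions, Blocked0) ->
        lists:foldl(
          fun({block, QName}, Blocked)   -> sets:add_element(QName, Blocked);
             ({unblock, QName}, Blocked) -> sets:del_element(QName, Blocked);
             (_Other, Blocked)           -> Blocked
          end, Blocked0, Actions).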
Instead of tracking {Vhost, ClientId} to ConnectionPid mappings in our
custom process registry, i.e. a local ETS table managed by a custom
gen_server process, this commit uses the pg module
because pg is better tested.
To save memory with millions of MQTT client connections, we want to store
the mappings only locally on the node where the connection resides and
therefore not replicate them across all nodes.
According to Maxim Fedorov:
"The easy way to provide per-node unique pg scope is to start it like
pg:start_link(node()). At least that's what we've been doing to have
node-local scopes. It will still try to discover scopes on nodeup from
nodes joining the cluster, but since you cannot have nodes with the
same name in one cluster, using node() for local-only scopes worked
well for us."
So that's what we're doing in this commit.
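For illustration, a node-local scope along those lines could look as
follows; module and function names are hypothetical, while the pg calls
are the standard OTP API:

    -module(pg_scope_sketch).
    -export([start_scope/0, register_client/2, lookup_client/2]).

    %% Start a pg scope named after the local node, so the scope is
    %% unique per node and the mappings stay node-local.
    start_scope() ->
        pg:start_link(node()).

    %% Register the calling connection process under its {Vhost, ClientId} key.
    register_client(Vhost, ClientId) ->
        ok = pg:join(node(), {Vhost, ClientId}, self()).

    %% Look up connection pids for a client id on this node only.
    lookup_client(Vhost, ClientId) ->
        pg:get_local_members(node(), {Vhost, ClientId}).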
"Each Client connecting to the Server has a unique ClientId"
"If the ClientId represents a Client already connected to
the Server then the Server MUST disconnect the existing
Client [MQTT-3.1.4-2]."
Instead of tracking client IDs via Raft, we use local ETS tables in this
commit.
Previous tracking of client IDs via Raft:
(+) consistency (does the right thing)
(-) state of the Ra process becomes large (> 1 GB) with many (> 1 million) MQTT clients
(-) the Ra process becomes a bottleneck when many MQTT clients (e.g. 300k)
disconnect at the same time because monitor (DOWN) Ra commands get
written, resulting in Ra machine timeouts.
(-) if we need consistency, we ideally want a single source of truth,
e.g. only Mnesia, or only Khepri (but not Mnesia + MQTT ra process)
While the above downsides could be fixed (e.g. avoiding DOWN commands by
instead periodically cleaning up client ID entries using the session expiry
interval in MQTT 5 or the subscription_ttl parameter in the current RabbitMQ
MQTT config), in this case we do not necessarily need the consistency
guarantees Raft provides.
In this commit, we try to comply with [MQTT-3.1.4-2] on a best-effort
basis: If there are no network failures and no messages get lost,
existing clients with duplicate client IDs get disconnected.
In the presence of network failures or lost messages, two clients with
the same client ID can end up publishing to or receiving from the same
queue. Arguably, that is acceptable and preferable to the scaling
issues we experience when we want stronger consistency.
Note that it is also the client's responsibility not to connect
twice with the same client ID.
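A best-effort sketch of enforcing [MQTT-3.1.4-2] against a node-local
ETS table; the table name mqtt_client_ids and the duplicate_id message
are hypothetical:

    -module(client_id_sketch).
    -export([register_client_id/3]).

    %% Assumes an owning process has run:
    %% ets:new(mqtt_client_ids, [named_table, public, set])
    register_client_id(Vhost, ClientId, ConnPid) ->
        Key = {Vhost, ClientId},
        case ets:lookup(mqtt_client_ids, Key) of
            [{Key, OldPid}] when OldPid =/= ConnPid ->
                %% ask the existing connection to disconnect itself;
                %% best effort only: this message can be lost
                OldPid ! {duplicate_id, ConnPid};
            _ ->
                ok
        end,
        true = ets:insert(mqtt_client_ids, {Key, ConnPid}),
        ok.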
This commit also ensures that the client ID is a binary to save memory.
A new feature flag is introduced, which when enabled, deletes the Ra
cluster named 'mqtt_node'.
Independent of that feature flag, client IDs are tracked locally in ETS
tables.
If that feature flag is disabled, client IDs are additionally tracked in
Ra.
The feature flag is required so that clients can continue to connect
to all nodes except for the node being updated during a rolling update.
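The declaration follows RabbitMQ's -rabbit_feature_flag attribute
convention; the flag name below is illustrative, not the actual
identifier:

    %% Illustrative declaration; when the flag is enabled, the migration
    %% code deletes the Ra cluster named 'mqtt_node'.
    -rabbit_feature_flag(
       {delete_mqtt_ra_cluster,   %% hypothetical flag name
        #{desc      => "Delete Ra cluster 'mqtt_node'; track MQTT client "
                       "IDs in local ETS tables only",
          stability => stable
         }}).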
This commit also fixes a bug where previously all MQTT connections were
closed cluster-wide when a single RabbitMQ node was put into maintenance
mode.
Share the same MQTT keepalive code between rabbit_mqtt_reader and
rabbit_web_mqtt_handler.
Add an MQTT keepalive test in both the rabbitmq_mqtt and
rabbitmq_web_mqtt plugins.
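As a sketch of the shared logic (record and function names are
hypothetical): the server arms a timer for 1.5x the negotiated
keepalive interval, per the MQTT spec, and closes the connection if no
packet was seen when the timer fires:

    -module(keepalive_sketch).
    -export([init/1, packet_received/1, handle_timeout/1]).

    -record(keepalive, {interval_ms, seen = false}).

    %% Arm a timer for 1.5x the keepalive interval.
    init(KeepaliveSecs) ->
        IntervalMs = KeepaliveSecs * 1500,
        erlang:start_timer(IntervalMs, self(), keepalive),
        #keepalive{interval_ms = IntervalMs}.

    %% Called by the reader/handler whenever a packet arrives.
    packet_received(KA) ->
        KA#keepalive{seen = true}.

    %% Called when the timer fires: rearm if we saw traffic, else stop.
    handle_timeout(#keepalive{seen = true, interval_ms = I} = KA) ->
        erlang:start_timer(I, self(), keepalive),
        {ok, KA#keepalive{seen = false}};
    handle_timeout(#keepalive{seen = false}) ->
        {stop, keepalive_timeout}.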
* Create MQTT connections without proxying via AMQP
* Do authn / authz in rabbitmq_mqtt instead of rabbit_direct:connect/5 (see the sketch below)
* Remove rabbit_heartbeat process and per connection supervisors
Current status:
Creating 10k MQTT connections with clean session succeeds:
./emqtt_bench conn -V 4 -C true -c 10000 -R 500
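A hedged sketch of the direct authn/authz path; the exact
rabbit_access_control signatures and return values are assumptions
based on the public API:

    -module(mqtt_auth_sketch).
    -export([authenticate/3]).

    %% Authenticate and authorize an MQTT client inside rabbitmq_mqtt
    %% itself, without going through rabbit_direct:connect/5.
    authenticate(Username, Password, VHost) ->
        case rabbit_access_control:check_user_login(
               Username, [{password, Password}]) of
            {ok, User} ->
                %% fails (raises) if vhost access is denied
                ok = rabbit_access_control:check_vhost_access(
                       User, VHost, undefined, #{}),
                {ok, User};
            {refused, _User, _Format, _Args} = Refused ->
                Refused
        end.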
Register and unregister the client ID asynchronously to avoid blocking.
This is OK because informing the current connection holder of the client
ID is already async. This should be more scalable and provide much better
MQTT connection setup latency.
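For instance, a cast-based registry API along these lines (names are
hypothetical):

    -module(async_registry_sketch).
    -export([register_client_id/4, unregister_client_id/4]).

    %% Fire-and-forget: gen_server:cast returns immediately, so
    %% connection setup never blocks on the registry process.
    register_client_id(Registry, Vhost, ClientId, ConnPid) ->
        gen_server:cast(Registry, {register, {Vhost, ClientId}, ConnPid}).

    unregister_client_id(Registry, Vhost, ClientId, ConnPid) ->
        gen_server:cast(Registry, {unregister, {Vhost, ClientId}, ConnPid}).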
Changes initial_state/4 to initial_state/5 to add the peer
address, which must be provided by Web MQTT. This function
was only used locally and by Web MQTT.
The old spec format was removed in Erlang/OTP 19.0, leading to build errors.
Also, get rid of the `use_specs` macro and thus always define -spec() and
friends.
While here, unify the style of -type and -spec.
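For example (start_link/0 is a representative function; the type is
rabbit_types:ok_pid_or_error() as used throughout the codebase):

    %% Old style, previously guarded by -ifdef(use_specs); no longer
    %% compiles on Erlang/OTP 19.0:
    -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).

    %% New style, now always defined:
    -spec start_link() -> rabbit_types:ok_pid_or_error().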
References rabbitmq/rabbitmq-server#860.
[#118562897]
[#122335241]