Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.
Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.
This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.
The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.
This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.
COMMIT HISTORY:
* Refactor away from using the delivery{} record
In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.
Add mc module and move direct replies outside of exchange
Lots of changes incl classic queues
Implement stream support incl amqp conversions
simplify mc state record
move mc.erl
mc dlx stuff
recent history exchange
Make tracking work
But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.
Tracing as a whole may want a bit of a re-vamp at some point.
tidy
make quorum queue peek work by legacy conversion
dead lettering fixes
dead lettering fixes
CMQ fixes
rabbit_trace type fixes
fixes
fix
Fix classic queue props
test assertion fix
feature flag and backwards compat
Enable message_container feature flag in some SUITEs
Dialyzer fixes
fixes
fix
test fixes
Various
Manually update a gazelle generated file
until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185
Add message_containers_SUITE to bazel
and regen bazel files with gazelle from rules_erlang@main
Simplify essential proprty access
Such as durable, ttl and priority by extracting them into annotations
at message container init time.
Move type
to remove dependenc on amqp10 stuff in mc.erl
mostly because I don't know how to make bazel do the right thing
add more stuff
Refine routing header stuff
wip
Cosmetics
Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.
* Dedup death queue names
* Fix function clause crashes
Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
https://github.com/rabbitmq/rabbitmq-server/blob/fbe79ff47b4edbc0fd95457e623d6593161ad198/deps/rabbit/src/rabbit_classic_queue_index_v2.erl#L1048
Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.
* Fix is_utf8_no_null crash
Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```
* Implement mqtt mc behaviour
For now via amqp translation.
This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```
* Shorten mc file names
Module name length matters because for each persistent message the #mc{}
record is persisted to disk.
```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```
This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```
* mc: make deaths an annotation + fixes
* Fix mc_mqtt protocol_state callback
* Fix test will_delay_node_restart
```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```
* Bazel run gazelle
* mix format rabbitmqctl.ex
* Ensure ttl annotation is refelected in amqp legacy protocol state
* Fix id access in message store
* Fix rabbit_message_interceptor_SUITE
* dializer fixes
* Fix rabbit:rabbit_message_interceptor_SUITE-mixed
set_annotation/3 should not result in duplicate keys
* Fix MQTT shared_SUITE-mixed
Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
https://github.com/rabbitmq/rabbitmq-server/blob/75a953ce286a10aca910c098805a4f545989af38/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl#L2075-L2076
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.
The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.
* Field content of 'v1_0.data' can be binary
Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
--test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
-t- --test_sharding_strategy=disabled
```
* Remove route/2 and implement route/3 for all exchange types.
This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.
stream filtering fixes
* Translate directly from MQTT to AMQP 0.9.1
* handle undecoded properties in mc_compat
amqpl: put clause in right order
recover death deatails from amqp data
* Replace callback init_amqp with convert_from
* Fix return value of lists:keyfind/3
* Translate directly from AMQP 0.9.1 to MQTT
* Fix MQTT payload size
MQTT payload can be a list when converted from AMQP 0.9.1 for example
First conversions tests
Plus some other conversion related fixes.
bazel
bazel
translate amqp 1.0 null to undefined
mc: property/2 and correlation_id/message_id return type tagged values.
To ensure we can support a variety of types better.
The type type tags are AMQP 1.0 flavoured.
fix death recovery
mc_mqtt: impl new api
Add callbacks to allow protocols to compact data before storage
And make readable if needing to query things repeatedly.
bazel fix
* more decoding
* tracking mixed versions compat
* mc: flip default of `durable` annotation to save some data.
Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.
* mc conversion tests and tidy up
* mc make x_header unstrict again
* amqpl: death record fixes
* bazel
* amqp -> amqpl conversion test
* Fix crash in mc_amqp:size/1
Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.
* Fix crash in lists:flatten/1
Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.
* Fix crash in rabbit_writer
Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
initial call: rabbit_writer:enter_mainloop/2
pid: <0.711.0>
registered_name: []
exception error: bad argument
in function size/1
called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
*** argument 1: not tuple or binary
in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.
This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.
* Add accidentally deleted line back
* mc: optimise mc_amqp internal format
By removint the outer records for message and delivery annotations
as well as application properties and footers.
* mc: optimis mc_amqp map_add by using upsert
* mc: refactoring and bug fixes
* mc_SUITE routingheader assertions
* mc remove serialize/1 callback as only used by amqp
* mc_amqp: avoid returning a nested list from protocol_state
* test and bug fix
* move infer_type to mc_util
* mc fixes and additiona assertions
* Support headers exchange routing for MQTT messages
When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).
This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.
When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.
* Fix crash when sending from stream to amqpl
When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
initial call: rabbit_channel:init/1
pid: <0.818.0>
registered_name: []
exception exit: {{badmatch,undefined},
[{rabbit_channel,handle_deliver0,4,
[{file,"rabbit_channel.erl"},
{line,2728}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},
{line,728}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1056}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,251}]}]}
```
This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.
* Support consistent hash exchange routing for MQTT 5.0
When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.
* Convert MQTT 5.0 User Property
* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations
* Make use of Annotations in mc_mqtt:protocol_state/2
mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.
* Enforce AMQP 0.9.1 field name length limit
The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.
* Fix type specs
Apply feedback from Michael Davis
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* Add mc_mqtt unit test suite
Implement mc_mqtt:x_header/2
* Translate indicator that payload is UTF-8 encoded
when converting between MQTT 5.0 and AMQP 1.0
* Translate single amqp-value section from AMQP 1.0 to MQTT
Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.
If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.
This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.
* Fix payload conversion
* Translate Response Topic between MQTT and AMQP
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.
The Response Topic must be a UTF-8 encoded string.
This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/" RK Publish to amq.topic with routing key RK
"/exchange/" X "/" RK Publish to exchange X with routing key RK
```
By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.
When an operator modifies the mqtt.exchange, the 2nd target address is
used.
* Apply PR feedback
and fix formatting
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* tidy up
* Add MQTT message_containers test
* consistent hash exchange: avoid amqp legacy conversion
When hashing on a header value.
* Avoid converting to amqp legacy when using exchange federation
* Fix test flake
* test and dialyzer fixes
* dialyzer fix
* Add MQTT protocol interoperability tests
Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams
* Regenerate portions of deps/rabbit/app.bzl with gazelle
I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.
* mc: refactoring
* mc_amqpl: handle delivery annotations
Just in case they are included.
Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 18:27:13 +08:00
|
|
|
% This Source Code Form is subject to the terms of the Mozilla Public
|
2020-07-10 21:31:17 +08:00
|
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
2020-03-31 03:19:02 +08:00
|
|
|
%%
|
2024-02-06 00:53:36 +08:00
|
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
2020-03-31 03:19:02 +08:00
|
|
|
%%
|
|
|
|
|
|
|
|
-module(unit_access_control_SUITE).
|
|
|
|
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
-compile([export_all, nowarn_export_all]).
|
2020-03-31 03:19:02 +08:00
|
|
|
|
|
|
|
all() ->
|
|
|
|
[
|
|
|
|
{group, sequential_tests},
|
|
|
|
{group, parallel_tests}
|
|
|
|
].
|
|
|
|
|
|
|
|
groups() ->
|
|
|
|
[
|
|
|
|
{parallel_tests, [parallel], [
|
|
|
|
password_hashing,
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
version_negotiation
|
2020-03-31 03:19:02 +08:00
|
|
|
]},
|
|
|
|
{sequential_tests, [], [
|
|
|
|
login_with_credentials_but_no_password,
|
|
|
|
login_of_passwordless_user,
|
|
|
|
set_tags_for_passwordless_user,
|
|
|
|
change_password,
|
|
|
|
auth_backend_internal_expand_topic_permission,
|
|
|
|
rabbit_direct_extract_extra_auth_props
|
|
|
|
]}
|
|
|
|
].
|
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Testsuite setup/teardown
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
|
rabbit_ct_helpers:log_environment(),
|
|
|
|
rabbit_ct_helpers:run_setup_steps(Config).
|
|
|
|
|
|
|
|
end_per_suite(Config) ->
|
|
|
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
|
|
|
|
|
|
|
init_per_group(Group, Config) ->
|
|
|
|
Config1 = rabbit_ct_helpers:set_config(Config, [
|
|
|
|
{rmq_nodename_suffix, Group},
|
|
|
|
{rmq_nodes_count, 1}
|
|
|
|
]),
|
|
|
|
rabbit_ct_helpers:run_steps(Config1,
|
|
|
|
rabbit_ct_broker_helpers:setup_steps() ++
|
|
|
|
rabbit_ct_client_helpers:setup_steps()).
|
|
|
|
|
|
|
|
end_per_group(_Group, Config) ->
|
|
|
|
rabbit_ct_helpers:run_steps(Config,
|
|
|
|
rabbit_ct_client_helpers:teardown_steps() ++
|
|
|
|
rabbit_ct_broker_helpers:teardown_steps()).
|
|
|
|
|
|
|
|
init_per_testcase(Testcase, Config) ->
|
|
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
|
|
|
|
|
|
|
end_per_testcase(Testcase, Config) ->
|
|
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
|
|
|
|
%% ---------------------------------------------------------------------------
|
|
|
|
%% Test Cases
|
|
|
|
%% ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
password_hashing(Config) ->
|
|
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
|
|
?MODULE, password_hashing1, [Config]).
|
|
|
|
|
|
|
|
password_hashing1(_Config) ->
|
|
|
|
rabbit_password_hashing_sha256 = rabbit_password:hashing_mod(),
|
|
|
|
application:set_env(rabbit, password_hashing_module,
|
|
|
|
rabbit_password_hashing_md5),
|
|
|
|
rabbit_password_hashing_md5 = rabbit_password:hashing_mod(),
|
|
|
|
application:set_env(rabbit, password_hashing_module,
|
|
|
|
rabbit_password_hashing_sha256),
|
|
|
|
rabbit_password_hashing_sha256 = rabbit_password:hashing_mod(),
|
|
|
|
|
|
|
|
rabbit_password_hashing_sha256 =
|
|
|
|
rabbit_password:hashing_mod(rabbit_password_hashing_sha256),
|
|
|
|
rabbit_password_hashing_md5 =
|
|
|
|
rabbit_password:hashing_mod(rabbit_password_hashing_md5),
|
|
|
|
rabbit_password_hashing_md5 =
|
|
|
|
rabbit_password:hashing_mod(undefined),
|
|
|
|
|
|
|
|
rabbit_password_hashing_md5 =
|
|
|
|
rabbit_auth_backend_internal:hashing_module_for_user(
|
2020-07-09 21:11:00 +08:00
|
|
|
internal_user:new()),
|
2020-03-31 03:19:02 +08:00
|
|
|
rabbit_password_hashing_md5 =
|
|
|
|
rabbit_auth_backend_internal:hashing_module_for_user(
|
2020-07-09 21:11:00 +08:00
|
|
|
internal_user:new({hashing_algorithm, undefined})),
|
2020-03-31 03:19:02 +08:00
|
|
|
rabbit_password_hashing_md5 =
|
|
|
|
rabbit_auth_backend_internal:hashing_module_for_user(
|
2020-07-09 21:11:00 +08:00
|
|
|
internal_user:new({hashing_algorithm, rabbit_password_hashing_md5})),
|
2020-03-31 03:19:02 +08:00
|
|
|
|
|
|
|
rabbit_password_hashing_sha256 =
|
|
|
|
rabbit_auth_backend_internal:hashing_module_for_user(
|
2020-07-09 21:11:00 +08:00
|
|
|
internal_user:new({hashing_algorithm, rabbit_password_hashing_sha256})),
|
2020-03-31 03:19:02 +08:00
|
|
|
|
|
|
|
passed.
|
|
|
|
|
|
|
|
change_password(Config) ->
|
|
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
|
|
?MODULE, change_password1, [Config]).
|
|
|
|
|
|
|
|
change_password1(_Config) ->
|
|
|
|
UserName = <<"test_user">>,
|
|
|
|
Password = <<"test_password">>,
|
|
|
|
case rabbit_auth_backend_internal:lookup_user(UserName) of
|
|
|
|
{ok, _} -> rabbit_auth_backend_internal:delete_user(UserName, <<"acting-user">>);
|
|
|
|
_ -> ok
|
|
|
|
end,
|
|
|
|
ok = application:set_env(rabbit, password_hashing_module,
|
|
|
|
rabbit_password_hashing_md5),
|
|
|
|
ok = rabbit_auth_backend_internal:add_user(UserName, Password, <<"acting-user">>),
|
|
|
|
{ok, #auth_user{username = UserName}} =
|
|
|
|
rabbit_auth_backend_internal:user_login_authentication(
|
|
|
|
UserName, [{password, Password}]),
|
|
|
|
ok = application:set_env(rabbit, password_hashing_module,
|
|
|
|
rabbit_password_hashing_sha256),
|
|
|
|
{ok, #auth_user{username = UserName}} =
|
|
|
|
rabbit_auth_backend_internal:user_login_authentication(
|
|
|
|
UserName, [{password, Password}]),
|
|
|
|
|
|
|
|
NewPassword = <<"test_password1">>,
|
|
|
|
ok = rabbit_auth_backend_internal:change_password(UserName, NewPassword,
|
|
|
|
<<"acting-user">>),
|
|
|
|
{ok, #auth_user{username = UserName}} =
|
|
|
|
rabbit_auth_backend_internal:user_login_authentication(
|
|
|
|
UserName, [{password, NewPassword}]),
|
|
|
|
|
|
|
|
{refused, _, [UserName]} =
|
|
|
|
rabbit_auth_backend_internal:user_login_authentication(
|
|
|
|
UserName, [{password, Password}]),
|
|
|
|
passed.
|
|
|
|
|
|
|
|
|
|
|
|
login_with_credentials_but_no_password(Config) ->
|
|
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
|
|
?MODULE, login_with_credentials_but_no_password1, [Config]).
|
|
|
|
|
|
|
|
login_with_credentials_but_no_password1(_Config) ->
|
|
|
|
Username = <<"login_with_credentials_but_no_password-user">>,
|
|
|
|
Password = <<"login_with_credentials_but_no_password-password">>,
|
|
|
|
ok = rabbit_auth_backend_internal:add_user(Username, Password, <<"acting-user">>),
|
|
|
|
|
2023-08-14 18:51:46 +08:00
|
|
|
?assertMatch(
|
|
|
|
{refused, _Message, [Username]},
|
2020-03-31 03:19:02 +08:00
|
|
|
rabbit_auth_backend_internal:user_login_authentication(Username,
|
2023-08-14 18:51:46 +08:00
|
|
|
[{key, <<"value">>}])),
|
2020-03-31 03:19:02 +08:00
|
|
|
|
|
|
|
ok = rabbit_auth_backend_internal:delete_user(Username, <<"acting-user">>),
|
|
|
|
|
|
|
|
passed.
|
|
|
|
|
|
|
|
%% passwordless users are not supposed to be used with
|
|
|
|
%% this backend (and PLAIN authentication mechanism in general)
|
|
|
|
login_of_passwordless_user(Config) ->
|
|
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
|
|
?MODULE, login_of_passwordless_user1, [Config]).
|
|
|
|
|
|
|
|
login_of_passwordless_user1(_Config) ->
|
|
|
|
Username = <<"login_of_passwordless_user-user">>,
|
|
|
|
Password = <<"">>,
|
|
|
|
ok = rabbit_auth_backend_internal:add_user(Username, Password, <<"acting-user">>),
|
|
|
|
|
|
|
|
?assertMatch(
|
|
|
|
{refused, _Message, [Username]},
|
|
|
|
rabbit_auth_backend_internal:user_login_authentication(Username,
|
|
|
|
[{password, <<"">>}])),
|
|
|
|
|
|
|
|
?assertMatch(
|
|
|
|
{refused, _Format, [Username]},
|
|
|
|
rabbit_auth_backend_internal:user_login_authentication(Username,
|
|
|
|
[{password, ""}])),
|
|
|
|
|
|
|
|
ok = rabbit_auth_backend_internal:delete_user(Username, <<"acting-user">>),
|
|
|
|
|
|
|
|
passed.
|
|
|
|
|
|
|
|
|
|
|
|
set_tags_for_passwordless_user(Config) ->
|
|
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
|
|
?MODULE, set_tags_for_passwordless_user1, [Config]).
|
|
|
|
|
|
|
|
set_tags_for_passwordless_user1(_Config) ->
|
|
|
|
Username = <<"set_tags_for_passwordless_user">>,
|
|
|
|
Password = <<"set_tags_for_passwordless_user">>,
|
|
|
|
ok = rabbit_auth_backend_internal:add_user(Username, Password,
|
|
|
|
<<"acting-user">>),
|
|
|
|
ok = rabbit_auth_backend_internal:clear_password(Username,
|
|
|
|
<<"acting-user">>),
|
|
|
|
ok = rabbit_auth_backend_internal:set_tags(Username, [management],
|
|
|
|
<<"acting-user">>),
|
|
|
|
|
2020-07-09 21:11:00 +08:00
|
|
|
{ok, User1} = rabbit_auth_backend_internal:lookup_user(Username),
|
|
|
|
?assertEqual([management], internal_user:get_tags(User1)),
|
2020-03-31 03:19:02 +08:00
|
|
|
|
|
|
|
ok = rabbit_auth_backend_internal:set_tags(Username, [management, policymaker],
|
|
|
|
<<"acting-user">>),
|
|
|
|
|
2020-07-09 21:11:00 +08:00
|
|
|
{ok, User2} = rabbit_auth_backend_internal:lookup_user(Username),
|
|
|
|
?assertEqual([management, policymaker], internal_user:get_tags(User2)),
|
2020-03-31 03:19:02 +08:00
|
|
|
|
|
|
|
ok = rabbit_auth_backend_internal:set_tags(Username, [],
|
|
|
|
<<"acting-user">>),
|
|
|
|
|
2020-07-09 21:11:00 +08:00
|
|
|
{ok, User3} = rabbit_auth_backend_internal:lookup_user(Username),
|
|
|
|
?assertEqual([], internal_user:get_tags(User3)),
|
2020-03-31 03:19:02 +08:00
|
|
|
|
|
|
|
ok = rabbit_auth_backend_internal:delete_user(Username,
|
|
|
|
<<"acting-user">>),
|
|
|
|
|
|
|
|
passed.
|
|
|
|
|
|
|
|
|
|
|
|
rabbit_direct_extract_extra_auth_props(_Config) ->
|
|
|
|
{ok, CSC} = code_server_cache:start_link(),
|
|
|
|
% no protocol to extract
|
|
|
|
[] = rabbit_direct:extract_extra_auth_props(
|
|
|
|
{<<"guest">>, <<"guest">>}, <<"/">>, 1,
|
|
|
|
[{name,<<"127.0.0.1:52366 -> 127.0.0.1:1883">>}]),
|
|
|
|
% protocol to extract, but no module to call
|
|
|
|
[] = rabbit_direct:extract_extra_auth_props(
|
|
|
|
{<<"guest">>, <<"guest">>}, <<"/">>, 1,
|
|
|
|
[{protocol, {'PROTOCOL_WITHOUT_MODULE', "1.0"}}]),
|
|
|
|
% see rabbit_dummy_protocol_connection_info module
|
|
|
|
% protocol to extract, module that returns a client ID
|
|
|
|
[{client_id, <<"DummyClientId">>}] = rabbit_direct:extract_extra_auth_props(
|
|
|
|
{<<"guest">>, <<"guest">>}, <<"/">>, 1,
|
|
|
|
[{protocol, {'DUMMY_PROTOCOL', "1.0"}}]),
|
|
|
|
% protocol to extract, but error thrown in module
|
|
|
|
[] = rabbit_direct:extract_extra_auth_props(
|
|
|
|
{<<"guest">>, <<"guest">>}, <<"/">>, -1,
|
|
|
|
[{protocol, {'DUMMY_PROTOCOL', "1.0"}}]),
|
|
|
|
gen_server:stop(CSC),
|
|
|
|
ok.
|
|
|
|
|
|
|
|
auth_backend_internal_expand_topic_permission(_Config) ->
|
|
|
|
ExpandMap = #{<<"username">> => <<"guest">>, <<"vhost">> => <<"default">>},
|
|
|
|
%% simple case
|
|
|
|
<<"services/default/accounts/guest/notifications">> =
|
|
|
|
rabbit_auth_backend_internal:expand_topic_permission(
|
|
|
|
<<"services/{vhost}/accounts/{username}/notifications">>,
|
|
|
|
ExpandMap
|
|
|
|
),
|
|
|
|
%% replace variable twice
|
|
|
|
<<"services/default/accounts/default/guest/notifications">> =
|
|
|
|
rabbit_auth_backend_internal:expand_topic_permission(
|
|
|
|
<<"services/{vhost}/accounts/{vhost}/{username}/notifications">>,
|
|
|
|
ExpandMap
|
|
|
|
),
|
|
|
|
%% nothing to replace
|
|
|
|
<<"services/accounts/notifications">> =
|
|
|
|
rabbit_auth_backend_internal:expand_topic_permission(
|
|
|
|
<<"services/accounts/notifications">>,
|
|
|
|
ExpandMap
|
|
|
|
),
|
|
|
|
%% the expand map isn't defined
|
|
|
|
<<"services/{vhost}/accounts/{username}/notifications">> =
|
|
|
|
rabbit_auth_backend_internal:expand_topic_permission(
|
|
|
|
<<"services/{vhost}/accounts/{username}/notifications">>,
|
|
|
|
undefined
|
|
|
|
),
|
|
|
|
%% the expand map is empty
|
|
|
|
<<"services/{vhost}/accounts/{username}/notifications">> =
|
|
|
|
rabbit_auth_backend_internal:expand_topic_permission(
|
|
|
|
<<"services/{vhost}/accounts/{username}/notifications">>,
|
|
|
|
#{}
|
|
|
|
),
|
|
|
|
ok.
|
|
|
|
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
%% Test AMQP 1.0 §2.2
|
|
|
|
version_negotiation(Config) ->
|
|
|
|
ok = rabbit_ct_broker_helpers:rpc(Config, ?MODULE, version_negotiation1, [Config]).
|
2020-03-31 03:19:02 +08:00
|
|
|
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
version_negotiation1(Config) ->
|
Add SASL mechanism ANONYMOUS
## 1. Introduce new SASL mechanism ANONYMOUS
### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.
### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.
Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, stream clients to connect out of
the box to RabbitMQ without providing any username or password.
Today's AMQP 0.9.1 and stream protocol client libs hard code RabbitMQ default credentials
`guest:guest` for example done in:
* https://github.com/rabbitmq/rabbitmq-java-client/blob/0215e85643a9ae0800822869be0200024e2ab569/src/main/java/com/rabbitmq/client/ConnectionFactory.java#L58-L61
* https://github.com/rabbitmq/amqp091-go/blob/ddb7a2f0685689063e6d709b8e417dbf9d09469c/uri.go#L31-L32
Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).
### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](https://github.com/rabbitmq/rabbitmq-server/blob/146b4862d8e570b344c99c37d91246760e218b18/deps/rabbitmq_amqp1_0/Makefile#L6)
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.
Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.
By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public
To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.
In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting. This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.
## 2. Require SASL security layer in AMQP 1.0
### What?
An AMQP 1.0 client must use the SASL security layer.
### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.
Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.
### How?
This commit implements AMQP 1.0 figure 2.13.
A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
2024-08-14 18:19:17 +08:00
|
|
|
Hostname = ?config(rmq_hostname, Config),
|
|
|
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
2020-03-31 03:19:02 +08:00
|
|
|
|
Add SASL mechanism ANONYMOUS
## 1. Introduce new SASL mechanism ANONYMOUS
### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.
### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.
Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, stream clients to connect out of
the box to RabbitMQ without providing any username or password.
Today's AMQP 0.9.1 and stream protocol client libs hard code RabbitMQ default credentials
`guest:guest` for example done in:
* https://github.com/rabbitmq/rabbitmq-java-client/blob/0215e85643a9ae0800822869be0200024e2ab569/src/main/java/com/rabbitmq/client/ConnectionFactory.java#L58-L61
* https://github.com/rabbitmq/amqp091-go/blob/ddb7a2f0685689063e6d709b8e417dbf9d09469c/uri.go#L31-L32
Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).
### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](https://github.com/rabbitmq/rabbitmq-server/blob/146b4862d8e570b344c99c37d91246760e218b18/deps/rabbitmq_amqp1_0/Makefile#L6)
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.
Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.
By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public
To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.
In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting. This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.
## 2. Require SASL security layer in AMQP 1.0
### What?
An AMQP 1.0 client must use the SASL security layer.
### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.
Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.
### How?
This commit implements AMQP 1.0 figure 2.13.
A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
2024-08-14 18:19:17 +08:00
|
|
|
[?assertEqual(<<"AMQP",3,1,0,0>>,
|
|
|
|
version_negotiation2(Hostname, Port, Vsn)) ||
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
Vsn <- [<<"AMQP",0,1,0,0>>,
|
|
|
|
<<"AMQP",0,1,0,1>>,
|
|
|
|
<<"AMQP",0,1,1,0>>,
|
|
|
|
<<"AMQP",0,9,1,0>>,
|
|
|
|
<<"AMQP",0,0,8,0>>,
|
Add SASL mechanism ANONYMOUS
## 1. Introduce new SASL mechanism ANONYMOUS
### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.
### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.
Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, stream clients to connect out of
the box to RabbitMQ without providing any username or password.
Today's AMQP 0.9.1 and stream protocol client libs hard code RabbitMQ default credentials
`guest:guest` for example done in:
* https://github.com/rabbitmq/rabbitmq-java-client/blob/0215e85643a9ae0800822869be0200024e2ab569/src/main/java/com/rabbitmq/client/ConnectionFactory.java#L58-L61
* https://github.com/rabbitmq/amqp091-go/blob/ddb7a2f0685689063e6d709b8e417dbf9d09469c/uri.go#L31-L32
Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).
### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](https://github.com/rabbitmq/rabbitmq-server/blob/146b4862d8e570b344c99c37d91246760e218b18/deps/rabbitmq_amqp1_0/Makefile#L6)
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.
Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.
By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public
To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.
In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting. This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.
## 2. Require SASL security layer in AMQP 1.0
### What?
An AMQP 1.0 client must use the SASL security layer.
### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.
Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.
### How?
This commit implements AMQP 1.0 figure 2.13.
A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
2024-08-14 18:19:17 +08:00
|
|
|
<<"AMQP",1,1,0,0>>,
|
|
|
|
<<"AMQP",2,1,0,0>>,
|
|
|
|
<<"AMQP",3,1,0,0>>,
|
|
|
|
<<"AMQP",3,1,0,1>>,
|
|
|
|
<<"AMQP",3,1,0,1>>,
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
<<"AMQP",4,1,0,0>>,
|
Add SASL mechanism ANONYMOUS
## 1. Introduce new SASL mechanism ANONYMOUS
### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.
### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.
Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, stream clients to connect out of
the box to RabbitMQ without providing any username or password.
Today's AMQP 0.9.1 and stream protocol client libs hard code RabbitMQ default credentials
`guest:guest` for example done in:
* https://github.com/rabbitmq/rabbitmq-java-client/blob/0215e85643a9ae0800822869be0200024e2ab569/src/main/java/com/rabbitmq/client/ConnectionFactory.java#L58-L61
* https://github.com/rabbitmq/amqp091-go/blob/ddb7a2f0685689063e6d709b8e417dbf9d09469c/uri.go#L31-L32
Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).
### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](https://github.com/rabbitmq/rabbitmq-server/blob/146b4862d8e570b344c99c37d91246760e218b18/deps/rabbitmq_amqp1_0/Makefile#L6)
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.
Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.
By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public
To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.
In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting. This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.
## 2. Require SASL security layer in AMQP 1.0
### What?
An AMQP 1.0 client must use the SASL security layer.
### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.
Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.
### How?
This commit implements AMQP 1.0 figure 2.13.
A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
2024-08-14 18:19:17 +08:00
|
|
|
<<"AMQP",9,1,0,0>>,
|
|
|
|
<<"XXXX",0,1,0,0>>,
|
|
|
|
<<"XXXX",0,0,9,1>>
|
|
|
|
]],
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
|
Add SASL mechanism ANONYMOUS
## 1. Introduce new SASL mechanism ANONYMOUS
### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.
### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.
Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, stream clients to connect out of
the box to RabbitMQ without providing any username or password.
Today's AMQP 0.9.1 and stream protocol client libs hard code RabbitMQ default credentials
`guest:guest` for example done in:
* https://github.com/rabbitmq/rabbitmq-java-client/blob/0215e85643a9ae0800822869be0200024e2ab569/src/main/java/com/rabbitmq/client/ConnectionFactory.java#L58-L61
* https://github.com/rabbitmq/amqp091-go/blob/ddb7a2f0685689063e6d709b8e417dbf9d09469c/uri.go#L31-L32
Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).
### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](https://github.com/rabbitmq/rabbitmq-server/blob/146b4862d8e570b344c99c37d91246760e218b18/deps/rabbitmq_amqp1_0/Makefile#L6)
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.
Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.
By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public
To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.
In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting. This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.
## 2. Require SASL security layer in AMQP 1.0
### What?
An AMQP 1.0 client must use the SASL security layer.
### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.
Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.
### How?
This commit implements AMQP 1.0 figure 2.13.
A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
2024-08-14 18:19:17 +08:00
|
|
|
[?assertEqual(<<"AMQP",0,0,9,1>>,
|
|
|
|
version_negotiation2(Hostname, Port, Vsn)) ||
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
Vsn <- [<<"AMQP",0,0,9,2>>,
|
|
|
|
<<"AMQP",0,0,10,0>>,
|
|
|
|
<<"AMQP",0,0,10,1>>]],
|
|
|
|
ok.
|
|
|
|
|
Add SASL mechanism ANONYMOUS
## 1. Introduce new SASL mechanism ANONYMOUS
### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.
### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.
Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, stream clients to connect out of
the box to RabbitMQ without providing any username or password.
Today's AMQP 0.9.1 and stream protocol client libs hard code RabbitMQ default credentials
`guest:guest` for example done in:
* https://github.com/rabbitmq/rabbitmq-java-client/blob/0215e85643a9ae0800822869be0200024e2ab569/src/main/java/com/rabbitmq/client/ConnectionFactory.java#L58-L61
* https://github.com/rabbitmq/amqp091-go/blob/ddb7a2f0685689063e6d709b8e417dbf9d09469c/uri.go#L31-L32
Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).
### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](https://github.com/rabbitmq/rabbitmq-server/blob/146b4862d8e570b344c99c37d91246760e218b18/deps/rabbitmq_amqp1_0/Makefile#L6)
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.
Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.
By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public
To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.
In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting. This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.
## 2. Require SASL security layer in AMQP 1.0
### What?
An AMQP 1.0 client must use the SASL security layer.
### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.
Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.
### How?
This commit implements AMQP 1.0 figure 2.13.
A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
2024-08-14 18:19:17 +08:00
|
|
|
version_negotiation2(Hostname, Port, Header) ->
|
|
|
|
{ok, C} = gen_tcp:connect(Hostname, Port, [binary, {active, false}]),
|
2020-03-31 03:19:02 +08:00
|
|
|
ok = gen_tcp:send(C, Header),
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
{ok, ServerVersion} = gen_tcp:recv(C, 8, 100),
|
2020-03-31 03:19:02 +08:00
|
|
|
ok = gen_tcp:close(C),
|
Support AMQP 1.0 natively
## What
Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
consumers as we currently recommended for AMQP 0.9.1. which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate across all sessions bytes before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.
18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.
If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.
23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 50000; i++ {
conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("dialing AMQP server:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("creating AMQP session:", err)
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2023-07-21 18:29:07 +08:00
|
|
|
ServerVersion.
|