171 Commits
Author | SHA1 | Message | Date |
---|---|---|---|
|
c34d6206f1 |
Support AMQP SQL Filter Expressions
Instead of the JMS message selector syntax, support a subset of the AMQP SQL Filter Expressions syntax as defined in [AMQP Filter Expressions Version 1.0 Committee Specification Draft 01](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929276). This commit changes the descriptors. |
|
|
80a687f525 | Simplify splitting large messages | |
|
cf3bbe99a7 | Simplify pattern matching | |
|
bf23a7fb30 |
Speed up AMQP 1.0 parser by generating less garbage
This commit results in great performance improvements for AMQP 1.0. Stream filtering via AMQP SQL Filters or AMQP Property Filters where the broker reads messages from the stream into memory and the filter returns false, i.e. messages are not sent to the client: Before this commit: ~400,000 msgs/s After this commit: ~500,000 msgs/s There is also a ~10% increase in end-to-end throughput for normal AMQP workloads, e.g. when sending to and receiving from a classic queue. Prior to this commit, a lot of garbage was created leading to many minor garbage collections. This commit reduces the garbage being generated. The new module amqp10_composite performs very well: * Single tuple update setting multiple fields at once, sometimes done even in-place without copying the tuple. * The list of fields is passed into X registers, no need for any new allocations. On the serialisation side, this commit also generates less garbage by: 1. Avoiding tuple_to_list/1 2. Omitting trailing elements of the list that are null This will also lead to fewer bytes per message sent on the wire and less resource usage for the clients as they need to parse fewer fields. |
|
|
603466b5d8 | Use byte_size/1 instead of size/1 | |
|
5c318c8e38 |
Fix AMQP crashes for approximate numbers
This commit fixes several crashes: 1. Serialising IEEE 754-2008 decimals as well as NaN and +-Inf for float and doubles crashed 2. Converting IEEE 754-2008 decimals as well as NaN and +-Inf for float and dobules from amqp to amqpl crashed The 2nd crash looks as follows: ``` exception exit: {function_clause, [{mc_amqpl,to_091, [<<"decimal-32">>,{as_is,116,<<124,0,0,0>>}], [{file,"mc_amqpl.erl"},{line,747}]}, {mc_amqpl,'-convert_from/3-lc$^2/1-2-',1, [{file,"mc_amqpl.erl"},{line,155}]}, {mc_amqpl,convert_from,3, [{file,"mc_amqpl.erl"},{line,155}]}, {mc,convert,3,[{file,"mc.erl"},{line,358}]}, {rabbit_channel,outgoing_content,2, [{file,"rabbit_channel.erl"},{line,2649}]}, {rabbit_channel,handle_basic_get,7, [{file,"rabbit_channel.erl"},{line,2636}]}, {rabbit_channel,handle_cast,2, [{file,"rabbit_channel.erl"},{line,617}]}, {gen_server2,handle_msg,2, [{file,"gen_server2.erl"},{line,1056}]}]} ``` The 2nd crash is fixed by omitting any `{as_is, _TypeCode, _Binary}` values during AMQP 1.0 -> AMQP 0.9.1 conversion. This will be documented in the conversion table. In addition to fixing these crashes, this commit adds tests that RabbitMQ is able to store and forward IEEE 754-2008 decimals. IEEE 754-2008 decimals can be parsed and serialsed by RabbitMQ. However, RabbitMQ doesn't support interpreting this values. For example, they can't be used on the headers exchange or for AMQP filter expressions. |
|
|
93db480bc4 |
Support SQL filter expressions for streams
## What? This commit allows AMQP 1.0 clients to define SQL-like filter expressions when consuming from streams, enabling server-side message filtering. RabbitMQ will only dispatch messages that match the provided filter expression, reducing network traffic and client-side processing overhead. SQL filter expressions are a more powerful alternative to the [AMQP Property Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions) introduced in RabbitMQ 4.1. SQL filter expressions are based on the [JMS message selector syntax](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax) and support: * Comparison operators (`=`, `<>`, `>`, `<`, `>=`, `<=`) * Logical operators (`AND`, `OR`, `NOT`) * Arithmetic operators (`+`, `-`, `*`, `/`) * Special operators (`BETWEEN`, `LIKE`, `IN`, `IS NULL`) * Access to the properties and application-properties sections **Examples** Simple expression: ```sql header.priority > 4 ``` Complex expression: ```sql order_type IN ('premium', 'express') AND total_amount BETWEEN 100 AND 5000 AND (customer_region LIKE 'EU-%' OR customer_region = 'US-CA') AND properties.creation-time >= 1750772279000 AND NOT cancelled ``` Like AMQP property filter expressions, SQL filter expressions can be combined with Bloom filters. Combining both allows for highly customisable expressions (SQL) and extremely fast evaluation (Bloom filter) if only a subset of the chunks need to be read from disk. ## Why? Compared to AMQP property filter expressions, SQL filter expressions provide the following advantage: * High expressiveness and flexibility in defining the filter Like for AMQP property filter expressions, the following advantages apply: * No false positives (as is the case for Bloom filters) * Multiple concurrent clients can attach to the same stream each consuming only a specific subset of messages while preserving message order. * Low network overhead as only messages that match the filter are transferred to the client * Likewise, lower resource usage (CPU and memory) on clients since they don't need to deserialise messages that they are not interested in. * If the SQL expression is simple, even the broker will save resources because it doesn't need to serialse and send messages that the client isn't interested in. ## How? ### JMS Message Selector Syntax vs. AMQP Extension Spec The AMQP Filter Expressions Version 1.0 extension Working Draft 09 defines SQL Filter Expressions in Section 6. This spec differs from the JMS message selector spec. Neither is a subset of the other. We can choose to follow either. However, I think it makes most sense to follow the JMS spec because: * The JMS spec is better defined * The JMS spec is far more widespread than the AMQP Working Draft spec. (A slight variation of the AMQP Working Draft is used by Azure Service Bus: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter) * The JMS spec is mostly simpler (partly because matching on only simple types) * This will allow for a single SQL parser in RabbitMQ for both AMQP clients consuming from a stream and possibly in future for JMS clients consuming from queues or topics. <details> <summary>AMQP extension spec vs JMS spec</summary> AMQP != is synonym for <> JMS defines only <> Conclusion <> is sufficient AMQP Strings can be tested for “greater than” “both operands are of type string or of type symbol (any combination is permitted) and the lexicographical rank of the left operand is greater than the lexicographical rank of the right operand” JMS “String and Boolean comparison is restricted to = and <>.” Conclusion The JMS behaviour is sufficient. AMQP IN <set-expression> set-expression can contain non-string literals JMS: set-expression can contain only string literals Conclusion The JMS behaviour is sufficient. AMQP EXISTS predicate to check for composite types JMS Only simple types Conclusion We want to match only for simple types, i.e. allowing matching only against values in the application-properties, properties sections and priority field of the header section. AMQP: Modulo operator % Conclusion JMS doesn't define the modulo operator. Let's start without it. We can decide in future to add support since it can actually be useful, for example for two receivers who want to process every other message. AMQP: The ‘+’ operator can concatenate string and symbol values Conclusion Such string concatenation isn't defined in JMS. We don't need it. AMQP: Define NAN and INF JMS: “Approximate literals use the Java floating-point literal syntax.” Examples include "7." Conclusion We can go with the JMS spec given that needs to be implemented anyway for JMS support. Scientific notations are supported in both the AMQP spec and JMS spec. AMQP String literals can be surrounded by single or double quotation marks JMS A string literal is enclosed in single quotes Conclusion Supporting single quotes is good enough. AMQP “A binary constant is a string of pairs of hexadecimal digits prefixed by ‘0x’ that are not enclosed in quotation marks” Conclusion JMS doesn't support binary constants. We can start without binary constants. Matching against binary values are still supported if these binary values can be expressed as UTF-8 strings. AMQP Functions DATE, UTC, SUBSTRING, LOWER, UPPER, LEFT, RIGHT Vendor specific functions Conclusion JMS doesn't define such functions. We can start without those functions. AMQP <field><array_element_reference> <field>‘.’<composite_type_reference> to access map and array elements Conclusion Same as above: We want to match only for simple types, i.e. allowing matching only against values in the application-properties, properties sections and priority field of the header section. AMQP allows for delimited identifiers JMS Java identifier part characters Conclusion We can go with the Java identifiers extending the allowed characters by `.` and `-` to reference field names such as `properties.group-id`. JMS: BETWEEN operator Conclusion The BETWEEN operator isn't supported in the AMQP spec. Let's support it as convenience since it's already available in JMS. </details> ### Filter Name The client provides a filter with name `sql-filter` instead of name `jms-selector` to allow to differentiate between JMS clients and other native AMQP 1.0 clients using SQL expressions. This way, we can also optionally extend the SQL grammar in future. ### Identifiers JMS message selectors allow identifiers to contain some well known JMS headers that match to well known AMQP fields, for example: ```erl jms_header_to_amqp_field_name(<<"JMSDeliveryMode">>) -> durable; jms_header_to_amqp_field_name(<<"JMSPriority">>) -> priority; jms_header_to_amqp_field_name(<<"JMSMessageID">>) -> message_id; jms_header_to_amqp_field_name(<<"JMSTimestamp">>) -> creation_time; jms_header_to_amqp_field_name(<<"JMSCorrelationID">>) -> correlation_id; jms_header_to_amqp_field_name(<<"JMSType">>) -> subject; %% amqp-bindmap-jms-v1.0-wd10 § 3.2.2 JMS-defined ’JMSX’ Properties jms_header_to_amqp_field_name(<<"JMSXUserID">>) -> user_id; jms_header_to_amqp_field_name(<<"JMSXGroupID">>) -> group_id; jms_header_to_amqp_field_name(<<"JMSXGroupSeq">>) -> group_sequence; ``` This commit does a similar matching for `header.` and `properties.` prefixed identifiers to field names in the AMQP property section. The only field that is supported to filter on in the AMQP header section is `priority`, that is identifier `header.priority`. By default, as described in the AMQP extension spec, if an identifier is not prefixed, it refers to a key in the application-properties section. Hence, all identifiers prefixed with `header.`, and `properties.` have special meanings and MUST be avoided by applications unless they want to refer to those specific fields. Azure Service Bus uses the `sys.` and `user.` prefixes for well known field names and arbitrary application-provided keys, respectively. ### SQL lexer, parser and evaluator This commit implements the SQL lexer and parser in files rabbit_jms_selector_lexer.xrl and rabbit_jms_selector_parser.yrl, respectively. Advantages: * Both the definitions in the lexer and the grammar in the parser are defined **declaratively**. * In total, the entire SQL syntax and grammar is defined in only 240 lines. * Therefore, lexer and parser are simple to maintain. The idea of this commit is to use the same lexer and parser for native AMQP clients consumings from streams (this commit) as for JMS clients (in the future). All native AMQP client vs JMS client bits are then manipulated after the Abstract Syntax Tree (AST) has been created by the parser. For example, this commit transforms the AST specifically for native AMQP clients by mapping `properties.` prefixed identifiers (field names) to atoms. A JMS client's mapping from `JMS` prefixed headers can transform the AST differently. Likewise, this commit transforms the AST to compile a regex for complex LIKE expressions when consuming from a stream while a future version might not want to compile a regex when consuming from quorum queues. Module `rabbit_jms_ast` provides such AST helper methods. The lexer and parser are not performance critical as this work happens upon receivers attaching to the stream. The evaluator however is performance critical as message evaluation happens on the hot path. ### LIKE expressions The evaluator has been optimised to only compile a regex when necessary. If the LIKE expression-value contains no wildcard or only a single `%` wildcard, Erlang pattern matching is used as it's more efficient. Since `_` can match any UTF-8 character, a regex will be compiled with the `[unicode]` options. ### Filter errors Any errors upon a receiver attaching to a stream causes the filter to not become active. RabbitMQ will log a warning describing the reason and will omit the named filter in its attach reply frame. The client lib is responsible for detaching the link as explained in the AMQP spec: > The receiving endpoint sets its desired filter, the sending endpoint sets the filter actually in place (including any filters defaulted at the node). The receiving endpoint MUST check that the filter in place meets its needs and take responsibility for detaching if it does not. This applies to lexer and parser errors. Errors during message evaluation will result in an unknown value. Conditional operators on unknown are described in the JMS spec. If the entire selector condition is unknown, the message does not match, and will therefore not be delivered to the client. ## Clients Support for passing the SQL expression from app to broker is provided by the Java client in https://github.com/rabbitmq/rabbitmq-amqp-java-client/pull/216 |
|
|
63f7da23c7
|
Delete symlinks to `erlang.mk` and `rabbitmq-components.mk`
[Why] They make it more difficult to compile RabbitMQ on Windows. They were probably useful at the time of the switch to a monorepository but I don't see their need anymore. |
|
|
1850ff1363 |
Avoid using the size/1 BIF
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run
Details
Test (make) / Build and Xref (1.17, 26) (push) Waiting to run
Details
Test (make) / Build and Xref (1.17, 27) (push) Waiting to run
Details
Test (make) / Test (1.17, 27, khepri) (push) Waiting to run
Details
Test (make) / Test (1.17, 27, mnesia) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Waiting to run
Details
Test (make) / Type check (1.17, 27) (push) Waiting to run
Details
Avoid using the size/1 BIF for performance critical code because according to https://whatsapp.github.io/erlang-language-platform/docs/erlang-error-index/w/W0050/ "The BIF is not optimized by the JIT". |
|
|
07adc3e571
|
Remove Bazel files | |
|
968eefa1bb
|
Bump (c) line year
There are no functional changes to this massive diff. |
|
|
4eb5b82f9f
|
amqp10_common: Don't wrap export for test in test macro
The application is not always recompiled which causes tests to fail
because they cannot call `serial_number:usort/1`.
(cherry picked from commit
|
|
|
358ff79611 |
Provide clear error message for reserved annotation keys
As described in https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-annotations > The annotations type is a map where the keys are restricted to be of type symbol or of type ulong. > All ulong keys, and all symbolic keys except those beginning with "x-" are reserved. Prior to this commit, if an AMQP client used a reserved annotation key, the entire AMQP connection terminated with a function_clause error message that might be difficult to understand for client libs: ``` <<"Session error: function_clause\n[{amqp10_framing,'-decode_annotations/1-fun-0-',\n [{{symbol,<<\"aa\">>},{utf8,<<\"bbb\">>}}],\n [{file,\"amqp10_framing.erl\"},{line,158}]},\n {lists,map,2,[{file,\"lists.erl\"},{line,1559}]},\n {amqp10_framing,decode,1,[{file,\"amqp10_framing.erl\"},{line,127}]},\n {lists,map_1,2,[{file,\"lists.erl\"},{line,1564}]},\n {lists,map,2,[{file,\"lists.erl\"},{line,1559}]},\n {mc_amqp,init,1,[{file,\"mc_amqp.erl\"},{line,102}]},\n {mc,init,4,[{file,\"mc.erl\"},{line,150}]},\n {rabbit_amqp_session,incoming_link_transfer,4,\n [{file,\"rabbit_amqp_session.erl\"},{line,2341}]}]">> ``` This commit ends only the session and provides a clearer error message. |
|
|
df59a52b70
|
Support AMQP filter expressions (#12415)
* Support AMQP filter expressions
## What?
This PR implements the following property filter expressions for AMQP clients
consuming from streams as defined in
[AMQP Filter Expressions Version 1.0 Working Draft 09](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227):
* properties filters [section 4.2.4]
* application-properties filters [section 4.2.5]
String prefix and suffix matching is also supported.
This PR also fixes a bug where RabbitMQ would accept wrong filters.
Specifically, prior to this PR the values of the filter-set's map were
allowed to be symbols. However, "every value MUST be either null or of a
described type which provides the archetype filter."
## Why?
This feature adds the ability to RabbitMQ to have multiple concurrent clients
each consuming only a subset of messages while maintaining message order.
This feature also reduces network traffic between RabbitMQ and clients by
only dispatching those messages that the clients are actually interested in.
Note that AMQP filter expressions are more fine grained than the [bloom filter based
stream filtering](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) because
* they do not suffer false positives
* the unit of filtering is per-message instead of per-chunk
* matching can be performed on **multiple** values in the properties and
application-properties sections
* prefix and suffix matching on the actual values is supported.
Both, AMQP filter expressions and bloom filters can be used together.
## How?
If a filter isn't valid, RabbitMQ ignores the filter. RabbitMQ only
replies with filters it actually supports and validated successfully to
comply with:
"The receiving endpoint sets its desired filter, the sending endpoint
[RabbitMQ] sets the filter actually in place (including any filters defaulted at
the node)."
* Delete streams test case
The test suite constructed a wrong filter-set.
Specifically the value of the filter-set didn't use a described type as
mandated by the spec.
Using https://azure.github.io/amqpnetlite/api/Amqp.Types.DescribedValue.html
throws errors that the descriptor can't be encoded. Given that this code
path is already tests via the amqp_filtex_SUITE, this F# test gets
therefore deleted.
* Re-introduce the AMQP filter-set bug
Since clients might rely on the wrong filter-set value type, we support
the bug behind a deprecated feature flag and gradually remove support
this bug.
* Revert "Delete streams test case"
This reverts commit
|
|
|
5e3942478f
|
amqp10_common: Don't dialyze tests or from source by default | |
|
9d7ebf32a9 |
Enforce correct transfer settled flag
For messages published to RabbitMQ, RabbitMQ honors the transfer `settled` field, no matter what value the sender settle mode was set to in the attach frame. Therefore, prior to this commit, a client could send a transfer with `settled=true` even though sender settle mode was set to `unsettled` in the attach frame. This commit enforces that the publisher sets only transfer `settled` fields that are valid with the spec. If sender settle mode is: * `unsettled`, the transfer `settled` flag must be `false`. * `settled`, the transfer `settled` flag must be `true`. * `mixed`, the transfer `settled` flag can be `true` or `false`. |
|
|
b1eb354385 | Strictly validate annotations | |
|
7ad8e2856b
|
make: Restrict Erlang.mk plugin inclusion
This has no real impact on performance[1] but should make it clear which application can run the broker and/or publish to Hex.pm. In particular, applications that we can't run the broker from will now give up early if we try to. Note that while the broker can't normally run from the amqp_client application's directory, it can run from tests and some of the tests start the broker. [1] on my machine |
|
|
445f3c9270
|
make: Move rabbitmq-early-test.mk to rabbitmq-early-plugin.mk
No real need to have two files, especially since it contains only a few variable definitions. Plan is to only keep separate files for larger features such as dist or run. |
|
|
d4222f8216
|
make: Remove emptied rabbitmq-tools.mk | |
|
7e7e6feb9d
|
make: Remove rabbitmq-tests.mk
Everything in this file seems to be dead code except ct-slow/ct-fast, which have been replaced by their equivalent in the rabbit Makefile. |
|
|
bbfa066d79
|
Cleanup .gitignore files for the monorepo
We don't need to duplicate so many patterns in so many files since we have a monorepo (and want to keep it). If I managed to miss something or remove something that should stay, please put it back. Note that monorepo-wide patterns should go in the top-level .gitignore file. Other .gitignore files are for application or folder- specific patterns. |
|
|
18f8ee1457
|
Merge pull request #11549 from rabbitmq/loic-make-cleanups
Various make cleanup/consolidation |
|
|
dcb2fe0fd9 |
Fix message IDs settlement order
## What? This commit fixes issues that were present only on `main` branch and were introduced by #9022. 1. Classic queues (specifically `rabbit_queue_consumers:subtract_acks/3`) expect message IDs to be (n)acked in the order as they were delivered to the channel / session proc. Hence, the `lists:usort(MsgIds0)` in `rabbit_classic_queue:settle/5` was wrong causing not all messages to be acked adding a regression to also AMQP 0.9.1. 2. The order in which the session proc requeues or rejects multiple message IDs at once is important. For example, if the client sends a DISPOSITION with first=3 and last=5, the message IDs corresponding to delivery IDs 3,4,5 must be requeued or rejected in exactly that order. For example, quorum queues use this order of message IDs in |
|
|
9f15e978b1
|
make: Remove xrefr
It is no longer used by Erlang.mk. |
|
|
cfa3de4b2b
|
Remove unused imports (thanks elp!) | |
|
c3289689bd | Remove unused code | |
|
b721b1a01b | Support maps and lists in AMQP array elements | |
|
6018155e9b | Add property test for AMQP serializer and parser | |
|
6225dc9928 |
Do not parse entire AMQP body
Prior to this commit the entire amqp-value or amqp-sequence sections were parsed when converting a message from mc_amqp. Parsing the entire amqp-value or amqp-sequence section can generate a huge amount of garbage depending on how large these sections are. Given that other protocol cannot make use of amqp-value and amqp-sequence sections anyway, leave them AMQP encoded when converting from mc_amqp. In fact prior to this commit, the entire body section was parsed generating huge amounts of garbage just to subsequently encode it again in mc_amqpl or mc_mqtt. The new conversion interface from mc_amqp to other mc_* modules will either output amqp-data sections or the encoded amqp-value / amqp-sequence sections. |
|
|
fc7f458f7c | Fix tests | |
|
2380b850e9 | Do not create binaries in parse/1 | |
|
9fbbb4770c | Move frequent clauses to the top | |
|
bfe285445e | Remove slow guard tests | |
|
4980669502 | New mc_amqp state | |
|
0a31d7721b |
Parse until body and return position for bare sections
parse_many is completely optimized |
|
|
cf7a2a0f45 | Speed up AMQP parser | |
|
4ec33c8678
|
Fix some dialyzer build system errors in make (#11014)
* make amqp10_common dialyze green in make * make rabbitmq_ct_client_helpers dialyze green with make * fixup rabbitmq_prelaunch path ref * Cleanup unused dep_* vars * Fixup xref for rabbitmq_ct_helpers I could not figure out how to make xref aware of the cli code without also checking the cli code as well, and reporting additional errors * remove unused file * fix make diaylze for rabbitmq_stream_common * update deps/oauth2_client/Makefile to match Bazel |
|
|
b0260cf4b3 |
Avoid creation of binaries in AMQP 1.0 generator
When generating iodata() in the AMQP 1.0 generator, prefer integers over binaries. Rename functions and variable names to better reflect the AMQP 1.0 spec instead of using AMQP 0.9.1 wording. |
|
|
8a3f3c6f34 |
Enable AMQP 1.0 clients to manage topologies
## What? * Allow AMQP 1.0 clients to dynamically create and delete RabbitMQ topologies (exchanges, queues, bindings). * Provide an Erlang AMQP 1.0 client that manages topologies. ## Why? Today, RabbitMQ topologies can be created via: * [Management HTTP API](https://www.rabbitmq.com/docs/management#http-api) (including Management UI and [messaging-topology-operator](https://github.com/rabbitmq/messaging-topology-operator)) * [Definition Import](https://www.rabbitmq.com/docs/definitions#import) * AMQP 0.9.1 clients Up to RabbitMQ 3.13 the RabbitMQ AMQP 1.0 plugin auto creates queues and bindings depending on the terminus [address format](https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing). Such implicit creation of topologies is limiting and obscure. For some address formats, queues will be created, but not deleted. Some of RabbitMQ's success is due to its flexible routing topologies that AMQP 0.9.1 clients can create and delete dynamically. This commit allows dynamic management of topologies for AMQP 1.0 clients. This commit builds on top of Native AMQP 1.0 (PR #9022) and will be available in RabbitMQ 4.0. ## How? This commits adds the following management operations for AMQP 1.0 clients: * declare queue * delete queue * purge queue * bind queue to exchange * unbind queue from exchange * declare exchange * delete exchange * bind exchange to exchange * unbind exchange from exchange Hence, at least the AMQP 0.9.1 management operations are supported for AMQP 1.0 clients. In addition the operation * get queue is provided which - similar to `declare queue` - returns queue information including the current leader and replicas. This allows clients to publish or consume locally on the node that hosts the queue. Compared to AMQP 0.9.1 whose commands and command fields are fixed, the new AMQP Management API is extensible: New operations and new fields can easily be added in the future. There are different design options how management operations could be supported for AMQP 1.0 clients: 1. Use a special exchange type as done in https://github.com/rabbitmq/rabbitmq-management-exchange This has the advantage that any protocol client (e.g. also STOMP clients) could dynamically manage topologies. However, a special exchange type is the wrong abstraction. 2. Clients could send "special" messages with special headers that the broker interprets. This commit decided for a variation of the 2nd option using a more standardized way by re-using a subest of the following latest AMQP 1.0 extension specifications: * [AMQP Request-Response Messaging with Link Pairing Version 1.0 - Committee Specification 01](https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html) (February 2021) * [HTTP Semantics and Content over AMQP Version 1.0 - Working Draft 06](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65571) (July 2019) * [AMQP Management Version 1.0 - Working Draft 16](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65575) (July 2019) An important goal is to keep the interaction between AMQP 1.0 client and RabbitMQ simple to increase usage, development and adoptability of future RabbitMQ AMQP 1.0 client library wrappers. The AMQP 1.0 client has to create a link pair to the special `/management` node. This allows the client to send and receive from the management node. Similar to AMQP 0.9.1, there is no need for a reply queue since the reply will be sent directly to the client. Requests and responses are modelled via HTTP, but sent via AMQP using the `HTTP Semantics and Content over AMQP` extension (henceforth `HTTP over AMQP` extension). This commit tries to follow the `HTTP over AMQP` extension as much as possible but deviates where this draft spec doesn't make sense. The projected mode §4.1 is used as opposed to tunneled mode §4.2. A named relay `/management` is used (§6.3) where the message field `to` is the URL. Deviations are * §3.1 mandates that URIs are not encoded in an AMQP message. However, we percent encode URIs in the AMQP message. Otherwise there is for example no way to distinguish a `/` in a queue name from the URI path separator `/`. * §4.1.4 mandates a data section. This commit uses an amqp-value section as it's a better fit given that the content is AMQP encoded data. Using an HTTP API allows for a common well understood interface and future extensibility. Instead of re-using the current RabbitMQ HTTP API, this commit uses a new HTTP API (let's call it v2) which could be used as a future API for plain HTTP clients. ### HTTP API v1 The current HTTP API (let's call it v1) is **not** used since v1 comes with a couple of weaknesses: 1. Deep level of nesting becomes confusing and difficult to manage. Examples of deep nesting in v1: ``` /api/bindings/vhost/e/source/e/destination/props /api/bindings/vhost/e/exchange/q/queue/props ``` 2. Redundant endpoints returning the same resources v1 has 9 endpoints to list binding(s): ``` /api/exchanges/vhost/name/bindings/source /api/exchanges/vhost/name/bindings/destination /api/queues/vhost/name/bindings /api/bindings /api/bindings/vhost /api/bindings/vhost/e/exchange/q/queue /api/bindings/vhost/e/exchange/q/queue/props /api/bindings/vhost/e/source/e/destination /api/bindings/vhost/e/source/e/destination/props ``` 3. Verbs in path names Path names should be nouns instead. v1 contains verbs: ``` /api/queues/vhost/name/get /api/exchanges/vhost/name/publish ``` ### AMQP Management extension Only few aspects of the AMQP Management extension are used. The central idea of the AMQP management spec is **dynamic discovery** such that broker independent AMQP 1.0 clients can discover objects, types, operations, and HTTP endpoints of specific brokers. In fact, clients are only conformant if: > All request addresses are dynamically discovered starting from the discovery document. > A requesting container MUST NOT use fixed assumptions about the addressing structure of the management API. While this is a nice and powerful idea, no AMQP 1.0 client and no AMQP 1.0 server implement the latest AMQP 1.0 management spec from 2019, partly presumably due to its complexity. Therefore, the idea of such dynamic discovery has failed to be implemented in practice. The AMQP management spec mandates that the management endpoint returns a discovery document containing broker specific collections, types, configuration, and operations including their endpoints. The API endpoints of the AMQP management spec are therefore all designed around dynamic discovery. For example, to create either a queue or an exchange, the client has to ``` POST /$management/entities ``` which shows that the entities collection acts as a generic factory, see section 2.2. The server will then create the resource and reply with a location header containing a URI pointing to the resource. For RabbitMQ, we don’t need such a generic factory to create queues or exchanges. To list bindings for a queue Q1, the spec suggests ``` GET /$management/Queues/Q1/$management/entities ``` which again shows the generic entities endpoint as well as a `$management` endpoint under Q1 to allow a queue to return a discovery document. For RabbitMQ, we don’t need such generic endpoints and discovery documents. Given we aim for our own thin RabbitMQ AMQP 1.0 client wrapper libraries which expose the RabbitMQ model to the developer, we can directly use fixed HTTP endpoint assumptions in our RabbitMQ specific libraries. This is by far simpler than using the dynamic endpoints of the management spec. Simplicity leads to higher adoption and enables more developers to write RabbitMQ AMQP 1.0 client library wrappers. The AMQP Management extension also suffers from deep level of nesting in paths Examples: ``` /$management/Queues/Q1/$management/entities /$management/Queues/Q1/Bindings/Binding1 ``` as well as verbs in path names: Section 7.1.4 suggests using verbs in path names, for example “purge”, due to the dynamic operations discovery document. ### HTTP API v2 This commit introduces a new HTTP API v2 following best practices. It could serve as a future API for plain HTTP clients. This commit and RabbitMQ 4.0 will only implement a minimal set of HTTP API v2 endpoints and only for HTTP over AMQP. In other words, the existing HTTP API v1 Cowboy handlers will continue to be used for all plain HTTP requests in RabbitMQ 4.0 and will remain untouched for RabbitMQ 4.0. Over time, after 4.0 shipped, we could ship a pure HTTP API implementation for HTTP API v2. Hence, the new HTTP API v2 endpoints for HTTP over AMQP should be designed such that they can be re-used in the future for a pure HTTP implementation. The minimal set of endpoints for RabbitMQ 4.0 are: `` GET / PUT / DELETE /vhosts/:vhost/queues/:queue ``` read, create, delete a queue ``` DELETE /vhosts/:vhost/queues/:queue/messages ``` purges a queue ``` GET / DELETE /vhosts/:vhost/bindings/:binding ``` read, delete bindings where `:binding` is a binding ID of the following path segment: ``` src=e1;dstq=q2;key=my-key;args= ``` Binding arguments `args` has an empty value by default, i.e. there are no binding arguments. If the binding includes binding arguments, `args` will be an Erlang portable term hash provided by the server similar to what’s provided in HTTP API v1 today. Alternatively, we could use an arguments scheme of: ``` args=k1,utf8,v1&k2,uint,3 ``` However, such a scheme leads to long URIs when there are many binding arguments. Note that it’s perfectly fine for URI producing applications to include URI reserved characters `=` / `;` / `,` / `$` in a path segment. To create a binding, the client therefore needs to POST to a bindings factory URI: ``` POST /vhosts/:vhost/bindings ``` To list all bindings between a source exchange e1 and destination exchange e2 with binding key k1: ``` GET /vhosts/:vhost/bindings?src=e1&dste=e2&key=k1 ``` This endpoint will be called by the RabbitMQ AMQP 1.0 client library to unbind a binding with non-empty binding arguments to get the binding ID before invoking a ``` DELETE /vhosts/:vhost/bindings/:binding ``` In future, after RabbitMQ 4.0 shipped, new API endpoints could be added. The following is up for discussion and is only meant to show the clean and simple design of HTTP API v2. Bindings endpoint can be queried as follows: to list all bindings for a given source exchange e1: ``` GET /vhosts/:vhost/bindings?src=e1 ``` to list all bindings for a given destination queue q1: ``` GET /vhosts/:vhost/bindings?dstq=q1 ``` to list all bindings between a source exchange e1 and destination queue q1: ``` GET /vhosts/:vhost/bindings?src=e1&dstq=q1 ``` multiple bindings between source exchange e1 and destination queue q1 could be deleted at once as follows: ``` DELETE /vhosts/:vhost/bindings?src=e1&dstq=q1 ``` GET could be supported globally across all vhosts: ``` /exchanges /queues /bindings ``` Publish a message: ``` POST /vhosts/:vhost/queues/:queue/messages ``` Consume or peek a message (depending on query parameters): ``` GET /vhosts/:vhost/queues/:queue/messages ``` Note that the AMQP 1.0 client omits the `/vhost/:vhost` path prefix. Since an AMQP connection belongs to a single vhost, there is no need to additionally include the vhost in every HTTP request. Pros of HTTP API v2: 1. Low level of nesting Queues, exchanges, bindings are top level entities directly under vhosts. Although the HTTP API doesn’t have to reflect how resources are stored in the database, v2 does nicely reflect the Khepri tree structure. 2. Nouns instead of verbs HTTP API v2 is very simple to read and understand as shown by ``` POST /vhosts/:vhost/queues/:queue/messages to post messages, i.e. publish to a queue. GET /vhosts/:vhost/queues/:queue/messages to get messages, i.e. consume or peek from a queue. DELETE /vhosts/:vhost/queues/:queue/messages to delete messages, i.e. purge a queue. ``` A separate new HTTP API v2 allows us to ship only handlers for HTTP over AMQP for RabbitMQ 4.0 and therefore move faster while still keeping the option on the table to re-use the new v2 API for pure HTTP in the future. In contrast, re-using the HTTP API v1 for HTTP over AMQP is possible, but dirty because separate handlers (HTTP over AMQP and pure HTTP) replying differently will be needed for the same v1 endpoints. |
|
|
afd28ba76d | Apply PR feedback | |
|
8cb313d5a1 |
Support AMQP 1.0 natively
## What Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0. By "native", we mean do not proxy via AMQP 0.9.1 anymore. ## Why Native AMQP 1.0 comes with the following major benefits: 1. Similar to Native MQTT, this commit provides better throughput, latency, scalability, and resource usage for AMQP 1.0. See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements. See further below for some benchmarks. 2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol, this commit allows implementing more AMQP 1.0 features in the future. Some features are already implemented in this commit (see next section). 3. Simpler, better understandable, and more maintainable code. Native AMQP 1.0 as implemented in this commit has the following major benefits compared to AMQP 0.9.1: 4. Memory and disk alarms will only stop accepting incoming TRANSFER frames. New connections can still be created to consume from RabbitMQ to empty queues. 5. Due to 4. no need anymore for separate connections for publishers and consumers as we currently recommended for AMQP 0.9.1. which potentially halves the number of physical TCP connections. 6. When a single connection sends to multiple target queues, a single slow target queue won't block the entire connection. Publisher can still send data quickly to all other target queues. 7. A publisher can request whether it wants publisher confirmation on a per-message basis. In AMQP 0.9.1 publisher confirms are configured per channel only. 8. Consumers can change their "prefetch count" dynamically which isn't possible in our AMQP 0.9.1 implementation. See #10174 9. AMQP 1.0 is an extensible protocol This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in RabbitMQ 3.x - most of which cannot be backported due to the complexity and limitations of the old 3.x implementation. This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0. ## Implementation details 1. Breaking change: With Native AMQP, the behaviour of ``` Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false) Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false) ``` will break because we always convert according to the message container conversions. For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties. Also, `false` won’t be respected since we always convert the headers with message containers. 2. Remove rabbit_queue_collector rabbit_queue_collector is responsible for synchronously deleting exclusive queues. Since the AMQP 1.0 plugin never creates exclusive queues, rabbit_queue_collector doesn't need to be started in the first place. This will save 1 Erlang process per AMQP 1.0 connection. 3. 7 processes per connection + 1 process per session in this commit instead of 7 processes per connection + 15 processes per session in 3.x Supervision hierarchy got re-designed. 4. Use 1 writer process per AMQP 1.0 connection AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel. Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session. Advantage of single writer proc per session (prior to this commit): * High parallelism for serialising packets if multiple sessions within a connection write heavily at the same time. This commit uses a single writer process per AMQP 1.0 connection that is shared across all AMQP 1.0 sessions. Advantages of single writer proc per connection (this commit): * Lower memory usage with hundreds of thousands of AMQP 1.0 sessions * Less TCP and IP header overhead given that the single writer process can accumulate across all sessions bytes before flushing the socket. In other words, this commit decides that a reader / writer process pair per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows. Having a writer per session is too heavy. We still ensure high throughput by having separate reader, writer, and session processes. 5. Transform rabbit_amqp1_0_writer into gen_server Why: Prior to this commit, when clicking on the AMQP 1.0 writer process in observer, the process crashed. Instead of handling all these debug messages of the sys module, it's better to implement a gen_server. There is no advantage of using a special OTP process over gen_server for the AMQP 1.0 writer. gen_server also provides cleaner format status output. How: Message callbacks return a timeout of 0. After all messages in the inbox are processed, the timeout message is handled by flushing any pending bytes. 6. Remove stats timer from writer AMQP 1.0 connections haven't emitted any stats previously. 7. When there are contiguous queue confirmations in the session process mailbox, batch them. When the confirmations are sent to the publisher, a single DISPOSITION frame is sent for contiguously confirmed delivery IDs. This approach should be good enough. However it's sub optimal in scenarios where contiguous delivery IDs that need confirmations are rare, for example: * There are multiple links in the session with different sender settlement modes and sender publishes across these links interleaved. * sender settlement mode is mixed and sender publishes interleaved settled and unsettled TRANSFERs. 8. Introduce credit API v2 Why: The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly designed since basic.credit is a synchronous call into the queue process blocking the entire AMQP 1.0 session process. How: Change the interactions between queue clients and queue server implementations: * Clients only request a credit reply if the FLOW's `echo` field is set * Include all link flow control state held by the queue process into a new credit_reply queue event: * `available` after the queue sends any deliveries * `link-credit` after the queue sends any deliveries * `drain` which allows us to combine the old queue events send_credit_reply and send_drained into a single new queue event credit_reply. * Include the consumer tag into the credit_reply queue event such that the AMQP 1.0 session process can process any credit replies asynchronously. Link flow control state `delivery-count` also moves to the queue processes. The new interactions are hidden behind feature flag credit_api_v2 to allow for rolling upgrades from 3.13 to 4.0. 9. Use serial number arithmetic in quorum queues and session process. 10. Completely bypass the rabbit_limiter module for AMQP 1.0 flow control. The goal is to eventually remove the rabbit_limiter module in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter into rabbit_queue_consumers. 11. Fix credit bug for streams: AMQP 1.0 settlements shouldn't top up link credit, only FLOW frames should top up link credit. 12. Allow sender settle mode unsettled for streams since AMQP 1.0 acknowledgements to streams are no-ops (currently). 13. Fix AMQP 1.0 client bugs Auto renewing credits should not be related to settling TRANSFERs. Remove field link_credit_unsettled as it was wrong and confusing. Prior to this commit auto renewal did not work when the sender uses sender settlement mode settled. 14. Fix AMQP 1.0 client bugs The wrong outdated Link was passed to function auto_flow/2 15. Use osiris chunk iterator Only hold messages of uncompressed sub batches in memory if consumer doesn't have sufficient credits. Compressed sub batches are skipped for non Stream protocol consumers. 16. Fix incoming link flow control Always use confirms between AMQP 1.0 queue clients and queue servers. As already done internally by rabbit_fifo_client and rabbit_stream_queue, use confirms for classic queues as well. 17. Include link handle into correlation when publishing messages to target queues such that session process can correlate confirms from target queues to incoming links. 18. Only grant more credits to publishers if publisher hasn't sufficient credits anymore and there are not too many unconfirmed messages on the link. 19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow between classic queue process and session process. 20. Link flow control is independent between links. A client can refer to a queue or to an exchange with multiple dynamically added target queues. Multiple incoming links can also fan in to the same queue. However the link topology looks like, this commit ensures that each link is only granted more credits if that link isn't overloaded. 21. A connection or a session can send to many different queues. In AMQP 0.9.1, a single slow queue will lead to the entire channel, and then entire connection being blocked. This commit makes sure that a single slow queue from one link won't slow down sending on other links. For example, having link A sending to a local classic queue and link B sending to 5 replica quorum queue, link B will naturally grant credits slower than link A. So, despite the quorum queue being slower in confirming messages, the same AMQP 1.0 connection and session can still pump data very fast into the classic queue. 22. If cluster wide memory or disk alarm occurs. Each session sends a FLOW with incoming-window to 0 to sending client. If sending clients don’t obey, force disconnect the client. If cluster wide memory alarm clears: Each session resumes with a FLOW defaulting to initial incoming-window. 23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms, specifically, attaching consumers and consuming, i.e. emptying queues. There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation. 24. Flow control summary: * If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control). * If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control. * If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied. Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path. 25. Register AMQP sessions Prefer local-only pg over our custom pg_local implementation as pg is a better process group implementation than pg_local. pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once. 26. Start a local-only pg when Rabbit boots: > A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name: > pg:start_link(node()). Register AMQP 1.0 connections and sessions with pg. In future we should remove pg_local and instead use the new local-only pg for all registered processes such as AMQP 0.9.1 connections and channels. 27. Requeue messages if link detached Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed' field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed, we expect every outstanding delivery to be requeued. In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued. Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1: "After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them." [https://www.rabbitmq.com/consumers.html#unsubscribing] An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries 28. Init AMQP session with BEGIN frame Similar to how there can't be an MQTT processor without a CONNECT frame, there can't be an AMQP session without a BEGIN frame. This allows having strict dialyzer types for session flow control fields (i.e. not allowing 'undefined'). 29. Move serial_number to AMQP 1.0 common lib such that it can be used by both AMQP 1.0 server and client 30. Fix AMQP client to do serial number arithmetic. 31. AMQP client: Differentiate between delivery-id and transfer-id for better understandability. 32. Fix link flow control in classic queues This commit fixes ``` java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0 ``` followed by ``` ./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2 ``` Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around 8 - 10,000 messages. The bug was that in flight messages from classic queue process to session process were not taken into account when topping up credit to the classic queue process. Fixes #2597 The solution to this bug (and a much cleaner design anyway independent of this bug) is that queues should hold all link flow control state including the delivery-count. Hence, when credit API v2 is used the delivery-count will be held by the classic queue process, quorum queue process, and stream queue client instead of managing the delivery-count in the session. 33. The double level crediting between (a) session process and rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was removed. Therefore, instead of managing 3 separate delivery-counts (i. session, ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used in rabbit_fifo. This is a big simplification. 34. This commit fixes quorum queues without bumping the machine version nor introducing new rabbit_fifo commands. Whether credit API v2 is used is solely determined at link attachment time depending on whether feature flag credit_api_v2 is enabled. Even when that feature flag will be enabled later on, this link will keep using credit API v1 until detached (or the node is shut down). Eventually, after feature flag credit_api_v2 has been enabled and a subsequent rolling upgrade, all links will use credit API v2. This approach is safe and simple. The 2 alternatives to move delivery-count from the session process to the queue processes would have been: i. Explicit feature flag credit_api_v2 migration function * Can use a gen_server:call and only finish migration once all delivery-counts were migrated. Cons: * Extra new message format just for migration is required. * Risky as migration will fail if a target queue doesn’t reply. ii. Session always includes DeliveryCountSnd when crediting to the queue: Cons: * 2 delivery counts will be hold simultaneously in session proc and queue proc; could be solved by deleting the session proc’s delivery-count for credit-reply * What happens if the receiver doesn’t provide credit for a very long time? Is that a problem? 35. Support stream filtering in AMQP 1.0 (by @acogoluegnes) Use the x-stream-filter-value message annotation to carry the filter value in a published message. Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered filters when creating a receiver that wants to filter out messages from a stream. 36. Remove credit extension from AMQP 0.9.1 client 37. Support maintenance mode closing AMQP 1.0 connections. 38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation. 39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default. The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment tools from failing that execute: ``` rabbitmq-plugins enable rabbitmq_amqp1_0 rabbitmq-plugins disable rabbitmq_amqp1_0 ``` 40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`. Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`: ``` rabbitmqctl list_connections protocol Listing connections ... protocol {1, 0} {0,9,1} ``` ## Benchmarks ### Throughput & Latency Setup: * Single node Ubuntu 22.04 * Erlang 26.1.1 Start RabbitMQ: ``` make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3" ``` Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1. Start client: https://github.com/ssorj/quiver https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64) ``` docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest bash-5.1# quiver --version quiver 0.4.0-SNAPSHOT ``` 1. Classic queue ``` quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000 ``` This commit: ``` Count ............................................. 1,000,000 messages Duration ............................................... 73.8 seconds Sender rate .......................................... 13,548 messages/s Receiver rate ........................................ 13,547 messages/s End-to-end rate ...................................... 13,547 messages/s Latencies by percentile: 0% ........ 0 ms 90.00% ........ 9 ms 25% ........ 2 ms 99.00% ....... 14 ms 50% ........ 4 ms 99.90% ....... 17 ms 100% ....... 26 ms 99.99% ....... 24 ms ``` RabbitMQ 3.x (main branch as of 30 January 2024): ``` ---------------------- Sender ----------------------- --------------------- Receiver ---------------------- -------- Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms] ----------------------------------------------------- ----------------------------------------------------- -------- 2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511 4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0 6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0 8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662 10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0 12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0 14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147 16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0 18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0 20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0 22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0 receiver timed out 24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0 quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1. Traceback (most recent call last): File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run _plano.wait(receiver, check=True) File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait raise PlanoProcessError(proc) plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1. ``` 2. Quorum queue: ``` quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000 ``` This commit: ``` Count ............................................. 1,000,000 messages Duration .............................................. 101.4 seconds Sender rate ........................................... 9,867 messages/s Receiver rate ......................................... 9,868 messages/s End-to-end rate ....................................... 9,865 messages/s Latencies by percentile: 0% ....... 11 ms 90.00% ....... 23 ms 25% ....... 15 ms 99.00% ....... 28 ms 50% ....... 18 ms 99.90% ....... 33 ms 100% ....... 49 ms 99.99% ....... 47 ms ``` RabbitMQ 3.x: ``` ---------------------- Sender ----------------------- --------------------- Receiver ---------------------- -------- Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms] ----------------------------------------------------- ----------------------------------------------------- -------- 2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221 4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168 6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0 8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0 10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0 12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0 receiver timed out 14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0 quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1. Traceback (most recent call last): File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run _plano.wait(receiver, check=True) File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait raise PlanoProcessError(proc) plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1. ``` 3. Stream: ``` quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose ``` This commit: ``` Count ............................................. 1,000,000 messages Duration ................................................ 8.7 seconds Message rate ........................................ 115,154 messages/s ``` RabbitMQ 3.x: ``` Count ............................................. 1,000,000 messages Duration ............................................... 21.2 seconds Message rate ......................................... 47,232 messages/s ``` ### Memory usage Start RabbitMQ: ``` ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf" ``` ``` /bin/cat rabbitmq.conf tcp_listen_options.sndbuf = 2048 tcp_listen_options.recbuf = 2048 vm_memory_high_watermark.relative = 0.95 vm_memory_high_watermark_paging_ratio = 0.95 loopback_users = none ``` Create 50k connections with 2 sessions per connection, i.e. 100k session in total: ```go package main import ( "context" "log" "time" "github.com/Azure/go-amqp" ) func main() { for i := 0; i < 50000; i++ { conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()}) if err != nil { log.Fatal("dialing AMQP server:", err) } _, err = conn.NewSession(context.TODO(), nil) if err != nil { log.Fatal("creating AMQP session:", err) } _, err = conn.NewSession(context.TODO(), nil) if err != nil { log.Fatal("creating AMQP session:", err) } } log.Println("opened all connections") time.Sleep(5 * time.Hour) } ``` This commit: ``` erlang:memory(). [{total,4586376480}, {processes,4025898504}, {processes_used,4025871040}, {system,560477976}, {atom,1048841}, {atom_used,1042841}, {binary,233228608}, {code,21449982}, {ets,108560464}] erlang:system_info(process_count). 450289 ``` 7 procs per connection + 1 proc per session. (7 + 2*1) * 50,000 = 450,000 procs RabbitMQ 3.x: ``` erlang:memory(). [{total,15168232704}, {processes,14044779256}, {processes_used,14044755120}, {system,1123453448}, {atom,1057033}, {atom_used,1052587}, {binary,236381264}, {code,21790238}, {ets,391423744}] erlang:system_info(process_count). 1850309 ``` 7 procs per connection + 15 per session (7 + 2*15) * 50,000 = 1,850,000 procs 50k connections + 100k session require with this commit: 4.5 GB in RabbitMQ 3.x: 15 GB ## Future work 1. More efficient parser and serializer 2. TODO in mc_amqp: Do not store the parsed message on disk. 3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP clients to create RabbitMQ objects (queues, exchanges, ...). |
|
|
8b151f43f7 |
Parse 2 bytes encoded AMQP boolean to Erlang boolean
An AMQP boolean can by encoded using 1 byte or 2 bytes: https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean Prior to this commit, our Erlang parser returned: * Erlang terms `true` or `false` for the 1 byte AMQP encoding * Erlang terms `{boolean, true}` or `{boolean, false}` for the 2 byte AMQP enconding Having a serializer and parser that perform the opposite actions such that ``` Term = parse(serialize(Term)) ``` is desirable as it provides a symmetric property useful not only for property based testing, but also for avoiding altering message hashes when serializing and parsing the same term. However, dealing wth `{boolean, boolean()}` tuples instead of `boolean()` is very unhandy since all Erlang code must take care of both forms leading to subtle bugs as occurred in: * |
|
|
01092ff31f
|
(c) year bumps | |
|
e1d09fbba6 | Replaced true | false by boolean() | |
|
1b642353ca
|
Update (c) according to [1]
1. https://investors.broadcom.com/news-releases/news-release-details/broadcom-and-vmware-intend-close-transaction-november-22-2023 |
|
|
119f034406
|
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception. Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol. This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point. The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed. This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt. COMMIT HISTORY: * Refactor away from using the delivery{} record In many places including exchange types. This should make it easier to move towards using a message container type instead of basic_message. Add mc module and move direct replies outside of exchange Lots of changes incl classic queues Implement stream support incl amqp conversions simplify mc state record move mc.erl mc dlx stuff recent history exchange Make tracking work But doesn't take a protocol agnostic approach as we just convert everything into AMQP legacy and back. Might be good enough for now. Tracing as a whole may want a bit of a re-vamp at some point. tidy make quorum queue peek work by legacy conversion dead lettering fixes dead lettering fixes CMQ fixes rabbit_trace type fixes fixes fix Fix classic queue props test assertion fix feature flag and backwards compat Enable message_container feature flag in some SUITEs Dialyzer fixes fixes fix test fixes Various Manually update a gazelle generated file until a gazelle enhancement can be made https://github.com/rabbitmq/rules_erlang/issues/185 Add message_containers_SUITE to bazel and regen bazel files with gazelle from rules_erlang@main Simplify essential proprty access Such as durable, ttl and priority by extracting them into annotations at message container init time. Move type to remove dependenc on amqp10 stuff in mc.erl mostly because I don't know how to make bazel do the right thing add more stuff Refine routing header stuff wip Cosmetics Do not use "maybe" as type name as "maybe" is a keyword since OTP 25 which makes Erlang LS complain. * Dedup death queue names * Fix function clause crashes Fix failing tests in the MQTT shared_SUITE: A classic queue message ID can be undefined as set in |
|
|
55442aa914 |
Replace @rabbitmq.com addresses with rabbitmq-core@groups.vmware.com
Don't ask why we have to do it. Because reasons! |
|
|
eb94a58bc9 |
Add a workflow to compare the bazel/erlang.mk output
To catch any drift between the builds |
|
|
a944439fba |
Replace globs in bazel with explicit lists of files
As this is preferred in rules_erlang 3.9.14 |