Commit Graph

37 Commits

Author SHA1 Message Date
Jean-Sébastien Pédron e58eb1807a
Move `file_handle_cache` and `vm_memory_monitor` back to `rabbit`
[Why]
They were moved from `rabbit` to `rabbit_common` several years ago to
solve an dependency issue because `amqp_client` depended on the file
handle cache. This is not the case anymore.

[How]
The modules are moved back to `rabbit`.

`rabbit_common` doesn't need to depend on `os_mon` anymore. `rabbit`
already depends on it, so no changes needed here.

`include/rabbit_memory.hrl` and some test cases are moved as well to
follow the `vm_memory_monitor` module.
2025-05-07 09:46:14 +02:00
Loïc Hoguin 53444107b5
Add dynamic buffer functionality to rabbit_reader
The `buffer` socket option will be changed dynamically
based on how much data is received.

This is restricted to AMQP protocols (old and 1.0).

The algorithm is a little different than Cowboy 2.13.
The moving average is less reactive (div 8 instead of 2)
and floats are used so that using smaller lower buffer
values is possible (otherwise the rounding prevents
increasing buffer sizes). The lower buffer size was
set to 128 as a result.

Compared to the previous which was to set `buffer` to
`rcvbuf` effectively, often to 131072 on Linux for
example, the performance sees a slight improvement
in various scenarios for all message sizes using
AMQP-0.9.1 and a lower memory usage as well. But
the difference is small in the benchmarks we have
run (5% to 10%), whereas Cowboy saw a huge improvement
because its default was very small (1460).

For AMQP-1.0 this seems to be no worse but we didn't
detect a clear improvement. We saw scenarios where
small message sizes showed improvement, and large
message sizes showed a regression. But we are even
less confident with these results. David (AMQP-1.0
native developer) ran a few tests and didn't see a
regression.

The dynamic buffer code is currently identical for
old and 1.0 AMQP. But we might tweak them differently
in the future so they're left as duplicate for now.
This is because different protocols have different
behaviors and so the algorithm may need to be tweaked
differently for each protocol.
2025-02-27 12:46:28 +01:00
David Ansari 06ec8f0342 Orderly shutdown of sessions
Make AMQP 1.0 connection shut down its sessions before sending the
close frame to the client similar to how the AMQP 0.9.1 connection
shuts down its channels before closing the connection.

This commit avoids concurrent deletion of exclusive queues by the session process
and the classic queue process.
This commit should also fix https://github.com/rabbitmq/rabbitmq-server/issues/2596
2025-02-12 17:17:28 +01:00
David Ansari 579c58603e Support AMQP over WebSocket (OSS part) 2025-01-27 17:50:47 +01:00
Michael Klishin 968eefa1bb
Bump (c) line year
There are no functional changes to this massive diff.
2025-01-01 17:54:10 -05:00
Michael Davis c3c7675bda
rabbit_khepri: Add macros for path patterns 2024-11-22 11:21:11 -05:00
Michael Davis e8fb9b6889
rabbit: Move include/{khepri.hrl => rabbit_khepri.hrl}
This fixes erlang_ls's header resolution. Previously it would confuse
the include_lib of the `khepri.hrl` from Khepri with this header in
the rabbit app.

This header is also specific to how rabbit uses Khepri so I think the
new name fits better.
2024-11-22 11:21:11 -05:00
David Ansari f78f14ab1d Display container-id in the UI and CLI 2024-09-13 17:05:46 +02:00
Jean-Sébastien Pédron 94b8689284
Reorganize data in the Khepri store
[Why]

The previous layout followed the flat structure we have in Mnesia:
* In Mnesia, we have tables named after each purpose (exchanges, queues,
  runtime parameters and so on).
* In Khepri, we had about the same but the table names were replaced by
  a tree node in the tree. We ended up with one tree node per purpose
  at the root of the tree.

Khepri implements a tree. We could benefit from this and organize data
to reflect their relationship in RabbitMQ.

[How]

Here is the new hierarchy implemented by this commit:

    rabbitmq
    |-- users
    |   `-- $username
    |-- vhosts
    |   `-- $vhost
    |       |-- user_permissions
    |       |   `-- $username
    |       |-- exchanges
    |       |   `-- $exchange
    |       |       |-- bindings
    |       |       |   |-- queue
    |       |       |   |   `-- $queue
    |       |       |   `-- exchange
    |       |       |       `-- $exchange
    |       |       |-- consistent_hash_ring_state
    |       |       |-- jms_topic
    |       |       |-- recent_history
    |       |       |-- serial
    |       |       `-- user_permissions
    |       |           `-- $username
    |       |-- queues
    |       |   `-- $queue
    |       `-- runtime_params
    |           `-- $param_name
    |-- runtime_params
    |   `-- $param_name
    |-- mirrored_supervisors
    |   `-- $group
    |       `-- $id
    `-- node_maintenance
        `-- $node

We first define a root path in `rabbit/include/khepri.hrl` as
`[rabbitmq]`. This could be anything, including an empty path.

All paths are constructed either from this root path definition (users
and vhosts paths do that), or from a parent resource's path (exchanges
and queues paths are based on a vhost path).
2024-09-05 15:31:29 +02:00
Michael Klishin 18705dde66 Closes #11434 2024-06-12 12:37:09 -04:00
David Ansari 5f659b5eb9 Support AMQP tracing
All incoming and outgoing AMQP frames excluding payload will be logged.

In addition, HTTP over AMQP management requests and response payloads
will be logged.

Each trace includes the module name, function name and module line which
outputs the trace.

Example log output:
```
[debug] <0.823.0> rabbit_amqp_reader:parse_frame_body/2 372
[debug] <0.823.0> channel 1 ->
[debug] <0.823.0>  {'v1_0.transfer',[{handle,{uint,0}},
[debug] <0.823.0>                    {delivery_id,{uint,4294967295}},
[debug] <0.823.0>                    {delivery_tag,{binary,<<>>}},
[debug] <0.823.0>                    {message_format,{uint,0}},
[debug] <0.823.0>                    {settled,true},
[debug] <0.823.0>                    {more,false},
[debug] <0.823.0>                    {rcv_settle_mode,undefined},
[debug] <0.823.0>                    {state,undefined},
[debug] <0.823.0>                    {resume,undefined},
[debug] <0.823.0>                    {aborted,undefined},
[debug] <0.823.0>                    {batchable,undefined}]}
[debug] <0.823.0>  followed by 100 bytes of payload
[debug] <0.823.0>
[debug] <0.828.0> rabbit_amqp_management:handle_request/5 69
[debug] <0.828.0> HTTP over AMQP request:
[debug] <0.828.0>  [{'v1_0.properties',
[debug] <0.828.0>       [{message_id,{binary,<<74,195,231,105,234,167,156,6>>}},
[debug] <0.828.0>        {user_id,undefined},
[debug] <0.828.0>        {to,{utf8,<<"/queues/q1">>}},
[debug] <0.828.0>        {subject,{utf8,<<"PUT">>}},
[debug] <0.828.0>        {reply_to,{utf8,<<"$me">>}},
[debug] <0.828.0>        {correlation_id,undefined},
[debug] <0.828.0>        {content_type,undefined},
[debug] <0.828.0>        {content_encoding,undefined},
[debug] <0.828.0>        {absolute_expiry_time,undefined},
[debug] <0.828.0>        {creation_time,undefined},
[debug] <0.828.0>        {group_id,undefined},
[debug] <0.828.0>        {group_sequence,undefined},
[debug] <0.828.0>        {reply_to_group_id,undefined}]},
[debug] <0.828.0>   {'v1_0.amqp_value',
[debug] <0.828.0>       [{content,
[debug] <0.828.0>            {map,
[debug] <0.828.0>                [{{utf8,<<"durable">>},true},
[debug] <0.828.0>                 {{utf8,<<"arguments">>},
[debug] <0.828.0>                  {map,
[debug] <0.828.0>                      [{{utf8,<<"x-queue-type">>},{utf8,<<"quorum">>}}]}}]}}]}]
[debug] <0.828.0> HTTP over AMQP response:
[debug] <0.828.0>  [{'v1_0.properties',
[debug] <0.828.0>       [{message_id,undefined},
[debug] <0.828.0>        {user_id,undefined},
[debug] <0.828.0>        {to,undefined},
[debug] <0.828.0>        {subject,{utf8,<<"201">>}},
[debug] <0.828.0>        {reply_to,undefined},
[debug] <0.828.0>        {correlation_id,{binary,<<74,195,231,105,234,167,156,6>>}},
[debug] <0.828.0>        {content_type,undefined},
[debug] <0.828.0>        {content_encoding,undefined},
[debug] <0.828.0>        {absolute_expiry_time,undefined},
[debug] <0.828.0>        {creation_time,undefined},
[debug] <0.828.0>        {group_id,undefined},
[debug] <0.828.0>        {group_sequence,undefined},
[debug] <0.828.0>        {reply_to_group_id,undefined}]},
[debug] <0.828.0>   {'v1_0.application_properties',
[debug] <0.828.0>       [{content,[{{utf8,<<"http:response">>},{utf8,<<"1.1">>}}]}]},
[debug] <0.828.0>   {'v1_0.amqp_value',
[debug] <0.828.0>       [{content,
[debug] <0.828.0>            {map,
[debug] <0.828.0>                [{{utf8,<<"leader">>},{utf8,<<"rabbit@VQD7JFK37T">>}},
[debug] <0.828.0>                 {{utf8,<<"replicas">>},
[debug] <0.828.0>                  {array,utf8,[{utf8,<<"rabbit@VQD7JFK37T">>}]}},
[debug] <0.828.0>                 {{utf8,<<"message_count">>},{ulong,0}},
[debug] <0.828.0>                 {{utf8,<<"consumer_count">>},{uint,0}},
[debug] <0.828.0>                 {{utf8,<<"name">>},{utf8,<<"q1">>}},
[debug] <0.828.0>                 {{utf8,<<"vhost">>},{utf8,<<"/">>}},
[debug] <0.828.0>                 {{utf8,<<"durable">>},{boolean,true}},
[debug] <0.828.0>                 {{utf8,<<"auto_delete">>},{boolean,false}},
[debug] <0.828.0>                 {{utf8,<<"exclusive">>},{boolean,false}},
[debug] <0.828.0>                 {{utf8,<<"type">>},{utf8,<<"quorum">>}},
[debug] <0.828.0>                 {{utf8,<<"arguments">>},
[debug] <0.828.0>                  {map,
[debug] <0.828.0>                      [{{utf8,<<"x-queue-type">>},{utf8,<<"quorum">>}}]}}]}}]}]
[debug] <0.828.0>
[debug] <0.826.0> rabbit_amqp_writer:assemble_frame/4 215
[debug] <0.826.0> channel 1 <-
[debug] <0.826.0>  {'v1_0.transfer',[{handle,{uint,1}},
[debug] <0.826.0>                    {delivery_id,{uint,0}},
[debug] <0.826.0>                    {delivery_tag,{binary,<<>>}},
[debug] <0.826.0>                    {message_format,{uint,0}},
[debug] <0.826.0>                    {settled,true},
[debug] <0.826.0>                    {more,undefined},
[debug] <0.826.0>                    {rcv_settle_mode,undefined},
[debug] <0.826.0>                    {state,undefined},
[debug] <0.826.0>                    {resume,undefined},
[debug] <0.826.0>                    {aborted,undefined},
[debug] <0.826.0>                    {batchable,undefined}]}
[debug] <0.826.0>  followed by 268 bytes of payload
[debug] <0.826.0>
```
2024-06-05 13:38:15 +02:00
Diana Parra Corbacho 3bbda5bdba Remove classic mirror queues 2024-06-04 13:00:31 +02:00
David Ansari 6b300a2f34 Fix dead lettering
# What?

This commit fixes #11159, #11160, #11173.

  # How?

  ## Background

RabbitMQ allows to dead letter messages for four different reasons, out
of which three reasons cause messages to be dead lettered automatically
internally in the broker: (maxlen, expired, delivery_limit) and 1 reason
is caused by an explicit client action (rejected).

RabbitMQ also allows dead letter topologies. When a message is dead
lettered, it is re-published to an exchange, and therefore zero to
multiple target queues. These target queues can in turn dead letter
messages. Hence it is possible to create a cycle of queues where
messages get dead lettered endlessly, which is what we want to avoid.

  ## Alternative approach

One approach to avoid such endless cycles is to use a similar concept of
the TTL field of the IPv4 datagram, or the hop limit field of an IPv6
datagram. These fields ensure that IP packets aren't cicrulating forever
in the Internet. Each router decrements this counter. If this counter
reaches 0, the sender will be notified and the message gets dropped.

We could use the same approach in RabbitMQ: Whenever a queue dead
letters a message, a dead_letter_hop_limit field could be decremented.
If this field reaches 0, the message will be dropped.
Such a hop limit field could have a sensible default value, for example
32. The sender of the message could override this value. Likewise, the
client rejecting a message could set a new value via the Modified
outcome.

Such an approach has multiple advantages:
1. No dead letter cycle detection per se needs to be performed within
   the broker which is a slight simplification to what we have today.
2. Simpler dead letter topologies. One very common use case is that
   clients re-try sending the message after some time by consuming from
   a dead-letter queue and rejecting the message such that the message
   gets republished to the original queue. Instead of requiring explicit
   client actions, which increases complexity, a x-message-ttl argument
   could be set on the dead-letter queue to automatically retry after
   some time. This is a big simplification because it eliminates the
   need of various frameworks that retry, such as
   https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/rabbitmq-retry.html
3. No dead letter history information needs to be compressed because
   there is a clear limit on how often a message gets dead lettered.
   Therefore, the full history including timestamps of every dead letter
   event will be available to clients.

Disadvantages:
1. Breaks a lot of clients, even for 4.0.

  ## 3.12 approach

Instead of decrementing a counter, the approach up to 3.12 has been to
drop the message if the message cycled automatically. A message cycled
automatically if no client expliclity rejected the message, i.e. the
mesage got dead lettered due to maxlen, expired, or delivery_limit, but
not due to rejected.

In this approach, the broker must be able to detect such cycles
reliably.
Reliably detecting dead letter cycles broke in 3.13 due to #11159 and #11160.

To reliably detect cycles, the broker must be able to obtain the exact
order of dead letter events for a given message. In 3.13.0 - 3.13.2, the
order cannot exactly be determined because wall clock time is used to
record the death time.

This commit uses the same approach as done in 3.12: a list ordered by
death recency is used with the most recent death at the head of the
list.

To not grow this list endlessly (for example when a client rejects the
same message hundreds of times), this list should be compacted.
This commit, like 3.12, compacts by tuple `{Queue, Reason}`:
If this message got already dead lettered from this Queue for this
Reason, then only a counter is incremented and the element is moved to
the front of the list.

  ## Streams & AMQP 1.0 clients

Dead lettering from a stream doesn't make sense because:
1. a client cannot reject a message from a stream since the stream must
   maintain the total order of events to be consumed by multiple clients.
2. TTL is implemented by Stream retention where only old Stream segments
   are automatically deleted (or archived in the future).
3. same applies to maxlen

Although messages cannot be dead lettered **from** a stream, messages can be dead lettered
**into** a stream. This commit provides clients consuming from a stream the death history: #11173

Additionally, this commit provides AMQP 1.0 clients the death history via
message annotation `x-opt-deaths` which contains the same information as
AMQP 0.9.1 header `x-death`.

Both, storing the death history in a stream and providing death history
to an AMQP 1.0 client, use the same encoding: a message annoation
`x-opt-deaths` that contains an array of maps ordered by death recency.
The information encoded is the same as in the AMQP 0.9.1 x-death header.

Instead of providing an array of maps, a better approach could be to use
an array of a custom AMQP death type, such as:
```xml
<amqp name="rabbitmq">
    <section name="custom-types">
        <type name="death" class="composite" source="list">
            <descriptor name="rabbitmq:death:list" code="0x00000000:0x000000255"/>
            <field name="queue" type="string" mandatory="true" label="the name of the queue the message was dead lettered from"/>
            <field name="reason" type="symbol" mandatory="true" label="the reason why this message was dead lettered"/>
            <field name="count" type="ulong" default="1" label="how many times this message was dead lettered from this queue for this reason"/>
            <field name="time" mandatory="true" type="timestamp" label="the first time when this message was dead lettered from this queue for this reason"/>
            <field name="exchange" type="string" default="" label="the exchange this message was published to before it was dead lettered for the first time from this queue for this reason"/>
            <field name="routing-keys" type="string" default="" multiple="true" label="the routing keys this message was published with before it was dead lettered for the first time from this queue for this reason"/>
            <field name="ttl" type="milliseconds" label="the time to live of this message before it was dead lettered for the first time from this queue for reason ‘expired’"/>
        </type>
    </section>
</amqp>
```

However, encoding and decoding custom AMQP types that are nested within
arrays which in turn are nested within the message annotation map can be
difficult for clients and the broker. Also, each client will need to
know the custom AMQP type. For now, therefore we use an array of maps.

  ## Feature flag
The new way to record death information is done via mc annotation
`deaths_v2`.
Because old nodes do not know this new annotation, recording death
information via mc annotation `deaths_v2` is hidden behind a new feature
flag `message_containers_deaths_v2`.

If this feature flag is disabled, a message will continue to use the
3.13.0 - 3.13.2 way to record death information in mc annotation
`deaths`, or even the older way within `x-death` header directly if
feature flag message_containers is also disabled.

Only if feature flag `message_containers_deaths_v2` is enabled and this
message hasn't been dead lettered before, will the new mc annotation
`deaths_v2` be used.
2024-05-13 11:00:39 +02:00
David Ansari 95c5508060 Provide more descriptive type spec 2024-04-21 11:35:19 +02:00
David Ansari 8a3f3c6f34 Enable AMQP 1.0 clients to manage topologies
## What?

* Allow AMQP 1.0 clients to dynamically create and delete RabbitMQ
  topologies (exchanges, queues, bindings).
* Provide an Erlang AMQP 1.0 client that manages topologies.

 ## Why?

Today, RabbitMQ topologies can be created via:
* [Management HTTP API](https://www.rabbitmq.com/docs/management#http-api)
  (including Management UI and
  [messaging-topology-operator](https://github.com/rabbitmq/messaging-topology-operator))
* [Definition Import](https://www.rabbitmq.com/docs/definitions#import)
* AMQP 0.9.1 clients

Up to RabbitMQ 3.13 the RabbitMQ AMQP 1.0 plugin auto creates queues
and bindings depending on the terminus [address
format](https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing).

Such implicit creation of topologies is limiting and obscure.
For some address formats, queues will be created, but not deleted.

Some of RabbitMQ's success is due to its flexible routing topologies
that AMQP 0.9.1 clients can create and delete dynamically.

This commit allows dynamic management of topologies for AMQP 1.0 clients.
This commit builds on top of Native AMQP 1.0 (PR #9022) and will be
available in RabbitMQ 4.0.

 ## How?

This commits adds the following management operations for AMQP 1.0 clients:
* declare queue
* delete queue
* purge queue
* bind queue to exchange
* unbind queue from exchange
* declare exchange
* delete exchange
* bind exchange to exchange
* unbind exchange from exchange

Hence, at least the AMQP 0.9.1 management operations are supported for
AMQP 1.0 clients.

In addition the operation
* get queue

is provided which - similar to `declare queue` - returns queue
information including the current leader and replicas.
This allows clients to publish or consume locally on the node that hosts
the queue.

Compared to AMQP 0.9.1 whose commands and command fields are fixed, the
new AMQP Management API is extensible: New operations and new fields can
easily be added in the future.

There are different design options how management operations could be
supported for AMQP 1.0 clients:
1. Use a special exchange type as done in https://github.com/rabbitmq/rabbitmq-management-exchange
  This has the advantage that any protocol client (e.g. also STOMP clients) could
  dynamically manage topologies. However, a special exchange type is the wrong abstraction.
2. Clients could send "special" messages with special headers that the broker interprets.

This commit decided for a variation of the 2nd option using a more
standardized way by re-using a subest of the following latest AMQP 1.0 extension
specifications:
* [AMQP Request-Response Messaging with Link Pairing Version 1.0 - Committee Specification 01](https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html) (February 2021)
* [HTTP Semantics and Content over AMQP Version 1.0 - Working Draft 06](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65571) (July 2019)
* [AMQP Management Version 1.0 - Working Draft 16](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65575) (July 2019)

An important goal is to keep the interaction between AMQP 1.0 client and RabbitMQ
simple to increase usage, development and adoptability of future RabbitMQ AMQP 1.0
client library wrappers.

The AMQP 1.0 client has to create a link pair to the special `/management` node.
This allows the client to send and receive from the management node.
Similar to AMQP 0.9.1, there is no need for a reply queue since the reply
will be sent directly to the client.

Requests and responses are modelled via HTTP, but sent via AMQP using
the `HTTP Semantics and Content over AMQP` extension (henceforth `HTTP
over AMQP` extension).

This commit tries to follow the `HTTP over AMQP` extension as much as
possible but deviates where this draft spec doesn't make sense.

The projected mode §4.1 is used as opposed to tunneled mode §4.2.
A named relay `/management` is used (§6.3) where the message field `to` is the URL.

Deviations are
* §3.1 mandates that URIs are not encoded in an AMQP message.
  However, we percent encode URIs in the AMQP message. Otherwise there
  is for example no way to distinguish a `/` in a queue name from the
  URI path separator `/`.
* §4.1.4 mandates a data section. This commit uses an amqp-value section
  as it's a better fit given that the content is AMQP encoded data.

Using an HTTP API allows for a common well understood interface and future extensibility.
Instead of re-using the current RabbitMQ HTTP API, this commit uses a
new HTTP API (let's call it v2) which could be used as a future API for
plain HTTP clients.

 ### HTTP API v1

The current HTTP API (let's call it v1) is **not** used since v1
comes with a couple of weaknesses:

1. Deep level of nesting becomes confusing and difficult to manage.

Examples of deep nesting in v1:
```
/api/bindings/vhost/e/source/e/destination/props
/api/bindings/vhost/e/exchange/q/queue/props
```

2. Redundant endpoints returning the same resources

v1 has 9 endpoints to list binding(s):
```
/api/exchanges/vhost/name/bindings/source
/api/exchanges/vhost/name/bindings/destination
/api/queues/vhost/name/bindings
/api/bindings
/api/bindings/vhost
/api/bindings/vhost/e/exchange/q/queue
/api/bindings/vhost/e/exchange/q/queue/props
/api/bindings/vhost/e/source/e/destination
/api/bindings/vhost/e/source/e/destination/props
```

3. Verbs in path names
Path names should be nouns instead.
v1 contains verbs:
```
/api/queues/vhost/name/get
/api/exchanges/vhost/name/publish
```

 ### AMQP Management extension

Only few aspects of the AMQP Management extension are used.

The central idea of the AMQP management spec is **dynamic discovery** such that broker independent AMQP 1.0
clients can discover objects, types, operations, and HTTP endpoints of specific brokers.
In fact, clients are only conformant if:
> All request addresses are dynamically discovered starting from the discovery document.
> A requesting container MUST NOT use fixed assumptions about the addressing structure of the management API.

While this is a nice and powerful idea, no AMQP 1.0 client and no AMQP 1.0 server implement the
latest AMQP 1.0 management spec from 2019, partly presumably due to its complexity.
Therefore, the idea of such dynamic discovery has failed to be implemented in practice.

The AMQP management spec mandates that the management endpoint returns a discovery document containing
broker specific collections, types, configuration, and operations including their endpoints.

The API endpoints of the AMQP management spec are therefore all designed around dynamic discovery.

For example, to create either a queue or an exchange, the client has to
```
POST /$management/entities
```
which shows that the entities collection acts as a generic factory, see section 2.2.
The server will then create the resource and reply with a location header containing a URI pointing to the resource.
For RabbitMQ, we don’t need such a generic factory to create queues or exchanges.

To list bindings for a queue Q1, the spec suggests
```
GET /$management/Queues/Q1/$management/entities
```
which again shows the generic entities endpoint as well as a `$management` endpoint under Q1 to
allow a queue to return a discovery document.
For RabbitMQ, we don’t need such generic endpoints and discovery documents.

Given we aim for our own thin RabbitMQ AMQP 1.0 client wrapper libraries which expose
the RabbitMQ model to the developer, we can directly use fixed HTTP endpoint assumptions
in our RabbitMQ specific libraries.

This is by far simpler than using the dynamic endpoints of the management spec.
Simplicity leads to higher adoption and enables more developers to write RabbitMQ AMQP 1.0 client
library wrappers.

The AMQP Management extension also suffers from deep level of nesting in paths
Examples:
```
/$management/Queues/Q1/$management/entities
/$management/Queues/Q1/Bindings/Binding1
```
as well as verbs in path names: Section 7.1.4 suggests using verbs in path names,
for example “purge”, due to the dynamic operations discovery document.

 ### HTTP API v2

This commit introduces a new HTTP API v2 following best practices.
It could serve as a future API for plain HTTP clients.

This commit and RabbitMQ 4.0 will only implement a minimal set of
HTTP API v2 endpoints and only for HTTP over AMQP.
In other words, the existing HTTP API v1 Cowboy handlers will continue to be
used for all plain HTTP requests in RabbitMQ 4.0 and will remain untouched for RabbitMQ 4.0.
Over time, after 4.0 shipped, we could ship a pure HTTP API implementation for HTTP API v2.
Hence, the new HTTP API v2 endpoints for HTTP over AMQP should be designed such that they
can be re-used in the future for a pure HTTP implementation.

The minimal set of endpoints for RabbitMQ 4.0 are:

``
GET / PUT / DELETE
/vhosts/:vhost/queues/:queue
```
read, create, delete a queue

```
DELETE
/vhosts/:vhost/queues/:queue/messages
```
purges a queue

```
GET / DELETE
/vhosts/:vhost/bindings/:binding
```
read, delete bindings
where `:binding` is a binding ID of the following path segment:
```
src=e1;dstq=q2;key=my-key;args=
```
Binding arguments `args` has an empty value by default, i.e. there are no binding arguments.
If the binding includes binding arguments, `args` will be an Erlang portable term hash
provided by the server similar to what’s provided in HTTP API v1 today.
Alternatively, we could use an arguments scheme of:
```
args=k1,utf8,v1&k2,uint,3
```
However, such a scheme leads to long URIs when there are many binding arguments.
Note that it’s perfectly fine for URI producing applications to include URI
reserved characters `=` / `;` / `,` / `$` in a path segment.

To create a binding, the client therefore needs to POST to a bindings factory URI:
```
POST
/vhosts/:vhost/bindings
```

To list all bindings between a source exchange e1 and destination exchange e2 with binding key k1:
```
GET
/vhosts/:vhost/bindings?src=e1&dste=e2&key=k1
```

This endpoint will be called by the RabbitMQ AMQP 1.0 client library to unbind a
binding with non-empty binding arguments to get the binding ID before invoking a
```
DELETE
/vhosts/:vhost/bindings/:binding
```

In future, after RabbitMQ 4.0 shipped, new API endpoints could be added.
The following is up for discussion and is only meant to show the clean and simple design of HTTP API v2.

Bindings endpoint can be queried as follows:

to list all bindings for a given source exchange e1:
```
GET
/vhosts/:vhost/bindings?src=e1
```

to list all bindings for a given destination queue q1:
```
GET
/vhosts/:vhost/bindings?dstq=q1
```

to list all bindings between a source exchange e1 and destination queue q1:
```
GET
/vhosts/:vhost/bindings?src=e1&dstq=q1
```

multiple bindings between source exchange e1 and destination queue q1 could be deleted at once as follows:
```
DELETE /vhosts/:vhost/bindings?src=e1&dstq=q1
```

GET could be supported globally across all vhosts:
```
/exchanges
/queues
/bindings
```

Publish a message:
```
POST
/vhosts/:vhost/queues/:queue/messages
```

Consume or peek a message (depending on query parameters):
```
GET
/vhosts/:vhost/queues/:queue/messages
```

Note that the AMQP 1.0 client omits the `/vhost/:vhost` path prefix.
Since an AMQP connection belongs to a single vhost, there is no need to
additionally include the vhost in every HTTP request.

Pros of HTTP API v2:

1. Low level of nesting

Queues, exchanges, bindings are top level entities directly under vhosts.
Although the HTTP API doesn’t have to reflect how resources are stored in the database,
v2 does nicely reflect the Khepri tree structure.

2. Nouns instead of verbs
HTTP API v2 is very simple to read and understand as shown by
```
POST    /vhosts/:vhost/queues/:queue/messages	to post messages, i.e. publish to a queue.
GET     /vhosts/:vhost/queues/:queue/messages	to get messages, i.e. consume or peek from a queue.
DELETE  /vhosts/:vhost/queues/:queue/messages	to delete messages, i.e. purge a queue.
```

A separate new HTTP API v2 allows us to ship only handlers for HTTP over AMQP for RabbitMQ 4.0
and therefore move faster while still keeping the option on the table to re-use the new v2 API
for pure HTTP in the future.
In contrast, re-using the HTTP API v1 for HTTP over AMQP is possible, but dirty because separate handlers
(HTTP over AMQP and pure HTTP) replying differently will be needed for the same v1 endpoints.
2024-03-28 11:36:56 +01:00
David Ansari 8cb313d5a1 Support AMQP 1.0 natively
## What

Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.

  ## Why

Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
   scalability, and resource usage for AMQP 1.0.
   See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
   See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
   this commit allows implementing more AMQP 1.0 features in the future.
   Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
   New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
   consumers as we currently recommended for AMQP 0.9.1. which potentially
   halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
   slow target queue won't block the entire connection.
   Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
   In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
   possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

 ## Implementation details

1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.

2. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

3. 7 processes per connection + 1 process per session in this commit instead of
   7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.

4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes before flushing the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.

5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.

7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.

8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.

How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
  new credit_reply queue event:
  * `available` after the queue sends any deliveries
  * `link-credit` after the queue sends any deliveries
  * `drain` which allows us to combine the old queue events
    send_credit_reply and send_drained into a single new queue event
    credit_reply.
* Include the consumer tag into the credit_reply queue event such that
  the AMQP 1.0 session process can process any credit replies
  asynchronously.

Link flow control state `delivery-count` also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.

9. Use serial number arithmetic in quorum queues and session process.

10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.

11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.

12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).

13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.

14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2

15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.

16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.

17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.

18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.

19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.

20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.

21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.

22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.

If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.

23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.

26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.

27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries

28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').

29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client

30. Fix AMQP client to do serial number arithmetic.

31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.

32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.

The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.

33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.

34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.

36. Remove credit extension from AMQP 0.9.1 client

37. Support maintenance mode closing AMQP 1.0 connections.

38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
    The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
    tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```

40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```

 ## Benchmarks

 ### Throughput & Latency

Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1

Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```

1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms
```

RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```

2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms
```

RabbitMQ 3.x:
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```

3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```

RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```

 ### Memory usage

Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```

```
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```

Create 50k connections with 2 sessions per connection, i.e. 100k session in total:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

This commit:
```
erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB

 ## Future work

1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2024-02-28 14:15:20 +01:00
Jean-Sébastien Pédron 9dcf7c322e
Revert "Merge pull request #10096 from rabbitmq/upgrade-mirroring-sup-childspec"
This reverts commit 75773e6d9b, reversing
changes made to e40367f264.
2024-02-13 13:29:16 +01:00
Michael Klishin d83ae8c9c7
More missed (c) header updates 2024-01-23 11:26:29 -05:00
Michael Klishin 7b151a7651 More missed (c) header updates 2024-01-22 23:44:47 -05:00
David Ansari 6a3ba6210a
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead

Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.

* Ensure durable is a boolean

Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.

* Apply feedback
2024-01-18 11:53:02 +01:00
Michael Klishin 9f3efa8eb0 Move mirrored_supervisor.hrl to deps/rabbit/include 2023-12-13 15:03:59 -05:00
David Ansari 95c5f2ec9e Some small fixes 2023-11-16 12:33:17 +01:00
Karl Nilsson c4fd947aad MC: various changes and improvements
To refine conversion behaviour add additional tests
and ensure it matches the documentation.

mc: optionally capture source environment

And pass target environment to mc:convert

This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
2023-11-15 11:04:49 +00:00
Diana Parra Corbacho 2d4854c443 Khepri migration: match expected records
During metadata store migration, Mnesia might generate messages
for Table-Key pairs for in-flight writes.
These are skipped by mnesia-khepri-migration, as do not contain
enough info to copy the record, but we ensure here that none
comes through and we try to process it.

The event with Table-Record is generated at the time of write,
which is properly processed after the initial copy of data.
2023-10-30 09:09:53 +01:00
Karl Nilsson 119f034406
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.

Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.

This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.

The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.

This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.


COMMIT HISTORY:

* Refactor away from using the delivery{} record

In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.

Add mc module and move direct replies outside of exchange

Lots of changes incl classic queues

Implement stream support incl amqp conversions

simplify mc state record

move mc.erl

mc dlx stuff

recent history exchange

Make tracking work

But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.

Tracing as a whole may want a bit of a re-vamp at some point.

tidy

make quorum queue peek work by legacy conversion

dead lettering fixes

dead lettering fixes

CMQ fixes

rabbit_trace type fixes

fixes

fix

Fix classic queue props

test assertion fix

feature flag and backwards compat

Enable message_container feature flag in some SUITEs

Dialyzer fixes

fixes

fix

test fixes

Various

Manually update a gazelle generated file

until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185

Add message_containers_SUITE to bazel

and regen bazel files with gazelle from rules_erlang@main

Simplify essential proprty access

Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependenc on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff

Refine routing header stuff

wip

Cosmetics

Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.

* Dedup death queue names

* Fix function clause crashes

Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)

Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.

* Fix is_utf8_no_null crash

Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```

* Implement mqtt mc behaviour

For now via amqp translation.

This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```

* Shorten mc file names

Module name length matters because for each persistent message the #mc{}
record is persisted to disk.

```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```

This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```

* mc: make deaths an annotation + fixes

* Fix mc_mqtt protocol_state callback

* Fix test will_delay_node_restart

```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```

* Bazel run gazelle

* mix format rabbitmqctl.ex

* Ensure ttl annotation is refelected in amqp legacy protocol state

* Fix id access in message store

* Fix rabbit_message_interceptor_SUITE

* dializer fixes

* Fix rabbit:rabbit_message_interceptor_SUITE-mixed

set_annotation/3 should not result in duplicate keys

* Fix MQTT shared_SUITE-mixed

Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.

The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.

* Field content of 'v1_0.data' can be binary

Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
    --test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
    -t- --test_sharding_strategy=disabled
```

* Remove route/2 and implement route/3 for all exchange types.

This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.

stream filtering fixes

* Translate directly from MQTT to AMQP 0.9.1

* handle undecoded properties in mc_compat

amqpl: put clause in right order

recover death deatails from amqp data

* Replace callback init_amqp with convert_from

* Fix return value of lists:keyfind/3

* Translate directly from AMQP 0.9.1 to MQTT

* Fix MQTT payload size

MQTT payload can be a list when converted from AMQP 0.9.1 for example

First conversions tests

Plus some other conversion related fixes.

bazel

bazel

translate amqp 1.0 null to undefined

mc: property/2 and correlation_id/message_id return type tagged values.

To ensure we can support a variety of types better.

The type type tags are AMQP 1.0 flavoured.

fix death recovery

mc_mqtt: impl new api

Add callbacks to allow protocols to compact data before storage

And make readable if needing to query things repeatedly.

bazel fix

* more decoding

* tracking mixed versions compat

* mc: flip default of `durable` annotation to save some data.

Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.

* mc conversion tests and tidy up

* mc make x_header unstrict again

* amqpl: death record fixes

* bazel

* amqp -> amqpl conversion test

* Fix crash in mc_amqp:size/1

Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.

* Fix crash in lists:flatten/1

Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.

* Fix crash in rabbit_writer

Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
  initial call: rabbit_writer:enter_mainloop/2
  pid: <0.711.0>
  registered_name: []
  exception error: bad argument
    in function  size/1
       called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
       *** argument 1: not tuple or binary
    in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
    in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
    in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
    in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
    in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
    in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
    in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.

This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.

* Add accidentally deleted line back

* mc: optimise mc_amqp internal format

By removint the outer records for message and delivery annotations
as well as application properties and footers.

* mc: optimis mc_amqp map_add by using upsert

* mc: refactoring and bug fixes

* mc_SUITE routingheader assertions

* mc remove serialize/1 callback as only used by amqp

* mc_amqp: avoid returning a nested list from protocol_state

* test and bug fix

* move infer_type to mc_util

* mc fixes and additiona assertions

* Support headers exchange routing for MQTT messages

When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).

This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.

When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.

* Fix crash when sending from stream to amqpl

When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.818.0>
  registered_name: []
  exception exit: {{badmatch,undefined},
                   [{rabbit_channel,handle_deliver0,4,
                                    [{file,"rabbit_channel.erl"},
                                     {line,2728}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
                    {rabbit_channel,handle_cast,2,
                                    [{file,"rabbit_channel.erl"},
                                     {line,728}]},
                    {gen_server2,handle_msg,2,
                                 [{file,"gen_server2.erl"},{line,1056}]},
                    {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,251}]}]}
```

This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.

* Support consistent hash exchange routing for MQTT 5.0

When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.

* Convert MQTT 5.0 User Property

* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations

* Make use of Annotations in mc_mqtt:protocol_state/2

mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.

* Enforce AMQP 0.9.1 field name length limit

The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.

* Fix type specs

Apply feedback from Michael Davis

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* Add mc_mqtt unit test suite

Implement mc_mqtt:x_header/2

* Translate indicator that payload is UTF-8 encoded

when converting between MQTT 5.0 and AMQP 1.0

* Translate single amqp-value section from AMQP 1.0 to MQTT

Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.

If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.

This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.

* Fix payload conversion

* Translate Response Topic between MQTT and AMQP

Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.

* Apply PR feedback

and fix formatting

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* tidy up

* Add MQTT message_containers test

* consistent hash exchange: avoid amqp legacy conversion

When hashing on a header value.

* Avoid converting to amqp legacy when using exchange federation

* Fix test flake

* test and dialyzer fixes

* dialyzer fix

* Add MQTT protocol interoperability tests

Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams

* Regenerate portions of deps/rabbit/app.bzl with gazelle

I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.

* mc: refactoring

* mc_amqpl: handle delivery annotations

Just in case they are included.

Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 11:27:13 +01:00
David Ansari 0f5fe8fadd Add Prometheus metric messages dropped by MQTT QoS 0 queue type
Why:
A RabbitMQ operator should be able to see whether RabbitMQ drops MQTT
QoS 0 messages due to overload protection. It's an indication that an
MQTT subscriber does not consume fast enough.

How:
Use Prometheus global counters.

There are 2 valid solutions:
1. Introduce a new metric called messages_dropped specifically for the
   rabbitmq_mqtt_qos0_queue type. This would work in a similar fashion
   how streams extends the per protocol global counters, but requires
   extending the per protocol & queue type global counters for the MQTT
   QoS queue type. The emitted metrics would look as follows:
```
rabbitmq_global_messages_dropped_total{protocol="mqtt310",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt311",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt50",queue_type="rabbit_mqtt_qos0_queue"} 0
```
2. Reuse the existing metric rabbitmq_global_messages_dead_lettered_maxlen_total

This commit decides to go for the 2nd approach because:
a) there is no need to add a new metric. Even though dead lettering is not supported
for the MQTT QoS 0 queue type, this metric maps nicely to
what happens: The queue drop messages since itx max length
(mqtt.mailbox_soft_limit) is exceeded with overflow behaviour
drop-head. Furtheremore the label `dead_letter_strategy="disabled"` tells
that dead lettering is not taking place from this queue type.

b) this metric allows to support dead lettering for the MQTT QoS 0 queue
type in the future.

The new dead lettering metrics look as follows:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
```
2023-08-15 16:06:15 +02:00
Michael Klishin ec4f1dba7d
(c) year bump: 2022 => 2023 2023-01-01 23:17:36 -05:00
Karl Nilsson 8a6df5d955 Add feature flag for the restart_stream feature.
As it requires a new stream coordinator command which cannot be processed
by older machine versions.
2022-11-30 14:02:57 +00:00
Jean-Sébastien Pédron 909f861e55
Remove pre-quorum-queue compatibility code
Quorum queues were introduced in RabbitMQ 3.8.0. This was first time we
added a breaking change protected behind a feature flag. This allowed a
RabbitMQ cluster to be upgraded one node at a time, without having to
stop the entire cluster.

The breaking change was a new field in the `#amqqueue{}` record. This
broke the API and the ABI because records are a compile-time thing in
Erlang.

The compatibility code is in the wild for long enough that we want to
support the new `#amqqueue{}` record only from now on. The
`quorum_queue` feature flag was marked as required in a previous commit
(see #5202). This allows us to remove code in this patch.

References #5215.
2022-08-01 12:31:40 +02:00
Jean-Sébastien Pédron 29b80bd65b
Remove pre-virtual_host_metadata feature flag compatibility code
This feature flag was introduced because a new field was added to the
`#vhost{}` record. This was a breaking change the feature flag allowed a
RabbitMQ cluster to be upgraded one node at a time, without having to
stop the entire cluster.

The compatibility code is in the wild for long enough that we want to
support the new `#vhost{}` record only from now on. The
`virtual_host_metadata` feature flag was marked as required in a
previous commit (see #5202). This allows us to remove code in this
patch.

References #5215.
2022-08-01 11:56:04 +02:00
Jean-Sébastien Pédron bc6e28f5f3
rabbit_feature_flags: Use one callback per command
In the initial Feature flags subsystem implementation, we used a
migration function taking three arguments:
* the name of the feature flag
* its properties
* a command (`enable` or `is_enabled`)

However we wanted to implement a command (`post_enable`) and pass more
variables to the migration function. With the rework in #3940, the
migration function was therefore changed to take a single argument. That
argument was a record containing the command and much more information.
The content of the record could be different, depending on the command.

This solved the extensibility and the flexilibity of how we could call
the migration function. Unfortunately, this didn't improve its return
value as we wanted to return different things depending on the command.

In this patch, we change this completely by using a map of callbacks,
one per command, instead of that single migration function.

So before, where we had:

    #{migration_fun => {Module, Function}}

The new property is now:

    #{callbacks => #{enable => {Module, Function},
                     post_enable => {Module, Function}}}

All callbacks are still optional. You don't have to implement a fallback
empty function clause to skip commands you don't want to use oryou don't
support, as you would have to with the migration function.

Callbacks may be called with a callback-specific map of argument and
they should return the expected callback-specific return values.
Everything is defined with specs.

If a feature flag uses this new callbacks map, it will automatically
depend on `feature_flags_v2`, like the previous arity-1 migration
function. A feature flag can't define both the old migration function
and the new callbacks map.

Note that this arity-1 migration function never made it to a release of
RabbitMQ, so its support is entirely dropped with no backward
compatibility support. Likewise for the `#ffcommand{}` record.
2022-07-27 18:09:41 +02:00
Jean-Sébastien Pédron bcb8733880
rabbit_feature_flags: Add a feature flags controller process
This gen_statem-based process is responsible for handling concurrency
when feature flags are enabled and synchronized when a cluster is
expanded.

This clarifies and stabilizes the behavior of the feature flag subsystem
w.r.t. situations where e.g. a feature flag migration function takes
time to update data and a new node joins a cluster and synchronizes its
feature flag states with the cluster. There was a chance that the
feature flag was marked as enabled on the joining node, even though the
migration function didn't take care of that node.

With this new feature flags controller, enabling or synchronizing
feature flags blocks and delays any concurrent operations which try to
modify feature flags states too.

This change also clarifies where and when the migration function is
called: it is called at least once on each node who knows the feature
flag and when the state goes from "disabled" to "enabled" on that node.

Note that even if the feature flag is being enabled on a subset of the
nodes (because other nodes already have it enabled), it is marked as
"state_changing" everywhere during the migration. This is to prevent
that a node where it is enabled assumes it is enabled on all nodes who
know the feature flag.

There is a new feature as well: just after a feature flag is enabled,
the migration function is called a second time for any post-enable
actions. The feature flag is marked as enabled between these "enable"
and "post-enable" steps. The success or failure of this "post-enable"
run does not affect the state of the feature flag (i.e. it is ignored).

A new migration function API is introduced to allow more advanced
things. The new API is:

    my_migration_function(
      #ffcommand{name = ...,
                 props = ...,
		 command = enable | post_enable,
		 extra = #{...}})

The record is defined in `include/feature_flags.hrl`. Here is the
meaning of each field:

* `name` and `props` are the equivalent of the `FeatureName` and
  `FeatureProps` arguments of the previous migration function API.

* `command` is basically the same as the previous `Arg` arguments.

* `extra` is map containing context-specific information. For instance, it
  contains the list of nodes where the feature flag state changes.

This whole new behavior is behind a new feature flag called
`feature_flags_v2`. If a feature flag uses the new migration function
API, `feature_flags_v2` will be automatically enabled.

If many feature flags are enabled at once (like when a fresh RabbitMQ
node is started for the first time), `feature_flags_v2` will be enabled
first if it is in the list.
2022-06-28 10:13:19 +02:00
Michael Klishin c38a3d697d
Bump (c) year 2022-03-21 01:21:56 +04:00
dcorbacho 58e36b6417 Add specific stream protocol counters to track protocol errors 2021-06-29 12:50:00 +02:00
Jean-Sébastien Pédron 127295227c Doc: Fix all errors reported by edoc
With this, `gmake edoc` returns successfully and HTML docs are correctly
generated.
2021-04-09 13:15:32 +02:00
Michael Klishin b11a79cccf
Bump (c) year in header files 2021-02-04 07:04:58 +03:00
Philip Kuryloski a1fe3ab061 Change repo "root" to deps/rabbit
rabbit must not be the monorepo root application, as other applications depend on it
2020-11-13 14:34:42 +01:00