Commit Graph

363 Commits

Author SHA1 Message Date
David Ansari 033a87523d Bump ActiveMQ to v6.1.7
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 27) (push) Waiting to run Details
Test (make) / Test (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.17, 27) (push) Waiting to run Details
We've experienced lots of failures in CI:
```
GEN    test/system_SUITE_data/apache-activemq-5.18.3-bin.tar.gz
make: *** [Makefile:65: test/system_SUITE_data/apache-activemq-5.18.3-bin.tar.gz] Error 28
make: Leaving directory '/home/runner/work/rabbitmq-server/rabbitmq-server/deps/amqp10_client'
Error: Process completed with exit code 2.
```

Bumping to the latest ActiveMQ Classic version may or may not help with
these failures.

Either way, we want to test against the latest ActiveMQ version. Version
5.18.3 reached end-of-life and is no longer maintained.
2025-06-23 18:11:42 +02:00
Jean-Sébastien Pédron 63f7da23c7
Delete symlinks to `erlang.mk` and `rabbitmq-components.mk`
[Why]
They make it more difficult to compile RabbitMQ on Windows. They were
probably useful at the time of the switch to a monorepository but I
don't see their need anymore.
2025-06-16 15:19:18 +02:00
David Ansari 1850ff1363 Avoid using the size/1 BIF
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 27) (push) Waiting to run Details
Test (make) / Test (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.17, 27) (push) Waiting to run Details
Avoid using the size/1 BIF for performance critical code because
according to
https://whatsapp.github.io/erlang-language-platform/docs/erlang-error-index/w/W0050/
"The BIF is not optimized by the JIT".
2025-06-11 14:14:01 +02:00
David Ansari f8f1396c90 Follow AMQP spec for durable field
The AMQP spec defines:
```
<field name="durable" type="boolean" default="false"/>
```

RabbitMQ 4.0 and 4.1 interpret the durable field as true if not set.
The idea was to favour safety over performance.
This complies with the AMQP spec because the spec allows other target or
node specific defaults for the durable field:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the fields
> within the header unless other target or node specific defaults have otherwise
> been set.

However, some client libraries completely omit the header section if the
app expliclity sets durable=false. This complies with the spec, but it
means that RabbitMQ cannot diffentiate between "client app forgot to set
the durable field" vs "client lib opted in for an optimisation omitting
the header section".

This is problematic with JMS message selectors where JMS apps can filter
on JMSDeliveryMode. To be able to correctly filter on JMSDeliveryMode,
RabbitMQ needs to know whether the JMS app sent the message as
PERSISTENT or NON_PERSISTENT.

Rather than relying on client libs to always send the header section
including the durable field, this commit makes RabbitMQ comply with the
default value for durable in the AMQP spec.
Some client lib maintainers accepted to send the header section, while
other maintainers refused to do so:
https://github.com/Azure/go-amqp/issues/330
https://issues.apache.org/jira/browse/QPIDJMS-608

Likely the AMQP spec was designed to omit the header section when
performance is important, as is the case with durable=false. Omitting
the header section means saving a few bytes per message on the wire and
some marshalling and unmarshalling overhead on both client and server.

Therefore, it's better to push the "safe by default" behaviour from the broker
back to the client libs. Client libs should send messages as durable by
default unless the client app expliclity opts in to send messages as
non-durable. This is also what JMS does: By default JMS apps send
messages as PERSISTENT:
> The message producer's default delivery mode is PERSISTENT.

Therefore, this commit also makes the AMQP Erlang client send messages as
durable, by default.

This commit will apply to RabbitMQ 4.2.
It's arguably not a breaking change because in RabbitMQ, message durability
is actually more determined by the queue type the message is sent to rather than the
durable field of the message:
* Quroum queues and streams store messages durably (fsync or replicate)
  no matter what the durable field is
* MQTT QoS 0 queues hold messages in memory no matter what the
  durable field is
* Classic queues do not fsync even if the durable field is set to true

In addition, the RabbitMQ AMQP Java library introduced in RabbitMQ 4.0 sends messages with
durable=true:
53e3dd6abb/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java (L91)

The tests for selecting messages by JMSDeliveryMode relying on the
behaviour in this commit can be found on the `jms` branch.
2025-05-20 08:32:52 +02:00
David Ansari 561376052e Fix type spec for AMQP 1.0 address
The target address can be null which denotes the anonymous terminus.
https://docs.oasis-open.org/amqp/anonterm/v1.0/anonterm-v1.0.html
2025-04-07 16:37:17 +02:00
David Ansari 32854e8d34 Auto widen session incoming-window in AMQP 1.0 client
This commit fixes a bug in the Erlang AMQP 1.0 client.

Prior to this commit, to repro this bug:
1. Send more than 2^16 messages to a queue.
2. Grant more than a total of 2^16 link credit initially (on a single link
   or across multiple links) on a single session without any
   auto or manual link credit renewal.

The expectation is that thanks to sufficiently granted initial link-credit,
the client will receive all messages.
However, consumption stops after exactly 2^16-1 messages.

That's because the client lib was never sending a flow frame to the server.
So, after the client received all 2^16-1 messages (the initial
incoming-window set by the client), the server's remote-incoming-window
reached 0 causing the server to stop delivering messages.

The expectation is that the client lib automatically handles session
flow control without any manual involvement of the client app.

This commit implements this fix:
* We keep the server's remote-incoming window always large by default as
  explained in https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#incoming-window
* Hence, the client lib sets its incoming-window to 100,000 initially.
* The client lib tracks its incoming-window decrementing it by 1 for
  every transfer it received. (This wasn't done prior to this commit.)
* Whenever this window shrinks below 50,000, the client sends a flow
  frame without any link information widening its incoming-window back to 100,000.
* For test cases (maybe later for apps as well), there is a new function
  `amqp10_client_session:flow/3`, which allows for a test case to do manual
  session flow control. Its API is designed very similar to
  `amqp10_client_session:flow_link/4` in that the test can optionally request
  the lib to auto widen the session window whenever it falls below a certain threshold.
2025-03-19 16:29:02 +00:00
Loïc Hoguin c5d150a7ef
Use Erlang.mk's native Elixir support for CLI
This avoids using Mix while compiling which simplifies
a number of things and let us do further build improvements
later on.

Elixir is only enabled from within rabbitmq_cli currently.

Eunit is disabled since there are only Elixir tests.

Dialyzer will force-enable Elixir in order to process
Elixir-compiled beam files.

This commit also includes a few changes that are
related:

 * The Erlang distribution will now be started for parallel-ct

 * Many unnecessary PROJECT_MOD lines have been removed

 * `eunit_formatters` has been removed, it provides little value

 * The new `maybe_flock` Erlang.mk function is used where possible

 * Build test deps when testing rabbitmq_cli (Mix won't do it anymore)

 * rabbitmq_ct_helpers now use the early plugins to have Dialyzer
   properly set up
2025-03-18 10:02:49 +01:00
Aitor Perez 07adc3e571
Remove Bazel files 2025-03-13 13:42:34 +00:00
David Ansari 65576863fc
amqp10_client: Fix crash in close_sent
Fix crash in close_sent since the client might receive the open frame if
it previously sent the close frame in state open_sent.

We explicitly ignore the open frame. The alternative is to add another
gen_statem state CLOSE_PIPE which  might be an overkill however.

This commit also fixes a wrong comment: No sessions have begun if the
app requests the connection to be closed in state open_sent.
2025-03-07 14:48:43 +01:00
Jean-Sébastien Pédron 77e3636272
amqp10_client: Handle `close` message in the `open_sent` state
[Why]
Without this, the connection process crashes. We see this happenning in
CI frequently.
2025-03-07 14:48:43 +01:00
David Ansari 3daef04566 Trap exit in AMQP 1.0 client proc
Trap exit signal such that terminate/3 gets executed so that
the socket is closed cleanly.
2025-02-13 17:02:51 +01:00
David Ansari 9062476a18 Support dynamic creation of queues
## What?
Support the `dynamic` field of sources and targets.

 ## Why?
1. This allows AMQP clients to dynamically create exclusive queues, which
   can be useful for RPC workloads.
2. Support creation of JMS temporary queues over AMQP using the Qpid JMS
   client. Exclusive queues map very nicely to JMS temporary queues
   because:

> Although sessions are used to create temporary destinations, this is only
for convenience. Their scope is actually the entire connection. Their
lifetime is that of their connection and any of the connection’s sessions
are allowed to create a consumer for them.

https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#creating-temporary-destinations

 ## How?
If the terminus contains the capability `temporary-queue` as defined in
[amqp-bindmap-jms-v1.0-wd10](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=67638)
[5.2] and as sent by Qpid JMS client,
RabbitMQ will create an exclusive queue.
(This allows a future commit to take other actions if capability
`temporary-topic` will be used, such as the additional creation of bindings.)

No matter what the desired node properties are, RabbitMQ will set the
lifetime policy delete-on-close deleting the exclusive queue when the
link which caused its creation ceases to exist. This means the exclusive
queue will be deleted if either:
* the link gets detached, or
* the session ends, or
* the connection closes

Although the AMQP JMS Mapping and Qpid JMS create only a **sending** link
with `dynamic=true`, this commit also supports **receiving** links with
`dynamic=true` for non-JMS AMQP clients.

RabbitMQ is free to choose the generated queue name. As suggested by the
AMQP spec, the generated queue name will contain the container-id and link
name unless they are very long.

Co-authored-by: Arnaud Cogoluègnes <acogoluegnes@gmail.com>
2025-02-12 17:17:28 +01:00
GitHub a013bb1d1c bazel run gazelle 2025-01-28 04:02:47 +00:00
David Ansari 5a467934e3 Support AMQP over WebSocket in Erlang client
## What?
Implement the AMQP over WebSocket Binding Committee Specification 01 in
the AMQP 1.0 Erlang client:
https://docs.oasis-open.org/amqp-bindmap/amqp-wsb/v1.0/cs01/amqp-wsb-v1.0-cs01.html

 ## Why?
1. This allows writing integration tests for the server implementation
   of AMQP over WebSocket.
2. Erlang and Elixir clients can use AMQP over WebSocket in environments
   where firewalls prohibit access to the AMQP port.

 ## How?
Use gun as WebSocket client.

The new module `amqp10_client_socket` handles socket operations (open, close, send) for:
* TCP sockets
* SSL sockets
* WebSockets

Prior to this commit, the amqp10_client_connection process closed only the
write end of the socket after it sent the AMQP close performative.
This commit removed premature socket closure because:
1. There is no equivalent feature provided in Gun since sending a
   WebSocket close frame causes Gun to cleanly close the connection for
   both writing and reading.
2. It's unnecessary and can result in unexpected and confusing behaviour on the server.
3. It's better practive to keep the TCP connection fully open until
   the AMQP closing handshake completes.
4. When amqp10_client_frame_reader terminates, it will cleanly close
   the socket for both writing and reading.
2025-01-27 17:50:47 +01:00
Michael Klishin 968eefa1bb
Bump (c) line year
There are no functional changes to this massive diff.
2025-01-01 17:54:10 -05:00
David Ansari 0d34ef6047 Set a floor of zero for incoming-window
Prior to this commit, when the sending client overshot RabbitMQ's incoming-window
(which is allowed in the event of a cluster wide memory or disk alarm),
and RabbitMQ sent a FLOW frame to the client, RabbitMQ sent a negative
incoming-window field in the FLOW frame causing the following crash in
the writer proc:
```
crasher:
  initial call: rabbit_amqp_writer:init/1
  pid: <0.19353.0>
  registered_name: []
  exception error: bad argument
    in function  iolist_size/1
       called as iolist_size([<<112,0,0,23,120>>,
                              [82,-15],
                              <<"pÿÿÿü">>,<<"pÿÿÿÿ">>,67,
                              <<112,0,0,23,120>>,
                              "Rª",64,64,64,64])
       *** argument 1: not an iodata term
    in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 141)
    in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 88)
    in call from amqp10_binary_generator:generate/1 (amqp10_binary_generator.erl, line 79)
    in call from rabbit_amqp_writer:assemble_frame/3 (rabbit_amqp_writer.erl, line 206)
    in call from rabbit_amqp_writer:internal_send_command_async/3 (rabbit_amqp_writer.erl, line 189)
    in call from rabbit_amqp_writer:handle_cast/2 (rabbit_amqp_writer.erl, line 110)
    in call from gen_server:try_handle_cast/3 (gen_server.erl, line 1121)
```

This commit fixes this crash by maintaning a floor of zero for
incoming-window in the FLOW frame.

Fixes #12816
2024-12-10 09:39:21 +01:00
GitHub c7c98b5a35 bazel run gazelle 2024-11-28 04:02:58 +00:00
Diana Parra Corbacho 9b8f4bf7f3 Tests: remove unused import 2024-11-27 16:29:07 +01:00
Michael Klishin 07fdcc2bde
Naming 2024-11-27 10:03:49 -05:00
Diana Parra Corbacho 69061277bc Tests: system_SUITE increase timeout 2024-11-27 15:45:58 +01:00
David Ansari 6e8b566323 Deduplicate AMQP type inference
Introduce a single place in the AMQP 1.0 Erlang client that infers the AMQP 1.0 type.

Erlang integers are inferred to be AMQP type `long` to avoid overflow surprises.
2024-11-15 17:40:36 +01:00
David Ansari 814d44dd82 Convert array from AMQP 1.0 to AMQP 0.9.1
Fix the following crash when an AMQP 0.9.1 client consumes an AMQP 1.0
encoded message that contains an array value in message annotations:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.685.0>
  registered_name: []
  exception exit: {function_clause,
                      [{mc_amqpl,to_091,
                           [<<"x-array">>,
                            {array,utf8,[{utf8,<<"e1">>},{utf8,<<"e2">>}]}],
                           [{file,"mc_amqpl.erl"},{line,737}]},
                       {mc_amqpl,'-convert_from/3-fun-3-',1,
                           [{file,"mc_amqpl.erl"},{line,168}]},
                       {lists,filtermap_1,2,
                           [{file,"lists.erl"},{line,2279}]},
                       {mc_amqpl,convert_from,3,
                           [{file,"mc_amqpl.erl"},{line,158}]},
                       {mc,convert,3,[{file,"mc.erl"},{line,332}]},
                       {rabbit_channel,handle_deliver0,4,
                           [{file,"rabbit_channel.erl"},{line,2619}]},
                       {lists,foldl_1,3,[{file,"lists.erl"},{line,2151}]},
                       {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}]}
```
2024-10-22 12:16:19 +02:00
GitHub 7d8d338bf0 bazel run gazelle 2024-10-22 04:02:23 +00:00
Lois Soto Lopez 3ff7e82c5c Provide specific f. to fix client ssl options
Provides a specific function to fix client ssl options, i.e.: apply all
fixes that are applied for TLS listeneres and clients on previous
versions but also sets `cacerts` option to CA certificates obtained by
`public_key:cacerts_get`, only when no `cacertfile` or `cacerts` are
provided.
2024-10-21 18:00:06 -04:00
David Ansari d1d7d7bad4 Optionally notify client app with AMQP 1.0 performative
This commit notifies the client app with the AMQP performative if
connection config `notify_with_performative` is set to `true`.

This allows the client app to learn about all fields including
properties and capabilities returned by the AMQP server.
2024-10-18 13:51:35 +02:00
David Ansari b1064fddba Support negative integers in modified annotations 2024-10-11 14:43:31 +02:00
David Ansari e6818f0040 Track requeue history
Support tracking the requeue history as described in
https://github.com/rabbitmq/rabbitmq-website/pull/2095

This commit:
1. adds a test case tracing the requeue history via AMQP 1.0
   using the modified outcome and
2. fixes bugs in the broker which crashed if a modified message
   annotation value is an AMQP 1.0 list, map, or array.

Complex modified annotation values (list, map, array) are stored as tagged values from now on.
This means AMQP 0.9.1 consumers will not receive modified annotations of
type list, map, or array (which is okay).
2024-10-11 12:21:28 +02:00
David Ansari df59a52b70
Support AMQP filter expressions (#12415)
* Support AMQP filter expressions

 ## What?

This PR implements the following property filter expressions for AMQP clients
consuming from streams as defined in
[AMQP Filter Expressions Version 1.0 Working Draft 09](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227):
* properties filters [section 4.2.4]
* application-properties filters [section 4.2.5]

String prefix and suffix matching is also supported.

This PR also fixes a bug where RabbitMQ would accept wrong filters.
Specifically, prior to this PR the values of the filter-set's map were
allowed to be symbols. However, "every value MUST be either null or of a
described type which provides the archetype filter."

 ## Why?

This feature adds the ability to RabbitMQ to have multiple concurrent clients
each consuming only a subset of messages while maintaining message order.

This feature also reduces network traffic between RabbitMQ and clients by
only dispatching those messages that the clients are actually interested in.

Note that AMQP filter expressions are more fine grained than the [bloom filter based
stream filtering](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) because
* they do not suffer false positives
* the unit of filtering is per-message instead of per-chunk
* matching can be performed on **multiple** values in the properties and
  application-properties sections
* prefix and suffix matching on the actual values is supported.

Both, AMQP filter expressions and bloom filters can be used together.

 ## How?

If a filter isn't valid, RabbitMQ ignores the filter. RabbitMQ only
replies with filters it actually supports and validated successfully to
comply with:
"The receiving endpoint sets its desired filter, the sending endpoint
[RabbitMQ] sets the filter actually in place (including any filters defaulted at
the node)."

* Delete streams test case

The test suite constructed a wrong filter-set.
Specifically the value of the filter-set didn't use a described type as
mandated by the spec.
Using https://azure.github.io/amqpnetlite/api/Amqp.Types.DescribedValue.html
throws errors that the descriptor can't be encoded. Given that this code
path is already tests via the amqp_filtex_SUITE, this F# test gets
therefore deleted.

* Re-introduce the AMQP filter-set bug

Since clients might rely on the wrong filter-set value type, we support
the bug behind a deprecated feature flag and gradually remove support
this bug.

* Revert "Delete streams test case"

This reverts commit c95cfeaef7.
2024-10-07 17:12:26 +02:00
David Ansari 6863ae14dd Comply with §2.2.2 of Anonymous Terminus extension
Comply with section 2.2.2 Routing Errors:
https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
2024-09-26 16:45:18 +02:00
David Ansari 9d7ebf32a9 Enforce correct transfer settled flag
For messages published to RabbitMQ, RabbitMQ honors the transfer `settled`
field, no matter what value the sender settle mode was set to in the attach
frame.

Therefore, prior to this commit, a client could send a transfer with
`settled=true` even though sender settle mode was set to `unsettled` in the
attach frame.

This commit enforces that the publisher sets only transfer `settled` fields
that are valid with the spec.

If sender settle mode is:
* `unsettled`, the transfer `settled` flag must be `false`.
* `settled`, the transfer `settled` flag must be `true`.
* `mixed`, the transfer `settled` flag can be `true` or `false`.
2024-09-25 18:06:22 +02:00
David Ansari b1eb354385 Strictly validate annotations 2024-09-18 12:42:27 +02:00
David Ansari cd600bef8b Fix modified annotations
```
<field name="message-annotations" type="fields"/>
```

Prior to this commit integration tests succeeded because both Erlang
client and RabbitMQ server contained a bug.

This bug was noticed by a Java client test suite.
2024-09-18 09:38:44 +02:00
Loïc Hoguin 4a118c25f9
make: Fix regressions following make plugins cleanup 2024-09-10 15:42:28 +02:00
David Ansari c2ce905797
Enforce AMQP 1.0 channel-max (#12221)
* Enforce AMQP 1.0 channel-max

Enforce AMQP 1.0 field `channel-max` in the `open` frame by introducing
a new more user friendly setting called `session_max`:
> The channel-max value is the highest channel number that can be used on the connection.
> This value plus one is the maximum number of sessions that can be simultaneously active on the connection.

We set the default value of `session_max` to 64 such that, by
default, RabbitMQ 4.0 allows maximum 64 AMQP 1.0 sessions per AMQP 1.0 connection.

More than 64 AMQP 1.0 sessions per connection make little sense.
See also https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#session

Limiting the maximum number of sessions per connection can be useful to
protect against
* applications that accidentally open new sessions without ending old sessions
  (session leaks)
* too many metrics being exposed, for example in the future via the
  "/metrics/per-object" Prometheus endpoint with timeseries per session
  being emitted.

This commit does not make use of the existing `channel_max` setting
because:
1. Given that `channel_max = 0` means "no limit", there is no way for an
   operator to limit the number of sessions per connections to 1.
2. Operators might want to set different limits for maximum number of
   AMQP 0.9.1 channels and maximum number of AMQP 1.0 sessions.
3. The default of `channel_max` is very high: It allows using more than
   2,000 AMQP 0.9.1 channels per connection. Lowering this default might
   break existing AMQP 0.9.1 applications.

This commit also fixes a bug in the AMQP 1.0 Erlang client which, prior
to this commit used channel number 1 for the first session. That's wrong
if a broker allows maximum 1 session by replying with `channel-max = 0`
in the `open` frame. Additionally, the spec recommends:
> To make it easier to monitor AMQP sessions, it is RECOMMENDED that implementations always assign the lowest available unused channel number.

Note that in AMQP 0.9.1, channel number 0 has a special meaning:
> The channel number is 0 for all frames which are global to the connection and 1-65535 for frames that
refer to specific channels.

* Apply PR feedback
2024-09-05 17:45:27 +02:00
Loïc Hoguin 7ad8e2856b
make: Restrict Erlang.mk plugin inclusion
This has no real impact on performance[1] but should
make it clear which application can run the broker
and/or publish to Hex.pm. In particular, applications
that we can't run the broker from will now give up
early if we try to.

Note that while the broker can't normally run from the
amqp_client application's directory, it can run from
tests and some of the tests start the broker.

[1] on my machine
2024-08-29 15:19:50 +02:00
Loïc Hoguin 445f3c9270
make: Move rabbitmq-early-test.mk to rabbitmq-early-plugin.mk
No real need to have two files, especially since it contains
only a few variable definitions. Plan is to only keep
separate files for larger features such as dist or run.
2024-08-29 15:19:50 +02:00
Loïc Hoguin d4222f8216
make: Remove emptied rabbitmq-tools.mk 2024-08-29 15:19:14 +02:00
Loïc Hoguin 7e7e6feb9d
make: Remove rabbitmq-tests.mk
Everything in this file seems to be dead code except
ct-slow/ct-fast, which have been replaced by their
equivalent in the rabbit Makefile.
2024-08-29 15:19:13 +02:00
David Ansari d46f07c0a4 Add SASL mechanism ANONYMOUS
## 1. Introduce new SASL mechanism ANONYMOUS

 ### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.

 ### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.

Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, stream clients to connect out of
the box to RabbitMQ without providing any username or password.

Today's AMQP 0.9.1 and stream protocol client libs hard code RabbitMQ default credentials
`guest:guest` for example done in:
* 0215e85643/src/main/java/com/rabbitmq/client/ConnectionFactory.java (L58-L61)
* ddb7a2f068/uri.go (L31-L32)

Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).

 ### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](146b4862d8/deps/rabbitmq_amqp1_0/Makefile (L6))
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.

Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.

By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public

To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.

In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting. This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.

 ## 2. Require SASL security layer in AMQP 1.0

 ### What?
An AMQP 1.0 client must use the SASL security layer.

 ### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.

Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.

 ### How?
This commit implements AMQP 1.0 figure 2.13.

A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
2024-08-15 10:58:48 +00:00
David Ansari 60ae4d4eca
Support SASL mechanism EXTERNAL in Erlang AMQP 1.0 client (#11984)
* Support SASL mechanism EXTERNAL in Erlang AMQP 1.0 client

* Move test to plugin rabbitmq_auth_mechanism_ssl

In theory, there can be other plugin that offer SASL mechanism EXTERNAL.
Therefore, instead of adding a test dependency from app rabbit to app
rabbitmq_auth_mechanism_ssl, it's better to test this plugin specific
functionality directly in the plugin itself.
2024-08-14 18:18:00 +02:00
Karl Nilsson 194d4ba2f5
Quorum queues v4 (#10637)
This commit contains the following new quorum queue features:

* Fair share high/low priorities
* SAC consumers honour consumer priorities
* Credited consumer refactoring to meet AMQP requirements.
* Use checkpoints feature to reduce memory use for queues with long backlogs
 * Consumer cancel option that immediately removes consumer and returns all pending messages.
 * More compact commands of the most common commands such as enqueue, settle and credit
 * Correctly track the delivery-count to be compatible with the AMQP spec
 * Support the "modified" AMQP 1.0 outcome better.

Commits:

* Quorum queues v4 scaffolding.

Create the new version but not including any changes yet.

QQ: force delete followers after leader has terminated.

Also try a longer sleep for mqtt_shared_SUITE so that the
delete operation stands a chance to time out and move on
to the forced deletion stage.

In some mixed machine version scenarios some followers will never
apply the poison pill command so we may as well force delete them
just in case.

QQ: skip test in amqp_client that cannot pass with mixed machine versions

QQ: remove dead code

Code relating to prior machine versions and state conversions.

rabbit_fifo_prop_SUITE fixes

* QQ: add v4 ff and new more compact enqueue command.

Also update rabbit_fifo_* suites to test more relevant code versions
where applicable.

QQ: always use the updated credit mode format

QQv4: use more compact consumer reference in settle, credit, return

This introudces a new type: consumer_key() which is either the consumer_id
or the raft index the checkout was processed at. If the consumer is
using one of the updated credit spec formats rabbit_fifo will use the
raft index as the primary key for the consumer such that the rabbit
fifo client can then use the more space efficient integer index
instead of the full consumer id in subsequent commands.

There is compatibility code to still accept the consumer id in
settle, return, discard and credit commands but this is slighlyt
slower and of course less space efficient.

The old form will be used in cases where the fifo client may have
already remove the local consumer state (as happens after a cancel).

Lots of test refactorings of the rabbit_fifo_SUITE to begin to use
the new forms.

* More test refactoring and new API fixes

rabbit_fifo_prop_SUITE refactoring and other fixes.


* First pass SAC consumer priority implementation.

Single active consumers will be activated if they have a higher priority
than the currently active consumer. if the currently active consumer
has pending messages, no further messages will be assigned to the
consumer and the activation of the new consumer will happen once
all pending messages are settled. This is to ensure processing order.

Consumers with the same priority will internally be ordered to
favour those with credit then those that attached first.

QQ: add SAC consumer priority integration tests

QQ: add check for ff in tests

* QQ: add new consumer cancel option: 'remove'

This option immediately removes and returns all messages for a
consumer instead of the softer 'cancel' option which keeps the
consumer around until all pending messages have been either
settled or returned.

This involves a change to the rabbit_queue_type:cancel/5 API
to rabbit_queue_type:cancel/3.

* QQ: capture checked out time for each consumer message.

This will form the basis for queue initiated consumer timeouts.

* QQ: Refactor to use the new ra_machine:handle_aux/5 API

Instead of the old ra_machine:handle_aux/6 callback.

* QQ hi/lo priority queue

* QQ: Avoid using mc:size/1 inside rabbit_fifo

As we dont want to depend on external functions for things that may
change the state of the queue.

* QQ bug fix: Maintain order when returning multiple

Prior to this commit, quorum queues requeued messages in an undefined
order, which is wrong.

This commit fixes this bug and requeues messages always in the order as
nacked / rejected / released by the client.

We ensure that order of requeues is deterministic from the client's
point of view and doesn't depend on whether the quorum queue soft limit
was exceeded temporarily.
So, even when rabbit_fifo_client batches requeues, the order as nacked
by the client is still maintained.

* Simplify

* Add rabbit_quorum_queue:file_handle* functions back.

For backwards compat.

* dialyzer fix

* dynamic_qq_SUITE: avoid mixed versions failure.

* QQ: track number of requeues for message.

To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.

This commit implement such tracking as well as the calculation
of a new mc annotations `delivery_count` that AMQP makes use
of to set the header value accordingly.

* Use QQ consumer removal when AMQP client detaches

This enables us to unskip some AMQP tests.

* Use AMQP address v2 in fsharp-tests

* QQ: track number of requeues for message.

To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.

This commit implement such tracking as well as the calculation
of a new mc annotations `delivery_count` that AMQP makes use
of to set the header value accordingly.

* rabbit_fifo: Use Ra checkpoints

* quorum queues: Use a custom interval for checkpoints

* rabbit_fifo_SUITE: List actual effects in ?ASSERT_EFF failure

* QQ: Checkpoints modifications

* fixes

* QQ: emit release cursors on tick for followers and leaders

else followers could end up holding on to segments a bit longer
after traffic stops.

* Support draining a QQ SAC waiting consumer

By issuing drain=true, the client says "either send a transfer or a flow frame".
Since there are no messages to send to an inactive consumer, the sending
queue should advance the delivery-count consuming all link-credit and send
a credit_reply with drain=true to the session proc which causes the session
proc to send a flow frame to the client.

* Extract applying #credit{} cmd into 2 functions

This commit is only refactoring and doesn't change any behaviour.

* Fix default priority level

Prior to this commit, when a message didn't have a priority level set,
it got enqueued as high prio.

This is wrong because the default priority is 4 and
"for example, if 2 distinct priorities are implemented,
then levels 0 to 4 are equivalent, and levels 5 to 9 are equivalent
and levels 4 and 5 are distinct."
Hence, by default a message without priority set, must be enqueued as
low prio.

* bazel run gazelle

* Avoid deprecated time unit

* Fix aux_test

* Delete dead code

* Fix rabbit_fifo_q:get_lowest_index/1

* Delete unused normalize functions

* Generate less garbage

* Add integration test for QQ SAC with consumer priority

* Improve readability

* Change modified outcome behaviour

With the new quorum queue v4 improvements where a requeue counter was
added in addition to the quorum queue delivery counter, the following
sentence from https://github.com/rabbitmq/rabbitmq-server/pull/6292#issue-1431275848
doesn't apply anymore:

> Also the case where delivery_failed=false|undefined requires the release of the
> message without incrementing the delivery_count. Again this is not something
> that our queues are able to do so again we have to reject without requeue.

Therefore, we simplify the modified outcome behaviour:
RabbitMQ will from now on only discard the message if the modified's
undeliverable-here field is true.

* Introduce single feature flag rabbitmq_4.0.0

 ## What?

Merge all feature flags introduced in RabbitMQ 4.0.0 into a single
feature flag called rabbitmq_4.0.0.

 ## Why?

1. This fixes the crash in
https://github.com/rabbitmq/rabbitmq-server/pull/10637#discussion_r1681002352
2. It's better user experience.

* QQ: expose priority metrics in UI

* Enable skipped test after rebasing onto main

* QQ: add new command "modify" to better handle AMQP modified outcomes.

This new command can be used to annotate returned or rejected messages.

This commit also retains the delivery-count across dead letter boundaries
such that the AMQP header delivery-count field can now include _all_ failed
deliver attempts since the message was originally received.

Internally the quorum queue has moved it's delivery_count header to
only track the AMQP protocol delivery attempts and now introduces
a new acquired_count to track all message acquisitions by consumers.

* Type tweaks and naming

* Add test for modified outcome with classic queue

* Add test routing on message-annotations in modified outcome

* Skip tests in mixed version tests

Skip tests in mixed version tests because feature flag
rabbitmq_4.0.0 is needed for the new #modify{} Ra command
being sent to quorum queues.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2024-08-08 08:48:27 +01:00
David Ansari e6587c6e45 Support consumer priority in AMQP
Arguments
* `rabbitmq:stream-offset-spec`,
* `rabbitmq:stream-filter`,
* `rabbitmq:stream-match-unfiltered`
are set in the `filter` field of the `Source`.
This makes sense for these consumer arguments because:
> A filter acts as a function on a message which returns a boolean result
> indicating whether the message can pass through that filter or not.

Consumer priority is not really such a predicate.
Therefore, it makes more sense to set consumer priority in the
`properties` field of the `Attach` frame.

We call the key `rabbitmq:priority` which maps to consumer argument
`x-priority`.

While AMQP 0.9.1 consumers are allowed to set any integer data
type for the priority level, this commit decides to enforce an `int`
value (range -(2^31) to 2^31 - 1 inclusive).
Consumer priority levels outside of this range are not needed in
practice.
2024-07-12 20:31:01 +02:00
Loïc Hoguin bbfa066d79
Cleanup .gitignore files for the monorepo
We don't need to duplicate so many patterns in so many
files since we have a monorepo (and want to keep it).

If I managed to miss something or remove something that
should stay, please put it back. Note that monorepo-wide
patterns should go in the top-level .gitignore file.
Other .gitignore files are for application or folder-
specific patterns.
2024-06-28 12:00:52 +02:00
Loïc Hoguin 2b03233ac1
make: Remove rabbitmq-macros.mk
It hasn't been used for some time. If compare_version
becomes necessary again in the future, it's in the history.
2024-06-25 13:39:38 +02:00
Loïc Hoguin 9f15e978b1
make: Remove xrefr
It is no longer used by Erlang.mk.
2024-06-25 13:08:08 +02:00
Diana Parra Corbacho c11b812ef6 Reduce default maximum message size to 16MB 2024-06-14 11:55:03 +02:00
Karl Nilsson ebbff46c96
Merge pull request #11307 from rabbitmq/amqp-flow-control-poc-2
Introduce outbound RabbitMQ internal AMQP flow control
2024-06-05 15:01:05 +01:00
Michael Klishin 55b38bd642
Merge pull request #11369 from cloudamqp/amqp10_client_ssl_options
amqp10_client: allow configuring global TLS options
2024-06-04 14:38:40 -04:00
Péter Gömöri 2779bf7375 amqp10_client: allow configuring global TLS options
Since OTP 26 the verify_peer TLS option is enabled by default, which
requires to specify cacerts as well. This change helps with not having
to specify TLS options like cacertfile at every connection. This is
especially helpful for shovels using AMQP 1.0, where the TLS options
would otherwise need to be specified as URI parameters. Similar env
configuration already exists for amqp_client.
2024-06-04 16:08:34 +02:00
David Ansari d70e529d9a Introduce outbound RabbitMQ internal AMQP flow control
## What?

Introduce RabbitMQ internal flow control for messages sent to AMQP
clients.

Prior this PR, when an AMQP client granted a large amount of link
credit (e.g. 100k) to the sending queue, the sending queue sent
that amount of messages to the session process no matter what.
This becomes problematic for memory usage when the session process
cannot send out messages fast enough to the AMQP client, especially if
1. The writer proc cannot send fast enough. This can happen when
the AMQP client does not receive fast enough and causes TCP
back-pressure to the server. Or
2. The server session proc is limited by remote-incoming-window.

Both scenarios are now added as test cases.
Tests
* tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
* tcp_back_pressure_rabbitmq_internal_flow_classic_queue
cover scenario 1.

Tests
* incoming_window_closed_rabbitmq_internal_flow_quorum_queue
* incoming_window_closed_rabbitmq_internal_flow_classic_queue
cover scenario 2.

This PR sends messages from queues to AMQP clients in a more controlled
manner.

To illustrate:
```
make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
observer_cli:start()
mq
```
where `mq` sorts by message queue length.
Create a stream:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true
```
Next, send and receive from the Stream via AMQP.
Grant a large number of link credit to the sending stream:
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000
```

**Before** to this PR:
```
RESULTS

Count ............................................... 100,696 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 120,422 messages/s
Receiver rate ......................................... 3,363 messages/s
End-to-end rate ....................................... 3,359 messages/s
```
We observe that all 100k link credit worth of messages are buffered in the
writer proc's mailbox:
```
|No | Pid        | MsgQueue  |Name or Initial Call                 |      Memory | Reductions          |Current Function                  |
|1  |<0.845.0>   |100001     |rabbit_amqp_writer:init/1            |  126.0734 MB| 466633491           |prim_inet:send/5                  |
```

**After** to this PR:
```
RESULTS

Count ............................................. 2,973,440 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 123,322 messages/s
Receiver rate ........................................ 99,250 messages/s
End-to-end rate ...................................... 99,148 messages/s
```
We observe that the message queue lengths of both writer and session
procs are low.

 ## How?

Our goal is to have queues send out messages in a controlled manner
without overloading RabbitMQ itself.
We want RabbitMQ internal flow control between:
```
AMQP writer proc <--- session proc <--- queue proc
```
A similar concept exists for classic queues sending via AMQP 0.9.1.
We want an approach that applies to AMQP and works generic for all queue
types.

For the interaction between AMQP writer proc and session proc we use a
simple credit based approach reusing module `credit_flow`.

For the interaction between session proc and queue proc, the following options
exist:

 ### Option 1
The session process provides expliclity feedback to the queue after it
has sent N messages.
This approach is implemented in
https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1
and works well.
A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know
that it can send further messages to the session proc.

Pros:
* Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages
  in auto ack mode to AMQP 0.9.1 clients.
* Simple for the session proc

Cons:
* Sligthly added complexity in every queue type implementation
* Multiple Ra commands (settle, credit, sent) to decide when a quorum
  queue sends more messages.

 ### Option 2
A dual link approach where two AMQP links exists between
```
AMQP client <---link--> session proc <---link---> queue proc
```
When the client grants a large amount of credits, the session proc will
top up credits to the queue proc periodically in smaller batches.

Pros:
* No queue type modifications required.
* Re-uses AMQP link flow control

Cons:
* Significant added complexity in the session proc. A client can
  dynamically decrease or increase credits and dynamically change the drain
  mode while the session tops up credit to the queue.

 ### Option 3
Credit is a 32 bit unsigned integer.
The spec mandates that the receiver independently chooses a credit.
Nothing in the spec prevents the receiver to choose a credit of 1 billion.
However the credit value is merely a **maximum**:
> The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by.

Therefore, the server is not required to send all available messages to this
receiver.

For delivery-count:
> Only the sender MAY independently modify this field.

"independently" could be interpreted as the sender could add to the delivery-count
irrespective of what the client chose for drain and link-credit.

Option 3: The queue proc could at credit time already consume credit
and advance the delivery-count if credit is too large before checking out any messages.
For example if credit is 100k, but the queue only wants to send 1k, the queue could
consume 99k of credits and advance the delivery-count, and subsequently send maximum 1k messages.
If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver,
otherwise the receiver wouldn’t know that it ran out of link-credit.

Pros:
* Very simple

Cons:
* Possibly unexpected behaviour for receiving AMQP clients
* Possibly poor end-to-end throughput in auto-ack mode because the queue
  would send a batch of messages followed by a FLOW containing the advanced
  delivery-count. Only therafter the client will learn that it ran out of
  credits and top-up again. This feels like synchronously pulling a batch
  of messages. In contrast, option 2 sends out more messages as soon as
  the previous messages left RabbitMQ without requiring again a credit top
  up from the receiver.
* drain mode with large credits requires the queue to send all available
  messages and only thereafter advance the delivery-count. Therefore,
  drain mode breaks option 3 somewhat.

 ### Option 4
Session proc drops message payload when its outgoing-pending queue gets
too large and re-reads payloads from the queue once the message can be
sent (see `get_checked_out` Ra command for quorum queues).

Cons:
* Would need to be implemented for every queue type, especially classic queues
* Doesn't limit the amount of message metadata in the session proc's
  outgoing-pending queue

 ### Decision: Option 2
This commit implements option 2 to avoid any queue type modification.
At most one credit request is in-flight between session process and
queue process for a given queue consumer.
If the AMQP client sends another FLOW in between, the session proc
stashes the FLOW until it processes the previous credit reply.

A delivery is only sent from the outgoing-pending queue if the
session proc is not blocked by
1. writer proc, or
2. remote-incoming-window

The credit reply is placed into the outgoing-pending queue.
This ensures that the session proc will only top up the next batch of
credits if sufficient messages were sent out to the writer proc.

A future commit could additionally have each queue limit the number of
unacked messages for a given AMQP consumer, or alternatively make use
of session outgoing-window.
2024-06-04 13:11:55 +02:00