Commit Graph

1065 Commits

David Ansari e31df4cd01 Fix test case rebalance
This test case was wrongly skipped and therefore never ran.
2024-07-11 11:20:26 +02:00
David Ansari 18e8c1d5f8 Require all stable feature flags added up to 3.13.0
Since feature flag `message_containers` introduced in 3.13.0 is required in 4.0,
we can also require all other feature flags introduced in or before 3.13.0
and remove their compatibility code for 4.0:

* restart_streams
* stream_sac_coordinator_unblock_group
* stream_filtering
* stream_update_config_command
2024-07-11 11:20:26 +02:00
Michael Davis 88c1ad2f6e
Adapt to new `{error, timeout}` return value in Khepri 0.14.0
See rabbitmq/khepri#256.
2024-07-10 16:07:43 -04:00
Michael Davis ae0663d7ca
Merge pull request #11663 from rabbitmq/md/ci/turn-off-mixed-version-khepri-tests
Turn off mixed version tests against Khepri
2024-07-10 15:07:07 -05:00
Michael Klishin 348ca6f5a7 amqpl_direct_reply_to_SUITE: use separate queue names to avoid interference 2024-07-10 13:48:38 -04:00
Michael Davis 56bbf3760d
Respect RABBITMQ_METADATA_STORE in clustering_recovery_SUITE 2024-07-10 13:46:22 -04:00
Michael Davis 0a4e5a9845
Respect RABBITMQ_METADATA_STORE in clustering_management_SUITE 2024-07-10 13:46:05 -04:00
Michael Davis ecb757554e
Skip cluster_minority_SUITE when RABBITMQ_METADATA_STORE is set to mnesia
This suite is only meant to run with Khepri as the metadata store.
Instead of setting this explicitly we can look at the configured
metadata store and conditionally skip the entire suite. This prevents
these tests from running twice in CI.
2024-07-10 13:46:05 -04:00
Michael Davis bda1f7cef6
Skip metadata_store_clustering_SUITE in mixed versions testing
Khepri is not yet compatible with mixed-version testing and this suite
only tests clustering when Khepri is the metadata store in at least some
of the nodes.
2024-07-10 13:46:05 -04:00
David Ansari 5deff457a0 Fix direct reply to crash when tracing is enabled
This commit fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11662

Prior to this commit the following crash occurred when an RPC reply message entered
RabbitMQ and tracing was enabled:
```
** Reason for termination ==
** {function_clause,
       [{rabbit_trace,'-tap_in/6-fun-0-',
            [{virtual_reply_queue,
                 <<"amq.rabbitmq.reply-to.g1h2AA5yZXBseUAyNzc5NjQyMAAAC1oAAAAAZo4bIw==.+Uvn1EmAp0ZA+oQx2yoQFA==">>}],
            [{file,"rabbit_trace.erl"},{line,62}]},
        {lists,map,2,[{file,"lists.erl"},{line,1559}]},
        {rabbit_trace,tap_in,6,[{file,"rabbit_trace.erl"},{line,62}]},
        {rabbit_channel,handle_method,3,
            [{file,"rabbit_channel.erl"},{line,1284}]},
        {rabbit_channel,handle_cast,2,
            [{file,"rabbit_channel.erl"},{line,659}]},
        {gen_server2,handle_msg,2,[{file,"gen_server2.erl"},{line,1056}]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,241}]}]}
```

(Note that no trace message is emitted for messages that are delivered to
direct-reply-to requesting clients (neither in 3.12, nor in 3.13, nor
after this commit). This behaviour can be added in future when a direct
reply virtual queue becomes its own queue type.)
2024-07-10 18:14:35 +02:00
David Ansari 37b1ffe244 Upgrade AMQP address format in tests
from v1 to v2.

We still use AMQP address format v1 when connecting to an old 3.13 node.
2024-07-08 13:23:50 +02:00
David Ansari 19523876cd Make AMQP address v2 format user friendly
This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11604

This commit changes the AMQP address format v2 from
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to
```
/exchanges/:exchange/:routing-key
/exchanges/:exchange
/queues/:queue
```

Advantages:
1. more user friendly
2. matches nicely with the plural forms of HTTP API v1 and HTTP API v2

This plural form is still non-overlapping with AMQP address format v1.

Although it might feel unusual at first to send a message to `/queues/q1`,
if you think about `queues` just being a namespace or entity type, this
address format makes sense.
2024-07-04 14:33:05 +02:00
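The plural v2 address grammar above can be sketched as a small matcher. This is a Python model for illustration only; the real parsing is done in Erlang inside the broker, and the helper names here are invented:

```python
import re

# Illustrative model of the AMQP address format v2 described above:
#   /exchanges/:exchange/:routing-key
#   /exchanges/:exchange
#   /queues/:queue
# Segment values are assumed to be percent-encoded by the client.
V2_PATTERNS = [
    (re.compile(r"^/exchanges/([^/]+)/([^/]+)$"), "exchange_with_key"),
    (re.compile(r"^/exchanges/([^/]+)$"), "exchange"),
    (re.compile(r"^/queues/([^/]+)$"), "queue"),
]

def parse_v2_address(address):
    """Return (kind, segments) for a v2 address, or None if it doesn't match."""
    for pattern, kind in V2_PATTERNS:
        m = pattern.match(address)
        if m:
            return kind, m.groups()
    return None
```

Because the `/exchanges/...` and `/queues/...` prefixes do not occur in v1 addresses, a v1 address such as `/queue/q1` simply fails to match, which is what keeps the two formats non-overlapping.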
David Ansari 7b18bd7a81 Enforce percent encoding
Partially copy file
https://github.com/ninenines/cowlib/blob/optimise-urldecode/src/cow_uri.erl
We use this copy because:
1. uri_string:unquote/1 is lax: It doesn't validate that characters that are
   required to be percent encoded are indeed percent encoded. In RabbitMQ,
   we want to enforce that proper percent encoding is done by AMQP clients.
2. uri_string:unquote/1 and cow_uri:urldecode/1 in cowlib v2.13.0 are both
   slow because they allocate a new binary for the common case where no
   character was percent encoded.
When a new cowlib version is released, we should make app rabbit depend on
it, call cow_uri:urldecode/1, and delete this file (rabbit_uri.erl).
2024-07-03 17:01:51 +02:00
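The strict behaviour described above — reject any character that should have been percent-encoded, instead of silently accepting it like `uri_string:unquote/1` — can be modelled as follows. This is a Python sketch under assumptions: the exact set of characters RabbitMQ allows unencoded is taken here as the RFC 3986 unreserved set, not read from `rabbit_uri.erl`:

```python
# Illustrative strict percent-decoder. Assumption: only RFC 3986
# "unreserved" characters may appear unencoded; everything else must
# be a %XX escape.
ALLOWED = set(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789-._~"
)

def strict_percent_decode(s):
    out = bytearray()
    i = 0
    while i < len(s):
        c = s[i]
        if c == "%":
            hex_part = s[i + 1:i + 3]
            if len(hex_part) != 2:
                raise ValueError("truncated percent escape")
            out.append(int(hex_part, 16))  # raises ValueError on bad hex
            i += 3
        elif c in ALLOWED:
            out.append(ord(c))
            i += 1
        else:
            # A lax decoder would accept this character as-is;
            # the strict decoder rejects it instead.
            raise ValueError("character %r must be percent-encoded" % c)
    return out.decode("utf-8")
```

The performance point in the commit (avoid allocating a new binary when nothing was escaped) is an Erlang-specific optimisation and is not modelled here.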
David Ansari 0de9591050 Use different AMQP address format for v1 and v2
to distinguish between v1 and v2 address formats.

Previously, v1 and v2 address formats overlapped and behaved differently
for example for:
```
/queue/:queue
/exchange/:exchange
```

This PR changes the v2 format to:
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to distinguish between v1 and v2 addresses.

This allows calling `rabbit_deprecated_features:is_permitted(amqp_address_v1)`
only when we know that the user requests address format v1.

Note that `rabbit_deprecated_features:is_permitted/1` should only
be called when the old feature is actually used.

Use percent encoding / decoding for address URI format v2.
This allows using any UTF-8 encoded characters, including slashes (`/`),
in routing keys, exchange names, and queue names, and is more
future proof.
2024-07-03 16:36:03 +02:00
Karl Nilsson 7273c6846d
Merge pull request #11582 from rabbitmq/testfixes-glorious-testfixes-and-other-improvements-hurray
Various test improvements
2024-07-01 16:54:49 +01:00
Loïc Hoguin a561b45dcf
Merge pull request #11573 from rabbitmq/loic-more-make
Another make PR
2024-07-01 14:27:04 +02:00
Karl Nilsson 2b09891a77 Fix flake in rabbit_stream_queue_SUITE 2024-07-01 10:27:52 +01:00
Karl Nilsson 087d701b6e speed up policy_SUITE 2024-07-01 10:27:52 +01:00
Karl Nilsson a277713105 speed up limit and tracking* suites
By removing explicit meta data store groups.
2024-07-01 10:27:52 +01:00
Karl Nilsson b67f7292b5 remove explicit meta data store groups from bindings_SUITE 2024-07-01 10:27:52 +01:00
Karl Nilsson a5819bf41e Remove meta data store groups from publisher_confirms_parallel_SUITE 2024-07-01 10:27:52 +01:00
Karl Nilsson 6a655b6daa Fix test flake in publisher_confirms_parallel_SUITE
test `confirm_nack` had a race condition that meant either ack or nack
were likely outcomes. By stopping the queue before the publish we
ensure only nack is the valid outcome.
2024-07-01 10:27:52 +01:00
David Ansari 166e5d8a2f Bump AMQP.NET Lite to 2.4.11 2024-06-28 18:50:59 +02:00
Loïc Hoguin bbfa066d79
Cleanup .gitignore files for the monorepo
We don't need to duplicate so many patterns in so many
files since we have a monorepo (and want to keep it).

If I managed to miss something or remove something that
should stay, please put it back. Note that monorepo-wide
patterns should go in the top-level .gitignore file.
Other .gitignore files are for application or folder-
specific patterns.
2024-06-28 12:00:52 +02:00
David Ansari daf61c9653 Classic queue removes consumer
## What?

The rabbit_queue_type API has allowed cancelling a consumer.
Cancelling a consumer merely stops the queue sending more messages
to the consumer. However, messages checked out to the cancelled consumer
remain checked out until acked by the client. It's up to the client when
and whether it wants to ack the remaining checked out messages.

For AMQP 0.9.1, this behaviour is necessary because the client may
receive messages between the point in time it sends the basic.cancel
request and the point in time it receives the basic.cancel_ok response.

AMQP 1.0 is a better designed protocol because a receiver can stop a
link as shown in figure 2.46.
After a link is stopped, the client knows that the queue won't deliver
any more messages.
Once the link is stopped, the client can subsequently detach the link.

This commit extends the rabbit_queue_type API to allow a consumer to be
removed immediately:
1. Any checked out messages to that receiver will be requeued by the
   queue. (As explained previously, a client that first stops a link
   by sending a FLOW with `link_credit=0` and `echo=true` and waiting
   for the FLOW reply from the server and settles any messages before
   it sends the DETACH frame, won't have any checked out messages).
2. The queue entirely forgets this consumer and therefore stops
   delivering messages to the receiver.

This new behaviour of consumer removal is similar to what happens when
an AMQP 0.9.1 channel is closed: All checked out messages to that
channel will be requeued.

 ## Why?

Removing the consumer immediately simplifies many aspects:
1. The server session process doesn't need to requeue any checked out
   messages for the receiver when the receiver detaches the link.
   Specifically, messages in the outgoing_unsettled_map and
   outgoing_pending queue don't need to be requeued because the queue
   takes care of requeueing any checked out messages.
2. It simplifies reasoning about clients first detaching and then
   re-attaching in the same session with the same link handle (the handle
   becomes available for re-use once a link is closed): This will result
   in the same RabbitMQ queue consumer tag.
3. It simplifies queue implementations since no state needs to be held,
   and no special logic applied, for consumers that are only
   cancelled (basic.cancel AMQP 0.9.1) but not removed.
4. It makes the single active consumer feature safer when it comes to
   maintaining message order: If a client cancels consumption via AMQP
   0.9.1 basic.cancel, but still has in-flight checked out messages,
   the queue will activate the next consumer. If the AMQP 0.9.1 client
   crashes shortly after, messages to the old consumer will be requeued,
   which results in messages being out of order. To maintain message order,
   an AMQP 0.9.1 client must close the whole channel such that messages
   are requeued before the next consumer is activated.
   For AMQP 1.0, the client can either stop the link first (preferred)
   or detach the link directly. Detaching the link will requeue all
   messages before activating the next consumer, therefore maintaining
   message order. Even if the session crashes, message order will be
   maintained.

  ## How?

`rabbit_queue_type:cancel` accepts a spec as argument.
The new interaction between session proc and classic queue proc (let's
call it cancel API v2) must be hidden behind a feature flag.
This commit re-uses feature flag credit_api_v2 since it also gets
introduced in 4.0.
2024-06-27 19:50:36 +02:00
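The difference between the old "cancel" and the new "remove" semantics can be shown with a toy queue model. This is a Python sketch, not RabbitMQ's actual API; the class and method names are illustrative:

```python
from collections import deque

class ToyQueue:
    """Toy model of the consumer-removal semantics described above:
    removing a consumer requeues its checked-out messages (in order)
    and forgets the consumer, like an AMQP 0.9.1 channel close does."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.checked_out = {}  # consumer tag -> delivered, unacked messages

    def deliver(self, ctag):
        msg = self.ready.popleft()
        self.checked_out.setdefault(ctag, []).append(msg)
        return msg

    def remove_consumer(self, ctag):
        # Cancel API v2: requeue all checked-out messages at the head of
        # the queue (oldest first) and forget the consumer entirely.
        requeued = self.checked_out.pop(ctag, [])
        self.ready.extendleft(reversed(requeued))
        return requeued
```

A plain AMQP 0.9.1 basic.cancel, by contrast, would only stop future deliveries and leave `checked_out` untouched until the client acks, which is exactly the state the commit removes the need to track.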
David Ansari 53b20ec78a Fix amqp_client_SUITE flake
Fix the following sporadic failure:

```
=== === Reason: {assertMatch,
                     [{module,amqp_client_SUITE},
                      {line,446},
                      {expression,
                          "rabbitmq_amqp_client : delete_queue ( LinkPair , QName )"},
                      {pattern,"{ ok , # { message_count := NumMsgs } }"},
                      {value,{ok,#{message_count => 29}}}]}
  in function  amqp_client_SUITE:sender_settle_mode_mixed/1 (amqp_client_SUITE.erl, line 446)
 ```
The last message (the 30th) is sent as settled.
It apparently happened that all messages up to the 29th got stored.
The 29th message also got confirmed.
Subsequently, the queue got deleted with only 29 ready messages.

Bumping NumMsgs to 31 ensures that the last message is sent unsettled.
2024-06-27 08:57:53 +00:00
David Ansari eed48ef3ee Fix amqp_client_SUITE flake
Fix the following sporadic error in CI:
```
=== Location: [{amqp_client_SUITE,available_messages,3137},
              {test_server,ts_tc,1793},
              {test_server,run_test_case_eval1,1302},
              {test_server,run_test_case_eval,1234}]
=== === Reason: {assertEqual,
                     [{module,amqp_client_SUITE},
                      {line,3137},
                      {expression,"get_available_messages ( Receiver )"},
                      {expected,5000},
                      {value,0}]}
```

The client decrements the available variable from 1 to 0 when it
receives the transfer and sends a credit_exhausted event to the CT test
case proc. The CT test case proc queries the client session proc for available
messages, which is still 0. The FLOW frame from RabbitMQ to the client with the
available=5000 set could arrive shortly after.
2024-06-27 08:00:48 +00:00
David Ansari 6aa2de3f4f Avoid crash in test case
Avoid the following unexpected error in mixed version testing where
feature flag credit_api_v2 is disabled:
```
[error] <0.1319.0> Timed out waiting for credit reply from quorum queue 'stop' in vhost '/'. Hint: Enable feature flag credit_api_v2
[warning] <0.1319.0> Closing session for connection <0.1314.0>: {'v1_0.error',
[warning] <0.1319.0>                                             {symbol,<<"amqp:internal-error">>},
[warning] <0.1319.0>                                             {utf8,
[warning] <0.1319.0>                                              <<"Timed out waiting for credit reply from quorum queue 'stop' in vhost '/'. Hint: Enable feature flag credit_api_v2">>},
[warning] <0.1319.0>                                             undefined}
[error] <0.1319.0> ** Generic server <0.1319.0> terminating
[error] <0.1319.0> ** Last message in was {'$gen_cast',
[error] <0.1319.0>                            {frame_body,
[error] <0.1319.0>                                {'v1_0.flow',
[error] <0.1319.0>                                    {uint,283},
[error] <0.1319.0>                                    {uint,65535},
[error] <0.1319.0>                                    {uint,298},
[error] <0.1319.0>                                    {uint,4294967295},
[error] <0.1319.0>                                    {uint,1},
[error] <0.1319.0>                                    {uint,282},
[error] <0.1319.0>                                    {uint,50},
[error] <0.1319.0>                                    {uint,0},
[error] <0.1319.0>                                    undefined,undefined,undefined}}}
```

Presumably, the server session proc timed out receiving a credit reply from
the quorum queue because the test case deleted the quorum queue
concurrently with the client (and therefore also the server session
process) topping up link credit.

This commit detaches the link first and ends the session synchronously
before deleting the quorum queue.
2024-06-27 07:30:23 +00:00
Michael Klishin 083800809f
Merge pull request #11436 from rabbitmq/loic-remove-fhc
Remove most of the fd related FHC code
2024-06-26 15:19:49 -04:00
Michael Klishin de866b81cb
Merge pull request #11547 from rabbitmq/speed-up-rabbit-tests
Speed up rabbit tests suites
2024-06-26 15:15:06 -04:00
David Ansari 2adcdc1a28 Attempt to de-flake
This commit attempts to remove the following flake:
```
{amqp_client_SUITE,server_closes_link,1113}
{badmatch,[<14696.3530.0>,<14696.3453.0>]}
```
by waiting after each test case until sessions were de-registered from
the 1st RabbitMQ node.
2024-06-26 15:57:26 +02:00
David Ansari dcb2fe0fd9 Fix message IDs settlement order
## What?

This commit fixes issues that were present only on `main`
branch and were introduced by #9022.

1. Classic queues (specifically `rabbit_queue_consumers:subtract_acks/3`)
   expect message IDs to be (n)acked in the order they were delivered
   to the channel / session proc.
   Hence, the `lists:usort(MsgIds0)` in `rabbit_classic_queue:settle/5`
   was wrong, causing not all messages to be acked and adding a
   regression to AMQP 0.9.1 as well.
2. The order in which the session proc requeues or rejects multiple
   message IDs at once is important. For example, if the client sends a
   DISPOSITION with first=3 and last=5, the message IDs corresponding to
   delivery IDs 3,4,5 must be requeued or rejected in exactly that
   order.
   For example, quorum queues use this order of message IDs in
   34d3f94374/deps/rabbit/src/rabbit_fifo.erl (L226-L234)
   to dead letter in that order.

 ## How?

The session proc will settle (internal) message IDs to queues in ascending
(AMQP) delivery ID order, i.e. in the order messages were sent to the
client and in the order messages were settled by the client.

This commit chooses to keep the session's outgoing_unsettled_map map
data structure.

An alternative would have been to use a queue or lqueue for the
outgoing_unsettled_map as done in
* 34d3f94374/deps/rabbit/src/rabbit_channel.erl (L135)
* 34d3f94374/deps/rabbit/src/rabbit_queue_consumers.erl (L43)

Whether a queue (as done by `rabbit_channel`) or a map (as done by
`rabbit_amqp_session`) performs better depends on the pattern how
clients ack messages.

A queue will likely perform well enough because usually the oldest
delivered messages will be acked first.
However, given that there can be many different consumers on an AMQP
0.9.1 channel or AMQP 1.0 session, this commit favours a map because
it will likely generate less garbage and is very efficient when for
example a single new message (or few new messages) gets acked while
many (older) messages are still checked out by the session (but by
possibly different AMQP 1.0 receivers).
2024-06-26 13:35:41 +02:00
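The ordering requirement above — settle message IDs to each queue in ascending delivery-ID order when a DISPOSITION covers a range — can be sketched like this. A Python model only; the real structure is the session's `outgoing_unsettled_map`, and the field layout here is an assumption:

```python
def settle_range(unsettled, first, last):
    """Pop delivery IDs first..last from the unsettled map and return the
    message IDs grouped per queue, preserving ascending delivery-ID order.
    Sketch of the ordering rule, not the actual rabbit_amqp_session code."""
    by_queue = {}
    for delivery_id in range(first, last + 1):
        entry = unsettled.pop(delivery_id, None)
        if entry is not None:
            queue, msg_id = entry
            by_queue.setdefault(queue, []).append(msg_id)
    return by_queue
```

Iterating the numeric delivery-ID range (rather than sorting or usort-ing the message IDs) is what guarantees that, e.g., a DISPOSITION with first=3 and last=5 settles the IDs for deliveries 3, 4, 5 in exactly that order.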
Karl Nilsson 1825f2be77 Fix flake in rabbit_stream_queue_SUITE 2024-06-26 09:32:08 +01:00
Karl Nilsson c36b56b626 speed up signal_handling_SUITE 2024-06-26 07:38:29 +01:00
Karl Nilsson d25017d8cf speed up rabbit_fifo_dlx_integration_SUITE slightly 2024-06-26 07:38:29 +01:00
Karl Nilsson e2767a750d further speed up quorum_queue_SUITE 2024-06-26 07:38:29 +01:00
Karl Nilsson e84cb65287 speed up dead_lettering_SUITE
By lowering tick intervals.
2024-06-26 07:38:29 +01:00
Karl Nilsson 3bb5030132 speed up queue_parallel_SUITE
By removing groups that test quorum configurations that are no
longer relevant.
2024-06-26 07:38:29 +01:00
Karl Nilsson 6843384c85 speed up metrics_SUITE 2024-06-26 07:38:29 +01:00
Karl Nilsson 7f0f632de7 dynamic_qq test tweak 2024-06-26 07:38:29 +01:00
Karl Nilsson 30c14e6f35 speed up product_info_SUITE
By running all tests in a single node rather than starting a new
one for each test.
2024-06-26 07:38:29 +01:00
Karl Nilsson 371b429ea1 speed up consumer_timeout_SUITE
By reducing consumer timeout and the receive timeout.
2024-06-26 07:38:29 +01:00
Karl Nilsson 25d79fa448 speed up rabbit_fifo_prop_SUITE
mostly by reducing the sizes of some properties as they can run
quite slowly in ci
2024-06-26 07:38:29 +01:00
Karl Nilsson f919fee7f1 speed up quorum_queues_SUITE
AFTER: gmake -C deps/rabbit ct-quorum_queue  6.15s user 4.25s system 2% cpu 6:25.29 total
2024-06-26 07:38:29 +01:00
Karl Nilsson 3551309baf speed up dynamic_qq_SUITE
BEFORE: time gmake -C deps/rabbit ct-dynamic_qq  1.92s user 1.44s system 2% cpu 2:23.56 total

AFTER: time gmake -C deps/rabbit ct-dynamic_qq  1.66s user 1.22s system 2% cpu 1:56.44 total
2024-06-26 07:38:29 +01:00
Karl Nilsson 13a1a7c7fe speed up rabbit_stream_queue_SUITE
Reduce the number of tests that are run for 2 nodes.

BEFORE: time gmake -C deps/rabbit ct-rabbit_stream_queue  7.22s user 5.72s system 2% cpu 8:28.18 total

AFTER time gmake -C deps/rabbit ct-rabbit_stream_queue  27.04s user 8.43s system 10% cpu 5:38.63 total
2024-06-26 07:38:29 +01:00
Loïc Hoguin 6a47eaad22
Zero sockets_used/sockets_limit stats
They are no longer used.

This removes a couple file_handle_cache:info/1 calls.

We are not removing them from the HTTP API to avoid
breaking things unintentionally.
2024-06-24 12:07:51 +02:00
Loïc Hoguin 49bedfc17e
Remove most of the fd related FHC code
Stats were not removed, including management UI stats
relating to FDs.

Web-MQTT and Web-STOMP configuration relating to FHC
were not removed.

The file_handle_cache itself must be kept until we
remove CQv1.
2024-06-24 12:07:51 +02:00
Michael Klishin 7335aaaf1b
Merge pull request #11528 from rabbitmq/mk-fix-virtual-host-creation-post-11457
Follow-up to #11457
2024-06-22 05:15:22 -04:00
Michael Klishin b822be02af
Revert "cuttlefish tls schema for amqp_client" 2024-06-22 04:16:50 -04:00
Michael Klishin 1e577a82fc Follow-up to #11457
The queue type argument won't always be a binary,
for example, when a virtual host is created.

As such, the validation code should accept at
least atoms in addition to binaries.

While at it, improve logging and error reporting
when DQT validation fails, and make the
definition import tests focused on
virtual hosts a bit more robust.
2024-06-22 02:16:22 -04:00
Simon Unge 24fb0334ac Add schema duplicate for amqp 1.0 2024-06-21 21:43:07 -04:00
Simon Unge 145592efe9 Remove server options and move to rabbit schema 2024-06-21 21:43:07 -04:00
Michael Klishin 0b84acd164
Merge pull request #11515 from rabbitmq/issue-11514
Handle unknown QQ state
2024-06-21 21:41:59 -04:00
Michael Klishin 7c2d29d1e4
Merge pull request #11513 from rabbitmq/loic-remove-ram-durations
CQ: Remove rabbit_memory_monitor and RAM durations
2024-06-21 21:40:13 -04:00
Michael Klishin f51528244e definition_import_SUITE: fix a subtle timing issue
In case 16, an await_condition/2 condition was
not correctly matching the error. As a result,
the function proceeded to the assertion step
earlier than it should have, failing with
an obscure function_clause.

This was because an {error, Context} clause
was not correct.

In addition to fixing it, this change adds a
catch-all clause and verifies the loaded
tagged virtual host before running any assertions
on it.

If the virtual host was not imported, case 16
will now fail with a specific CT log message.

References #11457 because the changes there
exposed this behavior in CI.
2024-06-21 21:32:38 -04:00
Luke Bakken ce16e61d08 Do not overwrite `default_queue_type` with `undefined`
Importing a definitions file with no `default_queue_type` metadata for a vhost would result in that vhost's value being set to `undefined`.

Once set to a non-`undefined` value, this PR prevents `default_queue_type` from being set back to `undefined`.
2024-06-21 11:25:18 -04:00
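The guard described above amounts to a merge that never lets a missing value clobber an existing one. A hedged Python sketch (the function name and metadata shape are illustrative, not the broker's actual definitions-import code):

```python
def merge_vhost_metadata(current, imported):
    """Merge imported vhost metadata into the current metadata, but never
    overwrite an existing default_queue_type with an absent/undefined one.
    Illustrative model of the behaviour described in the commit above."""
    merged = dict(current)
    for key, value in imported.items():
        if key == "default_queue_type" and value is None:
            continue  # don't clobber an existing value with 'undefined'
        merged[key] = value
    return merged
```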
Michal Kuratczyk bfedfdca71
Handle unknown QQ state
ra_state may contain a QQ state such as {'foo',init,unknown}.
Perfore this fix, all_replica_states doesn't map such states
to a 2-tuple which leads to a crash in maps:from_list because
a 3-tuple can't be handled.

A crash in rabbit_quorum_queue:all_replica_states leads to no
results being returned from a given node when the CLI asks for
QQs with minimum quorum.
2024-06-20 17:49:43 +02:00
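The shape mismatch is easy to model: building a map from a list that mixes 2-tuples and 3-tuples requires normalising the 3-tuple form first. A Python sketch of the fix's idea (tuple contents are taken from the example in the commit; what the third field means is not assumed):

```python
def all_replica_states(raw_states):
    """Normalise ra_state entries into a {queue_name: state} map,
    tolerating 3-tuple entries such as ('foo', 'init', 'unknown').
    Illustrative model of the fix described above."""
    normalised = {}
    for entry in raw_states:
        if len(entry) == 2:
            name, state = entry
        else:
            name, state, _extra = entry  # 3-tuple: drop the extra field
        normalised[name] = state
    return normalised
```

Without the 3-tuple branch, the equivalent of `maps:from_list/1` would fail on the first 3-tuple, and the node would return no results to the CLI at all, which is the failure mode the commit describes.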
Loïc Hoguin 1ca46f1c63
CQ: Remove rabbit_memory_monitor and RAM durations
CQs have not used RAM durations for some time, following
the introduction of v2.
2024-06-20 15:19:51 +02:00
Michal Kuratczyk 489de21a76
Deflake vhost_is_created_with_default_user
This test relied on the order of items
in the list of users returned and would crash
with `case_clause` if `guest` was returned first.
2024-06-19 13:52:56 +02:00
Arnaud Cogoluègnes a52b8c0eb3
Bump Maven to 3.9.8 in Java test suites 2024-06-18 08:55:19 +02:00
Loïc Hoguin d959c8a43d
Merge pull request #11112 from rabbitmq/loic-faster-cq-shared-store-gc
4.x: Additional CQv2 message store optimisations
2024-06-17 15:30:25 +02:00
Michael Klishin 6605a7330f
Merge pull request #11455 from rabbitmq/default-max-message-size
Reduce default maximum message size to 16MB
2024-06-14 13:49:26 -04:00
Diana Parra Corbacho c11b812ef6 Reduce default maximum message size to 16MB 2024-06-14 11:55:03 +02:00
Loïc Hoguin 41ce4da5ca
CQ: Remove ability to change shared store index module
It will always use the ETS index. This change lets us
do optimisations that would otherwise not be possible,
including 81b2c39834953d9e1bd28938b7a6e472498fdf13.

A small functional change is included in this commit:
we now always use ets:update_counter to update the
ref_count, instead of a mix of update_{counter,fields}.

When upgrading to 4.0, the index will be rebuilt for
all users that were using a custom index module.
2024-06-14 11:52:03 +02:00
Jean-Sébastien Pédron df658317ee
peer_discovery_tmp_hidden_node_SUITE: Use IP address to simulate a long node name
[Why]
The `longnames`-based testcase depends on a configured FQDN. Buildbuddy
hosts were incorrectly configured and were lacking one. As a workaround,
`/etc/hosts` was modified to add an FQDN.

We don't use Buildbuddy anymore, but the problem also appeared on team
members' Broadcom-managed laptops (also having an incomplete network
configuration). More importantly, GitHub workers seem to have the same
problem, but randomly!

At this point, we can't rely on the host's correct network
configuration.

[How]
The testsuite is modified to use a hard-coded IP address, 127.0.0.1, to
simulate a long Erlang node name (i.e. it has dots).
2024-06-13 15:04:07 +02:00
Karl Nilsson cca64e8ed6 Add new local random exchange type.
This exchange type will only bind classic queues and will only return
routes for queues that are local to the publishing connection. If more than
one queue is bound, it will make a random choice among the locally bound queues.

This exchange type is suitable as a component in systems that run
highly available low-latency RPC workloads.

Co-authored-by: Marcial Rosales <mrosales@pivotal.io>
2024-06-12 17:16:45 +01:00
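The routing decision of the new exchange type can be summarised in a few lines: filter the bound queues down to those local to the publishing connection, then pick one at random. A Python sketch only; the actual implementation is an Erlang exchange type module and these names are invented:

```python
import random

def route_local_random(bound_queues, local_queues):
    """Return at most one route: a randomly chosen queue that is both
    bound to the exchange and local to the publishing connection.
    Illustrative model of the local random exchange described above."""
    candidates = [q for q in bound_queues if q in local_queues]
    if not candidates:
        return []  # no locally bound queue: the message is unroutable
    return [random.choice(candidates)]
```

Keeping the routed queue on the same node as the publisher is what makes this suitable for low-latency RPC: no inter-node hop is needed for delivery.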
Karl Nilsson 3390fc97fb etcd peer discovery fixes
Instead of relying on the complex and non-deterministic default node
selection mechanism inside peer discovery, this change makes the
etcd backend implementation do the leader selection itself, based on
the etcd create_revision of each entry. Although not spelled out anywhere
explicitly, it is likely that a property called "Create Revision" is going
to remain consistent throughout the lifetime of an etcd key.

Either way this is likely to be an improvement on the current approach.
2024-06-12 15:31:27 +01:00
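Deterministic selection by create_revision reduces to "every node picks the peer whose registration key is oldest". A minimal sketch, assuming each discovered entry carries its node name and the etcd create_revision (field names here are assumptions):

```python
def elect_seed_node(entries):
    """Pick the peer whose etcd key has the lowest create_revision.
    Every node computes the same answer from the same etcd data, which
    is what makes the selection deterministic (illustrative sketch)."""
    return min(entries, key=lambda e: e["create_revision"])["node"]
```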
Michael Klishin b9f387b988
Merge pull request #11412 from rabbitmq/link-error
Prefer link error over session error
2024-06-07 11:10:48 -04:00
Michael Davis 5e4a4326ef
Merge pull request #11398 from rabbitmq/md/khepri/read-only-permissions-transaction-queries
Khepri: Use read-only transactions to query for user/topic permissions
2024-06-07 08:20:02 -05:00
David Ansari 895bf3e3cb Prefer link error over session error
when AMQP client publishes with a bad 'to' address such that other links
on the same session can continue to work.
2024-06-07 13:41:05 +02:00
Michael Klishin c592405b7a
Merge pull request #11397 from rabbitmq/direct-reply-to
4.x: add tests for AMQP 0.9.1 direct reply to feature
2024-06-07 02:28:31 -04:00
Michael Klishin 959d3fd15c Simplify a rabbitmqctl_integration_SUITE test 2024-06-06 21:21:26 -04:00
Michael Klishin 1fc325f5d7 Introduce a way to disable virtual host reconciliation
Certain test suites need virtual hosts to be down
for an extended period of time, and this feature
gets in the way ;)
2024-06-06 21:21:26 -04:00
David Ansari d806d698af Add tests for AMQP 0.9.1 direct reply to feature
Add tests for https://www.rabbitmq.com/docs/direct-reply-to

Relates https://github.com/rabbitmq/rabbitmq-server/issues/11380
2024-06-06 18:11:39 +02:00
Michael Davis d0425e5361
cluster_minority_SUITE: Test definitions export 2024-06-06 10:49:27 -04:00
Michael Davis 7a8669ea73
Improve cluster_minority_SUITE
This is a mix of a few changes:

* Suppress the compiler warning from the export_all attribute.
* Lower Khepri's command handling timeout value. By default this is
  set to 30s in rabbit which makes each of the cases in
  `client_operations` take an excessively long time. Before this change
  the suite took around 10 minutes to complete. Now it takes between two
  and three minutes.
* Swap the order of client and broker teardown steps in end_per_group
  hook. The client teardown steps will always fail if run after the
  broker teardown steps because they rely on a value in `Config` that
  is deleted by broker teardown.
2024-06-06 10:49:27 -04:00
Michael Davis 79e1350a87
Remove feature flag code handling potential code server deadlocks
These test cases and RPC calls were concerned with deadlocks possible
from modifying the code server process from multiple callers. With the
switch to persistent_term in the parent commits these kinds of deadlocks
are no longer possible.

We should keep the RPC calls to `rabbit_ff_registry_wrapper:inventory/0`
though for mixed-version compatibility with nodes that use module
generation instead of `persistent_term` for their registry.
2024-06-05 11:26:41 -04:00
Michael Davis ea2da2d32d
rabbit_feature_flags: Inject test feature flags atomically
`rabbit_feature_flags:inject_test_feature_flags/2` could discard flags
from the persistent term because the read and write to the persistent
term were not atomic. `feature_flags_SUITE:registry_concurrent_reloads/1`
spawns many processes to modify the feature flag registry concurrently
but also updates this persistent term concurrently. The processes would
race so that many would read the initial flags, add their flag and write
that state, discarding any flags that had been written in the meantime.

We can add a lock around changes to the persistent term to make the
changes atomic.

Non-atomic updates to the persistent term caused unexpected behavior in
the `registry_concurrent_reloads/1` case previously. The
`PT_TESTSUITE_ATTRS` persistent_term only ended up containing a few of
the desired feature flags (for example only `ff_02` and `ff_06` along
with the base `ff_a` and `ff_b`). The case did not fail because the
registry continued to make progress towards that set of feature flags.
However the test case never reached the "all feature flags appeared"
state of the spammer and so the new assertion added at the end of the
case in this commit would fail.
2024-06-05 11:26:41 -04:00
Karl Nilsson ebbff46c96
Merge pull request #11307 from rabbitmq/amqp-flow-control-poc-2
Introduce outbound RabbitMQ internal AMQP flow control
2024-06-05 15:01:05 +01:00
Michael Klishin 44c381929e
Merge pull request #11373 from rabbitmq/qq-server-recovery
QQ: Enable server recovery.
2024-06-04 15:50:16 -04:00
Karl Nilsson 08a8f93e97 QQ: Enable server recovery.
This ensures quorum queue processes are restarted after a ra system restart.
2024-06-04 16:41:12 +01:00
David Ansari d70e529d9a Introduce outbound RabbitMQ internal AMQP flow control
## What?

Introduce RabbitMQ internal flow control for messages sent to AMQP
clients.

Prior this PR, when an AMQP client granted a large amount of link
credit (e.g. 100k) to the sending queue, the sending queue sent
that amount of messages to the session process no matter what.
This becomes problematic for memory usage when the session process
cannot send out messages fast enough to the AMQP client, especially if
1. The writer proc cannot send fast enough. This can happen when
the AMQP client does not receive fast enough and causes TCP
back-pressure to the server. Or
2. The server session proc is limited by remote-incoming-window.

Both scenarios are now added as test cases.
Tests
* tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
* tcp_back_pressure_rabbitmq_internal_flow_classic_queue
cover scenario 1.

Tests
* incoming_window_closed_rabbitmq_internal_flow_quorum_queue
* incoming_window_closed_rabbitmq_internal_flow_classic_queue
cover scenario 2.

This PR sends messages from queues to AMQP clients in a more controlled
manner.

To illustrate:
```
make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
observer_cli:start()
mq
```
where `mq` sorts by message queue length.
Create a stream:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true
```
Next, send and receive from the Stream via AMQP.
Grant a large number of link credit to the sending stream:
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000
```

**Before** this PR:
```
RESULTS

Count ............................................... 100,696 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 120,422 messages/s
Receiver rate ......................................... 3,363 messages/s
End-to-end rate ....................................... 3,359 messages/s
```
We observe that all 100k link credit worth of messages are buffered in the
writer proc's mailbox:
```
|No | Pid        | MsgQueue  |Name or Initial Call                 |      Memory | Reductions          |Current Function                  |
|1  |<0.845.0>   |100001     |rabbit_amqp_writer:init/1            |  126.0734 MB| 466633491           |prim_inet:send/5                  |
```

**After** this PR:
```
RESULTS

Count ............................................. 2,973,440 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 123,322 messages/s
Receiver rate ........................................ 99,250 messages/s
End-to-end rate ...................................... 99,148 messages/s
```
We observe that the message queue lengths of both writer and session
procs are low.

 ## How?

Our goal is to have queues send out messages in a controlled manner
without overloading RabbitMQ itself.
We want RabbitMQ internal flow control between:
```
AMQP writer proc <--- session proc <--- queue proc
```
A similar concept exists for classic queues sending via AMQP 0.9.1.
We want an approach that applies to AMQP and works generically for all queue
types.

For the interaction between AMQP writer proc and session proc we use a
simple credit based approach reusing module `credit_flow`.
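As a rough illustration, the `credit_flow` idea can be modeled with two counters (a toy Python model with made-up defaults, not the actual Erlang implementation):

```python
class CreditFlow:
    """Toy model of a credit_flow pair: the sender spends one credit per
    message and blocks at zero; the receiver grants a fresh batch back
    after processing enough messages."""

    def __init__(self, initial_credit=50, ack_batch=25):
        self.credit = initial_credit  # credits the sender may still spend
        self.ack_batch = ack_batch    # receiver grants this many back at once
        self.unacked = 0              # processed but not yet granted back

    def blocked(self):
        return self.credit == 0

    def send(self):
        # Sender side: spend one credit per message; caller checks blocked().
        assert not self.blocked(), "sender must wait for more credit"
        self.credit -= 1

    def ack(self):
        # Receiver side: after processing ack_batch messages, grant them back.
        self.unacked += 1
        if self.unacked >= self.ack_batch:
            self.credit += self.unacked
            self.unacked = 0

flow = CreditFlow(initial_credit=2, ack_batch=2)
flow.send(); flow.send()
assert flow.blocked()            # sender must stop until credit is granted
flow.ack(); flow.ack()           # receiver processed both messages
assert not flow.blocked()        # sender may proceed again
```

The initial credit and batch sizes here are invented; the point is only the block-at-zero, grant-in-batches interaction.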

For the interaction between session proc and queue proc, the following options
exist:

 ### Option 1
The session process provides explicit feedback to the queue after it
has sent N messages.
This approach is implemented in
https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1
and works well.
A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know
that it can send further messages to the session proc.

Pros:
* Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages
  in auto ack mode to AMQP 0.9.1 clients.
* Simple for the session proc

Cons:
* Slightly added complexity in every queue type implementation
* Multiple Ra commands (settle, credit, sent) to decide when a quorum
  queue sends more messages.

 ### Option 2
A dual link approach where two AMQP links exists between
```
AMQP client <---link--> session proc <---link---> queue proc
```
When the client grants a large amount of credits, the session proc will
top up credits to the queue proc periodically in smaller batches.

Pros:
* No queue type modifications required.
* Re-uses AMQP link flow control

Cons:
* Significant added complexity in the session proc. A client can
  dynamically decrease or increase credits and dynamically change the drain
  mode while the session tops up credit to the queue.

 ### Option 3
Credit is a 32 bit unsigned integer.
The spec mandates that the receiver independently chooses a credit.
Nothing in the spec prevents the receiver from choosing a credit of 1 billion.
However the credit value is merely a **maximum**:
> The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by.

Therefore, the server is not required to send all available messages to this
receiver.

For delivery-count:
> Only the sender MAY independently modify this field.

"independently" could be interpreted to mean that the sender may add to the delivery-count
irrespective of what the client chose for drain and link-credit.

Option 3: At credit time, the queue proc could already consume credit
and advance the delivery-count if the credit is too large, before checking out any messages.
For example if credit is 100k, but the queue only wants to send 1k, the queue could
consume 99k of credits and advance the delivery-count, and subsequently send maximum 1k messages.
If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver,
otherwise the receiver wouldn’t know that it ran out of link-credit.

Pros:
* Very simple

Cons:
* Possibly unexpected behaviour for receiving AMQP clients
* Possibly poor end-to-end throughput in auto-ack mode because the queue
  would send a batch of messages followed by a FLOW containing the advanced
  delivery-count. Only thereafter will the client learn that it ran out of
  credits and top up again. This feels like synchronously pulling a batch
  of messages. In contrast, option 2 sends out more messages as soon as
  the previous messages left RabbitMQ, without requiring another credit
  top-up from the receiver.
* drain mode with large credits requires the queue to send all available
  messages and only thereafter advance the delivery-count. Therefore,
  drain mode breaks option 3 somewhat.
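The credit-consumption arithmetic behind option 3 can be sketched as follows (a hypothetical Python sketch; the function name and batch size are made up, and delivery-count is a 32-bit number so it wraps):

```python
def consume_excess_credit(link_credit, delivery_count, max_batch):
    """Option 3 sketch: if the receiver granted more credit than the queue
    wants to send, consume the excess up front by advancing delivery-count.
    Returns (new_delivery_count, credit_left_for_real_deliveries)."""
    excess = max(link_credit - max_batch, 0)
    # delivery-count is a 32-bit sequence number, hence the modulo.
    new_delivery_count = (delivery_count + excess) % (2 ** 32)
    return new_delivery_count, link_credit - excess

# Receiver granted 100k credits but the queue only wants to send 1k:
dc, credit = consume_excess_credit(100_000, 0, 1_000)
# delivery-count advanced by 99k; at most 1k messages will actually be
# sent, and a FLOW carrying the new delivery-count informs the receiver.
assert (dc, credit) == (99_000, 1_000)
```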

 ### Option 4
Session proc drops message payload when its outgoing-pending queue gets
too large and re-reads payloads from the queue once the message can be
sent (see `get_checked_out` Ra command for quorum queues).

Cons:
* Would need to be implemented for every queue type, especially classic queues
* Doesn't limit the amount of message metadata in the session proc's
  outgoing-pending queue

 ### Decision: Option 2
This commit implements option 2 to avoid any queue type modification.
At most one credit request is in-flight between session process and
queue process for a given queue consumer.
If the AMQP client sends another FLOW in between, the session proc
stashes the FLOW until it processes the previous credit reply.

A delivery is only sent from the outgoing-pending queue if the
session proc is not blocked by
1. writer proc, or
2. remote-incoming-window

The credit reply is placed into the outgoing-pending queue.
This ensures that the session proc will only top up the next batch of
credits if sufficient messages were sent out to the writer proc.

A future commit could additionally have each queue limit the number of
unacked messages for a given AMQP consumer, or alternatively make use
of session outgoing-window.
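The option-2 session logic described above can be sketched like this (a simplified Python model with a made-up batch size, not the actual session implementation):

```python
class SessionCredit:
    """Sketch of the option-2 session logic: top up queue credit in small
    batches, keep at most one credit request in flight, and stash the
    latest client FLOW until the pending credit reply arrives."""

    BATCH = 256  # hypothetical top-up size per request

    def __init__(self):
        self.in_flight = False    # credit request outstanding to the queue?
        self.stashed_flow = None  # newest FLOW received while waiting
        self.requests = []        # credit requests handed to the queue proc

    def handle_flow(self, client_credit):
        if self.in_flight:
            self.stashed_flow = client_credit  # only the newest FLOW matters
        else:
            self._request(client_credit)

    def handle_credit_reply(self):
        self.in_flight = False
        if self.stashed_flow is not None:
            credit, self.stashed_flow = self.stashed_flow, None
            self._request(credit)

    def _request(self, client_credit):
        self.requests.append(min(client_credit, self.BATCH))
        self.in_flight = True

s = SessionCredit()
s.handle_flow(100_000)    # first FLOW: request a small batch immediately
s.handle_flow(50_000)     # arrives while waiting: stashed, not forwarded
s.handle_flow(80_000)     # replaces the stashed FLOW
s.handle_credit_reply()   # reply processed: stashed FLOW now forwarded
assert s.requests == [256, 256]
```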
2024-06-04 13:11:55 +02:00
Diana Parra Corbacho 3bbda5bdba Remove classic mirror queues 2024-06-04 13:00:31 +02:00
David Ansari bd847b8cac Put credit flow config into persistent term
Put configuration credit_flow_default_credit into persistent term such
that the tuple doesn't have to be copied on the hot path.

Also, change persistent term keys from `{rabbit, AtomKey}` to `AtomKey`
so that hashing becomes cheaper.
2024-05-31 16:20:51 +02:00
Simon Unge 19a751890c Remove checks to vhost-limit as that is now handled by rabbit_queue_type:declare
Add new error return tuple when the queue limit is exceeded
2024-05-27 09:53:54 +02:00
Michal Kuratczyk cfa3de4b2b
Remove unused imports (thanks elp!) 2024-05-23 16:36:08 +02:00
Simon Unge 60ae6c0943 cluster wide queue limit 2024-05-21 17:41:53 +00:00
David Ansari df9e9978ce Fix type spec
and improve naming
2024-05-17 12:14:58 +02:00
David Ansari e8c5d22e9b Have minimum queue TTL take effect for quorum queues
For classic queues, if both policy and queue argument are set
for queue TTL, the minimum takes effect.

Prior to this commit, for quorum queues if both policy and
queue argument are set for queue TTL, the policy always overrides the
queue argument.

This commit brings quorum queue TTL resolution in line with classic
queue behaviour. This allows developers to provide a custom, lower
queue TTL while the operator policy acts as an upper-bound safeguard.
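The resolution rule amounts to taking the minimum when both values are set (illustrative Python; the function name is made up):

```python
def effective_queue_ttl(policy_ttl, argument_ttl):
    """If both the operator policy and the queue argument set a TTL, the
    lower (safer) one wins; a single value applies unchanged."""
    values = [t for t in (policy_ttl, argument_ttl) if t is not None]
    return min(values) if values else None

assert effective_queue_ttl(60_000, 5_000) == 5_000  # argument below policy cap
assert effective_queue_ttl(60_000, None) == 60_000  # only the policy applies
assert effective_queue_ttl(None, None) is None      # no TTL at all
```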
2024-05-16 17:40:59 +02:00
Michael Davis 3435a0d362
Fix a flake in amqp_client_SUITE
`event_recorder` listens for events globally rather than per-connection
so it's possible for `connection_closed` and other events from prior
test cases to be captured and mixed with the events the test is
interested in, causing a flake.

Instead we can assert that the events we gather from `event_recorder`
contain the ones emitted by the connection.
2024-05-14 14:50:50 -04:00
David Ansari 90a40107b4 Remove BCC from x-death routing-keys
This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11174
which broke the following Java client test:
```
./mvnw verify -P '!setup-test-cluster' -Drabbitmqctl.bin=DOCKER:rabbitmq -Dit.test=DeadLetterExchange#deadLetterNewRK
```

The desired documented behaviour is the following:
> routing-keys: the routing keys (including CC keys but excluding BCC ones) the message was published with

This behaviour should be respected also for messages dead lettered into a
stream. Therefore, instead of first including the BCC keys in the `#death.routing_keys` field
and removing it again in mc_amqpl before sending the routing-keys to the
client as done in v3.13.2 in
dc25ef5329/deps/rabbit/src/mc_amqpl.erl (L527)
we instead omit the BCC keys directly from `#death.routing_keys` when
recording a death event.

This commit records the BCC keys in their own mc `bcc` annotation in `mc_amqpl:init/1`.
2024-05-14 13:40:23 +02:00
David Ansari 083889ec01 Fix test assertion 2024-05-13 18:21:40 +02:00
David Ansari 68f8ef14f3 Fix comment 2024-05-13 17:59:42 +02:00
Loïc Hoguin 0942573be7
Merge pull request #10656 from rabbitmq/loic-remove-cqv1-option
4.x: remove availability of CQv1
2024-05-13 14:33:16 +02:00
Loïc Hoguin ecf46002e0
Remove availability of CQv1
We reject CQv1 in rabbit.schema as well.

Most of the v1 code is still around as it is needed
for conversion to v2. It will be removed at a later
time when conversion is no longer supported.

We don't shard the CQ property suite anymore:
there's only 1 case remaining.
2024-05-13 13:06:07 +02:00
David Ansari 6b300a2f34 Fix dead lettering
# What?

This commit fixes #11159, #11160, #11173.

  # How?

  ## Background

RabbitMQ allows messages to be dead lettered for four different reasons.
Three of them cause messages to be dead lettered automatically within
the broker (maxlen, expired, delivery_limit), and one is caused by an
explicit client action (rejected).

RabbitMQ also allows dead letter topologies. When a message is dead
lettered, it is re-published to an exchange, and therefore zero to
multiple target queues. These target queues can in turn dead letter
messages. Hence it is possible to create a cycle of queues where
messages get dead lettered endlessly, which is what we want to avoid.

  ## Alternative approach

One approach to avoid such endless cycles is to use a similar concept of
the TTL field of the IPv4 datagram, or the hop limit field of an IPv6
datagram. These fields ensure that IP packets don't circulate forever
in the Internet. Each router decrements this counter. If the counter
reaches 0, the sender is notified and the packet gets dropped.

We could use the same approach in RabbitMQ: Whenever a queue dead
letters a message, a dead_letter_hop_limit field could be decremented.
If this field reaches 0, the message will be dropped.
Such a hop limit field could have a sensible default value, for example
32. The sender of the message could override this value. Likewise, the
client rejecting a message could set a new value via the Modified
outcome.

Such an approach has multiple advantages:
1. No dead letter cycle detection per se needs to be performed within
   the broker which is a slight simplification to what we have today.
2. Simpler dead letter topologies. One very common use case is that
   clients re-try sending the message after some time by consuming from
   a dead-letter queue and rejecting the message such that the message
   gets republished to the original queue. Instead of requiring explicit
   client actions, which increases complexity, a x-message-ttl argument
   could be set on the dead-letter queue to automatically retry after
   some time. This is a big simplification because it eliminates the
   need of various frameworks that retry, such as
   https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/rabbitmq-retry.html
3. No dead letter history information needs to be compressed because
   there is a clear limit on how often a message gets dead lettered.
   Therefore, the full history including timestamps of every dead letter
   event will be available to clients.

Disadvantages:
1. Breaks a lot of clients, even for 4.0.
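For illustration, the rejected hop-limit alternative would behave roughly like this (hypothetical Python sketch; the field name and default value come from the description above, nothing here is implemented in RabbitMQ):

```python
DEFAULT_HOP_LIMIT = 32  # the sensible default suggested above

def dead_letter(message):
    """Sketch of the hop-limit alternative: decrement a counter on every
    dead-letter hop and drop the message once it reaches zero."""
    hops = message.get("dead_letter_hop_limit", DEFAULT_HOP_LIMIT)
    if hops <= 0:
        return None                 # drop: the message cycled too often
    # Republish with the decremented counter.
    return dict(message, dead_letter_hop_limit=hops - 1)

msg = {"body": b"hi", "dead_letter_hop_limit": 2}
msg = dead_letter(msg)              # 1 hop left
msg = dead_letter(msg)              # 0 hops left
assert dead_letter(msg) is None     # third dead-lettering drops the message
```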

  ## 3.12 approach

Instead of decrementing a counter, the approach up to 3.12 has been to
drop the message if the message cycled automatically. A message cycled
automatically if no client explicitly rejected the message, i.e. the
message got dead lettered due to maxlen, expired, or delivery_limit, but
not due to rejected.

In this approach, the broker must be able to detect such cycles
reliably.
Reliably detecting dead letter cycles broke in 3.13 due to #11159 and #11160.

To reliably detect cycles, the broker must be able to obtain the exact
order of dead letter events for a given message. In 3.13.0 - 3.13.2, the
order cannot exactly be determined because wall clock time is used to
record the death time.

This commit uses the same approach as done in 3.12: a list ordered by
death recency is used with the most recent death at the head of the
list.

To not grow this list endlessly (for example when a client rejects the
same message hundreds of times), this list should be compacted.
This commit, like 3.12, compacts by tuple `{Queue, Reason}`:
If this message got already dead lettered from this Queue for this
Reason, then only a counter is incremented and the element is moved to
the front of the list.
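The compaction rule can be sketched as follows (a simplified Python model; the real annotation carries more fields such as timestamps, exchange, and routing keys — this sketch keeps only queue, reason, and count):

```python
def record_death(deaths, queue, reason):
    """Deaths are ordered by recency (most recent first) and compacted by
    the {queue, reason} pair: a repeat bumps a counter and moves the
    entry to the front instead of growing the list."""
    for i, (q, r, count) in enumerate(deaths):
        if (q, r) == (queue, reason):
            entry = (q, r, count + 1)
            return [entry] + deaths[:i] + deaths[i + 1:]
    return [(queue, reason, 1)] + deaths

deaths = []
deaths = record_death(deaths, "q1", "expired")
deaths = record_death(deaths, "q2", "rejected")
deaths = record_death(deaths, "q1", "expired")  # repeat: counted, moved front
assert deaths == [("q1", "expired", 2), ("q2", "rejected", 1)]
```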

  ## Streams & AMQP 1.0 clients

Dead lettering from a stream doesn't make sense because:
1. a client cannot reject a message from a stream since the stream must
   maintain the total order of events to be consumed by multiple clients.
2. TTL is implemented by Stream retention where only old Stream segments
   are automatically deleted (or archived in the future).
3. same applies to maxlen

Although messages cannot be dead lettered **from** a stream, messages can be dead lettered
**into** a stream. This commit provides clients consuming from a stream the death history: #11173

Additionally, this commit provides AMQP 1.0 clients the death history via
message annotation `x-opt-deaths` which contains the same information as
AMQP 0.9.1 header `x-death`.

Both, storing the death history in a stream and providing death history
to an AMQP 1.0 client, use the same encoding: a message annotation
`x-opt-deaths` that contains an array of maps ordered by death recency.
The information encoded is the same as in the AMQP 0.9.1 x-death header.

Instead of providing an array of maps, a better approach could be to use
an array of a custom AMQP death type, such as:
```xml
<amqp name="rabbitmq">
    <section name="custom-types">
        <type name="death" class="composite" source="list">
            <descriptor name="rabbitmq:death:list" code="0x00000000:0x000000255"/>
            <field name="queue" type="string" mandatory="true" label="the name of the queue the message was dead lettered from"/>
            <field name="reason" type="symbol" mandatory="true" label="the reason why this message was dead lettered"/>
            <field name="count" type="ulong" default="1" label="how many times this message was dead lettered from this queue for this reason"/>
            <field name="time" mandatory="true" type="timestamp" label="the first time when this message was dead lettered from this queue for this reason"/>
            <field name="exchange" type="string" default="" label="the exchange this message was published to before it was dead lettered for the first time from this queue for this reason"/>
            <field name="routing-keys" type="string" default="" multiple="true" label="the routing keys this message was published with before it was dead lettered for the first time from this queue for this reason"/>
            <field name="ttl" type="milliseconds" label="the time to live of this message before it was dead lettered for the first time from this queue for reason ‘expired’"/>
        </type>
    </section>
</amqp>
```

However, encoding and decoding custom AMQP types that are nested within
arrays which in turn are nested within the message annotation map can be
difficult for clients and the broker. Also, each client will need to
know the custom AMQP type. For now, therefore we use an array of maps.

  ## Feature flag
The new way to record death information is done via mc annotation
`deaths_v2`.
Because old nodes do not know this new annotation, recording death
information via mc annotation `deaths_v2` is hidden behind a new feature
flag `message_containers_deaths_v2`.

If this feature flag is disabled, a message will continue to use the
3.13.0 - 3.13.2 way to record death information in mc annotation
`deaths`, or even the older way within `x-death` header directly if
feature flag message_containers is also disabled.

Only if feature flag `message_containers_deaths_v2` is enabled and this
message hasn't been dead lettered before, will the new mc annotation
`deaths_v2` be used.
2024-05-13 11:00:39 +02:00
Karl Nilsson 49a4c66fce Move feature flag check outside of mc
The `mc` module is ideally meant to be kept pure and portable,
and feature flags have external infrastructure dependencies
as well as impure semantics.

Moving the check of this feature flag into the amqp session
simplifies the code (as no message containers with the new
format will enter the system before the feature flag is enabled).
2024-05-13 10:05:40 +02:00
David Ansari 6018155e9b Add property test for AMQP serializer and parser 2024-05-02 07:56:00 +00:00
David Ansari 6225dc9928 Do not parse entire AMQP body
Prior to this commit the entire amqp-value or amqp-sequence sections
were parsed when converting a message from mc_amqp.
Parsing the entire amqp-value or amqp-sequence section can generate a
huge amount of garbage depending on how large these sections are.

Given that other protocols cannot make use of amqp-value and
amqp-sequence sections anyway, leave them AMQP encoded when converting
from mc_amqp.

In fact prior to this commit, the entire body section was parsed
generating huge amounts of garbage just to subsequently encode it again
in mc_amqpl or mc_mqtt.

The new conversion interface from mc_amqp to other mc_* modules will
either output amqp-data sections or the encoded amqp-value /
amqp-sequence sections.
2024-05-02 07:56:00 +00:00
David Ansari 0697c20d70 Support end-to-end checksums over the bare message
This commit enables client apps to automatically perform end-to-end
checksumming over the bare message (i.e. body + application defined
headers).

This commit allows an app to configure the AMQP client:
* for a sending link to automatically compute CRC-32 or Adler-32
checksums over each bare message including the computed checksum
as a footer annotation, and
* for a receiving link to automatically lookup the expected CRC-32
or Adler-32 checksum in the footer annotation and, if present, check
the received checksum against the actually computed checksum.

The commit comes with the following advantages:
1. Transparent end-to-end checksumming. Although checksumming is
   performed by TCP and RabbitMQ queues using the disk, end-to-end
   checksumming is a level higher up and can therefore detect bit flips
   within RabbitMQ nodes or load balancers and other bit flips that
   went unnoticed.
2. Not only is the body checksummed, but also the properties and
   application-properties sections. This is an advantage over AMQP 0.9.1
   because the AMQP protocol disallows modification of the bare message.
3. This commit is currently used for testing the RabbitMQ AMQP
   implementation, but it shows the feasibility of how apps could also
   get integrity guarantees of the whole bare message using HMACs or
   signatures.
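The sender/receiver behaviour can be sketched in a few lines (illustrative Python using the stdlib `zlib` checksums; the footer annotation key is made up for this sketch and is not the key used by the client):

```python
import zlib

def add_footer_checksum(bare_message: bytes, algo="crc-32"):
    """Sender side: compute a checksum over the bare message (body plus
    application-defined headers) and carry it as a footer annotation."""
    fn = zlib.crc32 if algo == "crc-32" else zlib.adler32
    return {"footer": {"x-opt-checksum": fn(bare_message)},
            "bare": bare_message}

def verify_footer_checksum(message, algo="crc-32"):
    """Receiver side: recompute and compare if the footer is present."""
    expected = message["footer"].get("x-opt-checksum")
    if expected is None:
        return True  # nothing to verify
    fn = zlib.crc32 if algo == "crc-32" else zlib.adler32
    return fn(message["bare"]) == expected

msg = add_footer_checksum(b"application headers + body")
assert verify_footer_checksum(msg)
msg["bare"] = b"bit-flipped body"       # corruption anywhere in transit...
assert not verify_footer_checksum(msg)  # ...is caught end to end
```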
2024-05-02 07:56:00 +00:00
David Ansari 1d02ea9e55 Fix crashes when message gets dead lettered
Fix crashes when a message is originally sent via AMQP,
stored within a classic or quorum queue, and subsequently
dead lettered, where the dead letter exchange needs access to message
annotations, properties, or application-properties.
2024-05-02 07:56:00 +00:00
David Ansari e8e9ef32cb Delete unused module rabbit_msg_record 2024-05-02 07:56:00 +00:00
David Ansari fc7f458f7c Fix tests 2024-05-02 07:56:00 +00:00
David Ansari 2ee246c3b4 Fix MQTT -> Stream
and preserve original delivery_mode field
i.e. leave it undefined if it was sent as undefined
2024-05-02 07:56:00 +00:00
David Ansari 4980669502 New mc_amqp state 2024-05-02 07:55:59 +00:00
Karl Nilsson 02b62746bc Better handle QQ delete member operation when member already removed 2024-04-29 16:22:44 +01:00
Loïc Hoguin 9fbc0fbfa4
Merge pull request #11076 from rabbitmq/loic-fix-cq-shared-store-crashes
CQ: Fix shared store crashes
2024-04-29 09:59:27 +02:00
Michael Klishin 2ccf0c978a
Merge pull request #11077 from rabbitmq/md/vhost-notify-deletions
Notify of permission and parameter deletions when deleting a vhost
2024-04-26 19:46:31 -04:00
Loïc Hoguin fcd011f9cf
CQ: Fix shared store crashes
Crashes could happen because compaction would wrongly write
over valid messages, or truncate over valid messages, because
when looking for messages in the files it would encounter
leftover data that made it look like there was a message,
which prompted compaction to not look for the real messages
hidden within.

To avoid this we ensure that there can't be leftover data
as a result of compaction. We get this guarantee by blanking
data in the holes in the file before we start copying messages
closer to the start of the file. This requires us to do a few
more writes but we know that the only data in the files at any
point are valid messages.

Note that it's possible that some of the messages in the files
are no longer referenced; that's OK. We filter them out after
scanning the file.

This was also a good time to merge two almost identical scan
functions, and be more explicit about what messages should be
dropped after scanning the file (the messages no longer in the
ets index and the fan-out messages that ended up re-written in
a more recent file).
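The blank-then-copy idea can be sketched on an in-memory buffer (a simplified Python model; the real code works on message store files and an ets index):

```python
def compact(file_bytes, messages):
    """Sketch of the fix: blank the holes first so that, even if compaction
    is interrupted, no leftover bytes can be mistaken for a message; then
    copy messages closer to the start of the file.
    messages is a list of (offset, length) pairs."""
    buf = bytearray(file_bytes)
    # Step 1: blank every hole (anything that is not a valid message).
    pos = 0
    for off, length in sorted(messages):
        buf[pos:off] = b"\x00" * (off - pos)
        pos = off + length
    buf[pos:] = b"\x00" * (len(buf) - pos)
    # Step 2: move messages toward the start, blanking the vacated tail.
    w = 0
    for off, length in sorted(messages):
        buf[w:w + length] = bytes(buf[off:off + length])
        w += length
    buf[w:] = b"\x00" * (len(buf) - w)
    return bytes(buf)

data = b"XXmsg1YYYmsg2ZZ"
compacted = compact(data, [(2, 4), (9, 4)])  # "msg1" at 2, "msg2" at 9
assert compacted.startswith(b"msg1msg2")
assert set(compacted[8:]) == {0}             # rest of the file is blanked
```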
2024-04-26 13:03:34 +02:00
David Ansari 3fd1d0aa32 Fix AMQP fragmentation test in CI
```
make -C deps/rabbit ct-amqp_system t=dotnet:fragmentation
```

fails in the new make CI with:
```
amqp_system_SUITE > dotnet > fragmentation
    #1. {error,{{badmatch,{error,134,
                                 "Unhandled exception. Amqp.AmqpException: Invalid frame size:527, maximum frame size:512.\n   at Amqp.Connection.ThrowIfClosed(String operation)\n   at Amqp.Connection.AddSession(Session session)\n   at Amqp.Session..ctor(Connection connection, Begin begin, OnBegin onBegin)\n   at Amqp.Session..ctor(Connection connection)\n   at Program.AmqpClient.connectWithOpen(String uri, Open opn) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 53\n   at Program.Test.fragmentation(String uri) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 284\n   at Program.main(String[] argv) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 533\n"}},
                [{amqp_system_SUITE,run_dotnet_test,2,
                                    [{file,"amqp_system_SUITE.erl"},
                                     {line,228}]},
                 {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1793}]},
                 {test_server,run_test_case_eval1,6,
                              [{file,"test_server.erl"},{line,1302}]},
                 {test_server,run_test_case_eval,9,
                              [{file,"test_server.erl"},{line,1234}]}]}}
```

RabbitMQ includes its node name and cluster name in the open frame to
the client. Running this test locally shows an open frame size of 467
bytes.
The suspicion is that the node name and cluster name in CI are longer,
causing the open frame from RabbitMQ to the client to exceed the frame size
of 512 bytes.
2024-04-26 11:25:09 +02:00
Michael Davis f0938312ae
vhost_SUITE: Clean up default limit and policy changes in cases
These changes to the defaults can affect other test cases. In particular
they will affect the time to live and deletion events in the vhost
deletion idempotency case.
2024-04-25 10:26:09 -04:00
Michael Davis f90c9d3014
Notify of runtime parameter deletion when deleting a vhost 2024-04-25 10:26:09 -04:00
Michael Davis d8fcfc6761
Notify of permission deletions when deleting a vhost 2024-04-25 10:26:09 -04:00
Jean-Sébastien Pédron dbf9dfbfa7
peer_discovery_tmp_hidden_node_SUITE: New testsuite for the temporary hidden node
... used by peer discovery.
2024-04-16 10:19:42 +02:00
David Ansari 5ad61fe9f3 Fix function_clause when AMQP connection fails
```
make -C deps/rabbit ct-amqp_auth t=address_v2:vhost_absent
```
previously resulted in a function_clause error printing the
following crash report in the client:
```
=ERROR REPORT==== 10-Apr-2024::15:23:39.866688 ===
** State machine <0.207.0> terminating
** Last event = {info,{'DOWN',#Ref<0.1404963469.3720347649.257117>,process,
                              <0.208.0>,normal}}
** When server state  = {open_sent,
                         {state,1,<0.206.0>,
                          #Ref<0.1404963469.3720347649.257117>,<0.209.0>,[],
                          <0.208.0>,
                          {tcp,#Port<0.16>},
                          undefined,undefined,
                          #{notify => <0.204.0>,port => 21000,
                            address => "localhost",
                            sasl =>
                             {plaintext,
                              <<131,104,3,119,5,112,108,97,105,110,109,0,0,0,
                                9,116,101,115,116,32,117,115,101,114,109,0,0,
                                0,9,116,101,115,116,32,117,115,101,114>>},
                            hostname => <<"vhost:this vhost does not exist">>,
                            container_id => <<"my container">>,
                            max_frame_size => 1048576,
                            notify_when_opened => <0.204.0>,
                            notify_when_closed => <0.204.0>,
                            tls_opts => undefined,
                            transfer_limit_margin => 0}}}
** Reason for termination = error:function_clause
** Callback modules = [amqp10_client_connection]
** Callback mode = state_functions
** Stacktrace =
**  [{amqp10_client_connection,open_sent,
         [info,
          {'DOWN',#Ref<0.1404963469.3720347649.257117>,process,<0.208.0>,
              normal},
          {state,1,<0.206.0>,#Ref<0.1404963469.3720347649.257117>,<0.209.0>,
              [],<0.208.0>,
              {tcp,#Port<0.16>},
              undefined,undefined,
              #{notify => <0.204.0>,port => 21000,address => "localhost",
                sasl =>
                    {plaintext,
                        <<131,104,3,119,5,112,108,97,105,110,109,0,0,0,9,116,
                          101,115,116,32,117,115,101,114,109,0,0,0,9,116,101,
                          115,116,32,117,115,101,114>>},
                hostname => <<"vhost:this vhost does not exist">>,
                container_id => <<"my container">>,max_frame_size => 1048576,
                notify_when_opened => <0.204.0>,
                notify_when_closed => <0.204.0>,transfer_limit_margin => 0}}],
         [{file,"amqp10_client_connection.erl"},{line,255}]},
     {gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1395}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,241}]}]

=CRASH REPORT==== 10-Apr-2024::15:23:39.867521 ===
  crasher:
    initial call: amqp10_client_connection:init/1
    pid: <0.207.0>
    registered_name: []
    exception error: no function clause matching
                     amqp10_client_connection:open_sent(info,
                                                        {'DOWN',
                                                         #Ref<0.1404963469.3720347649.257117>,
                                                         process,<0.208.0>,
                                                         normal},
                                                        {state,1,<0.206.0>,
                                                         #Ref<0.1404963469.3720347649.257117>,
                                                         <0.209.0>,[],
                                                         <0.208.0>,
                                                         {tcp,#Port<0.16>},
                                                         undefined,undefined,
                                                         #{notify => <0.204.0>,
                                                           port => 21000,
                                                           address =>
                                                            "localhost",
                                                           sasl =>
                                                            {plaintext,
                                                             <<131,104,3,119,
                                                               5,112,108,97,
                                                               105,110,109,0,
                                                               0,0,9,116,101,
                                                               115,116,32,117,
                                                               115,101,114,
                                                               109,0,0,0,9,
                                                               116,101,115,
                                                               116,32,117,115,
                                                               101,114>>},
                                                           hostname =>
                                                            <<"vhost:this vhost does not exist">>,
                                                           container_id =>
                                                            <<"my container">>,
                                                           max_frame_size =>
                                                            1048576,
                                                           notify_when_opened =>
                                                            <0.204.0>,
                                                           notify_when_closed =>
                                                            <0.204.0>,
                                                           transfer_limit_margin =>
                                                            0}}) (amqp10_client_connection.erl, line 255)
      in function  gen_statem:loop_state_callback/11 (gen_statem.erl, line 1395)
    ancestors: [<0.206.0>,amqp10_client_sup,<0.182.0>]
    message_queue_len: 0
    messages: []
    links: [<0.206.0>]
    dictionary: []
    trap_exit: true
    status: running
    heap_size: 10958
    stack_size: 28
    reductions: 16408
  neighbours:
```
2024-04-10 16:52:06 +02:00
David Ansari c57ece8207 Extend AMQP message interceptor test 2024-04-10 11:40:49 +02:00
David Ansari 390d5715a0 Introduce new AMQP 1.0 address format
## What?
Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses.

The old format (let's call it v1) is described in
https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

 ## Why?

The AMQP address v1 format comes with the following flaws:

1. Obscure address format:

Without reading the documentation, the differences between source addresses
```
/amq/queue/:queue
/queue/:queue
:queue
```
are unknown to users. Hence, the address format is obscure.

2. Implicit creation of topologies

Some address formats implicitly create queues (and bindings), such as source address
```
/exchange/:exchange/:binding-key
```
or target address
```
/queue/:queue
```
These queues and bindings are never deleted (by the AMQP 1.0 plugin).
Implicit creation of such topologies is also obscure.

3. Redundant address formats

```
/queue/:queue
:queue
```
have the same meaning and are therefore redundant.

4. Properties section must be parsed to determine whether a routing key is present

Target address
```
/exchange/:exchange
```
requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set.
If `subject` is not set, the routing key will default to the empty string.

5. Using `subject` as routing key misuses the purpose of this field.

According to the AMQP spec, the message `subject` field's purpose is:
> A common field for summary information about the message content and purpose.

6. Exchange names, queue names and routing keys must not contain the "/" (slash) character.

The current 3.13 implementation splits on "/", disallowing these
characters in exchange names, queue names, and routing keys, which is
unnecessarily prohibitive.
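The ambiguity that forces this restriction can be sketched as follows (illustrative Python; the actual parser is Erlang):

```python
# v1-style parsing: naively split the address on "/".
addr = "/exchange/my/exchange"
segments = addr.split("/")[1:]
# Is this exchange "my/exchange", or exchange "my" with binding key
# "exchange"? The split cannot tell, so v1 must forbid "/" in names.
assert segments == ["exchange", "my", "exchange"]
```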

7. Clients must create a separate link per target exchange

While this is a reasonable working assumption, there might be rare use
cases where it could make sense to create many exchanges (e.g. 1
exchange per queue, see
https://github.com/rabbitmq/rabbitmq-server/discussions/10708) and have
a single application publish to all these exchanges.
With the v1 address format, for an application to send to 500 different
exchanges, it needs to create 500 links.

Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies,
we can create a simpler, clearer, and better v2 address format.

 ## How?

 ### Design goals

Following the 7 cons from v1, the design goals for v2 are:
1. The address format should be simple so that users have a chance to
   understand the meaning of the address without necessarily consulting the docs.
2. The address format should not implicitly create queues, bindings, or exchanges.
   Instead, topologies should be created either explicitly via the new management node
   prior to link attachment (see #10559), or in future, we might support the `dynamic`
   source or target properties so that RabbitMQ creates queues dynamically.
3. No redundant address formats.
4. The target address format should explicitly state whether the routing key is present, empty,
   or will be provided dynamically in each message.
5. `Subject` should not be used as routing key. Instead, a better
   fitting field should be used.
6. Exchange names, queue names, and routing keys should be allowed to contain
   valid UTF-8 encoded data, including the "/" character.
7. Allow both the target exchange and the routing key to be dynamically provided within each message.

Furthermore
8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to
   RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating
   between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients
   to use only the new v2 address format. This will allow AMQP 1.0 shovels and plugins to communicate between a 4.1 and a 4.2 cluster.
   We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version.

 ### Additional Context

The address is usually a String, but can be of any type.

The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html)
suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters:
> An AMQP address is a URI reference as defined by RFC3986.

> the path expression is a sequence of identifier segments that reflects a path through an
> implementation specific relationship graph of AMQP nodes and their termini.
> The path expression MUST resolve to a node’s terminus in an AMQP container.

The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html)
extension allows for the target being `null` and the `To` property to contain the node address.
This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination.

The following v2 address formats will be used.

 ### v2 addresses

A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default.
Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format.
However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format.
The new deprecated feature flag will therefore be denied by default in 4.2.
This allows AMQP 1.0 shovels and plugins to work between
* 4.0 and 3.13 clusters using v1
* 4.1 and 4.0 clusters using v2 from 4.1 to 4.0 and v1 from 4.0 to 4.1
* 4.2 and 4.1 clusters using v2

without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients.
While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2.

 ### v2 source addresses

The source address format is
```
/queue/:queue
```
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.

 ### v2 target addresses

v1 requires attaching a new link for each destination exchange.
v2 will allow dynamic `{exchange, routing-key}` combinations for a given link.
v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges.
Setting up a link per destination exchange could be cumbersome.
Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1.
To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension:
The target address will contain the AMQP value null.
The `To` field in each message must be set and contain either address format
```
/exchange/:exchange/key/:routing-key
```
or
```
/exchange/:exchange
```
when using the empty routing key.

The `to` field is of an address type and is therefore better suited than the `subject` field.

Note that each message will contain this `To` value for the anonymous terminus.
Hence, it is worth saving some bytes sent across the network and stored on disk.
Using a format
```
/e/:exchange/k/:routing-key
```
saves more bytes, but is too obscure.
However, we use `/key/` instead of `/routing-key/` to save a few bytes.
This also simplifies the format because users don’t have to remember whether to spell `routing-key`, `routing_key`, or `routingkey`.

The other allowed target address formats are:
```
/exchange/:exchange/key/:routing-key
```
where exchange and routing key are static on the given link.

```
/exchange/:exchange
```
where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange).

```
/queue/:queue
```
This provides RabbitMQ beginners the illusion of sending a message directly
to a queue without having to understand what exchanges and routing keys are.
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
Besides the additional queue existence check, this queue target is different from
```
/exchange//key/:queue
```
in that queue specific optimisations might be done (in future) by RabbitMQ
(for example different receiving queue types could grant different amounts of link credits to the sending clients).
A write permission check to the amq.default exchange will be performed nevertheless.

v2 will prohibit the v1 static link & dynamic routing-key combination
where the routing key is sent in the message `subject` as that’s also obscure.
For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field.

(The bare message must not be modified because it could be signed.)

The alias format
```
/topic/:topic
```
will also be removed.
Sending to topic exchanges is arguably an advanced feature.
Users can directly use the format
```
/exchange/amq.topic/key/:topic
```
which reduces the number of redundant address formats.

 ### v2 address format reference

To sum up (and as stated at the top of this commit message):

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

Hence, all 8 listed design goals are reached.
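As a compact illustration, the four target formats could be distinguished like this (Python sketch, not server code; it assumes names are percent-encoded so that a `/` inside a name cannot collide with the delimiters):

```python
from urllib.parse import unquote

def classify_v2_target(address):
    """Map a v2 target address to its routing meaning (illustrative only)."""
    if address is None:
        # AMQP <null>: anonymous terminus; each message's 'to' field
        # must carry one of the other three formats.
        return ("anonymous",)
    if address.startswith("/exchange/"):
        rest = address[len("/exchange/"):]
        if "/key/" in rest:
            exchange, key = rest.split("/key/", 1)
            return ("exchange", unquote(exchange), unquote(key))
        return ("exchange", unquote(rest), "")   # empty routing key
    if address.startswith("/queue/"):
        return ("queue", unquote(address[len("/queue/"):]))
    raise ValueError(f"not a v2 target address: {address!r}")

assert classify_v2_target("/exchange/amq.topic/key/my.topic") == \
    ("exchange", "amq.topic", "my.topic")
assert classify_v2_target("/queue/orders") == ("queue", "orders")
assert classify_v2_target(None) == ("anonymous",)
```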
2024-04-05 12:22:02 +02:00
David Ansari dda1c500da Require feature flag message_containers
as it is required for Native AMQP 1.0 in 4.0.

Remove compatibility code.
2024-04-04 15:11:31 +02:00
David Ansari 34862063d0 Skip flaky mixed version test
```
bazel test //deps/rabbit:amqp_client_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [cluster_size_3] -case async_notify_unsettled_classic_queue" --config=rbe-26 --runs_per_test=40
```
was failing 8 out of 40 times.

Skip this test as we know that link flow control with classic queues is
broken in 3.13:
https://github.com/rabbitmq/rabbitmq-server/issues/2597

Credit API v2 in RabbitMQ 4.0 fixes this bug.
2024-04-03 19:14:35 +02:00
David Ansari 2815c71f86 Skip test assertion
Not only quorum queues but also classic queues are wrongly implemented
when draining in 3.13.

Like quorum queues, classic queues reply with a send_drained event
before delivering the message(s).

Therefore, we have to skip the drain test in such mixed version
clusters where the leader runs on the old (3.13.1) node.

The new 4.0 implementation with credit API v2 fixes this bug.
2024-04-03 17:51:53 +02:00
Ayanda Dube 3a16d4d03e remove deprecated queue_explicit_gc_run_operation_threshold config 2024-03-28 15:32:36 +00:00
David Ansari 8a3f3c6f34 Enable AMQP 1.0 clients to manage topologies
## What?

* Allow AMQP 1.0 clients to dynamically create and delete RabbitMQ
  topologies (exchanges, queues, bindings).
* Provide an Erlang AMQP 1.0 client that manages topologies.

 ## Why?

Today, RabbitMQ topologies can be created via:
* [Management HTTP API](https://www.rabbitmq.com/docs/management#http-api)
  (including Management UI and
  [messaging-topology-operator](https://github.com/rabbitmq/messaging-topology-operator))
* [Definition Import](https://www.rabbitmq.com/docs/definitions#import)
* AMQP 0.9.1 clients

Up to RabbitMQ 3.13 the RabbitMQ AMQP 1.0 plugin auto creates queues
and bindings depending on the terminus [address
format](https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing).

Such implicit creation of topologies is limiting and obscure.
For some address formats, queues will be created, but not deleted.

Some of RabbitMQ's success is due to its flexible routing topologies
that AMQP 0.9.1 clients can create and delete dynamically.

This commit allows dynamic management of topologies for AMQP 1.0 clients.
This commit builds on top of Native AMQP 1.0 (PR #9022) and will be
available in RabbitMQ 4.0.

 ## How?

This commits adds the following management operations for AMQP 1.0 clients:
* declare queue
* delete queue
* purge queue
* bind queue to exchange
* unbind queue from exchange
* declare exchange
* delete exchange
* bind exchange to exchange
* unbind exchange from exchange

Hence, at least the AMQP 0.9.1 management operations are supported for
AMQP 1.0 clients.

In addition the operation
* get queue

is provided which - similar to `declare queue` - returns queue
information including the current leader and replicas.
This allows clients to publish or consume locally on the node that hosts
the queue.

Compared to AMQP 0.9.1 whose commands and command fields are fixed, the
new AMQP Management API is extensible: New operations and new fields can
easily be added in the future.

There are different design options how management operations could be
supported for AMQP 1.0 clients:
1. Use a special exchange type as done in https://github.com/rabbitmq/rabbitmq-management-exchange
  This has the advantage that any protocol client (e.g. also STOMP clients) could
  dynamically manage topologies. However, a special exchange type is the wrong abstraction.
2. Clients could send "special" messages with special headers that the broker interprets.

This commit opts for a variation of the 2nd option, using a more
standardized approach by re-using a subset of the following latest AMQP 1.0 extension
specifications:
* [AMQP Request-Response Messaging with Link Pairing Version 1.0 - Committee Specification 01](https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html) (February 2021)
* [HTTP Semantics and Content over AMQP Version 1.0 - Working Draft 06](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65571) (July 2019)
* [AMQP Management Version 1.0 - Working Draft 16](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65575) (July 2019)

An important goal is to keep the interaction between AMQP 1.0 client and RabbitMQ
simple to increase usage, development and adoptability of future RabbitMQ AMQP 1.0
client library wrappers.

The AMQP 1.0 client has to create a link pair to the special `/management` node.
This allows the client to send and receive from the management node.
Similar to AMQP 0.9.1, there is no need for a reply queue since the reply
will be sent directly to the client.

Requests and responses are modelled via HTTP, but sent via AMQP using
the `HTTP Semantics and Content over AMQP` extension (henceforth `HTTP
over AMQP` extension).

This commit tries to follow the `HTTP over AMQP` extension as much as
possible but deviates where this draft spec doesn't make sense.

The projected mode §4.1 is used as opposed to tunneled mode §4.2.
A named relay `/management` is used (§6.3) where the message field `to` is the URL.

Deviations are
* §3.1 mandates that URIs are not encoded in an AMQP message.
  However, we percent encode URIs in the AMQP message. Otherwise there
  would, for example, be no way to distinguish a `/` in a queue name from the
  URI path separator `/`.
* §4.1.4 mandates a data section. This commit uses an amqp-value section
  as it's a better fit given that the content is AMQP encoded data.
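The percent-encoding deviation can be illustrated with a queue name that contains a slash (Python sketch; the actual encoding is done by the Erlang client, and the `/queues/:queue` path shape is the v2 endpoint described below):

```python
from urllib.parse import quote, unquote

queue = "invoices/2024"                      # a queue name containing "/"
path = "/queues/" + quote(queue, safe="")    # percent encode the name
assert path == "/queues/invoices%2F2024"

# Without the encoding, "/queues/invoices/2024" would be read as two
# path segments; decoding recovers the original name unambiguously:
assert unquote(path[len("/queues/"):]) == queue
```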

Using an HTTP API allows for a common well understood interface and future extensibility.
Instead of re-using the current RabbitMQ HTTP API, this commit uses a
new HTTP API (let's call it v2) which could be used as a future API for
plain HTTP clients.

 ### HTTP API v1

The current HTTP API (let's call it v1) is **not** used since v1
comes with a couple of weaknesses:

1. Deep level of nesting becomes confusing and difficult to manage.

Examples of deep nesting in v1:
```
/api/bindings/vhost/e/source/e/destination/props
/api/bindings/vhost/e/exchange/q/queue/props
```

2. Redundant endpoints returning the same resources

v1 has 9 endpoints to list binding(s):
```
/api/exchanges/vhost/name/bindings/source
/api/exchanges/vhost/name/bindings/destination
/api/queues/vhost/name/bindings
/api/bindings
/api/bindings/vhost
/api/bindings/vhost/e/exchange/q/queue
/api/bindings/vhost/e/exchange/q/queue/props
/api/bindings/vhost/e/source/e/destination
/api/bindings/vhost/e/source/e/destination/props
```

3. Verbs in path names
Path names should be nouns instead.
v1 contains verbs:
```
/api/queues/vhost/name/get
/api/exchanges/vhost/name/publish
```

 ### AMQP Management extension

Only few aspects of the AMQP Management extension are used.

The central idea of the AMQP management spec is **dynamic discovery** such that broker independent AMQP 1.0
clients can discover objects, types, operations, and HTTP endpoints of specific brokers.
In fact, clients are only conformant if:
> All request addresses are dynamically discovered starting from the discovery document.
> A requesting container MUST NOT use fixed assumptions about the addressing structure of the management API.

While this is a nice and powerful idea, no AMQP 1.0 client and no AMQP 1.0 server implement the
latest AMQP 1.0 management spec from 2019, partly presumably due to its complexity.
Therefore, the idea of such dynamic discovery has failed to be implemented in practice.

The AMQP management spec mandates that the management endpoint returns a discovery document containing
broker specific collections, types, configuration, and operations including their endpoints.

The API endpoints of the AMQP management spec are therefore all designed around dynamic discovery.

For example, to create either a queue or an exchange, the client has to
```
POST /$management/entities
```
which shows that the entities collection acts as a generic factory, see section 2.2.
The server will then create the resource and reply with a location header containing a URI pointing to the resource.
For RabbitMQ, we don’t need such a generic factory to create queues or exchanges.

To list bindings for a queue Q1, the spec suggests
```
GET /$management/Queues/Q1/$management/entities
```
which again shows the generic entities endpoint as well as a `$management` endpoint under Q1 to
allow a queue to return a discovery document.
For RabbitMQ, we don’t need such generic endpoints and discovery documents.

Given we aim for our own thin RabbitMQ AMQP 1.0 client wrapper libraries which expose
the RabbitMQ model to the developer, we can directly use fixed HTTP endpoint assumptions
in our RabbitMQ specific libraries.

This is by far simpler than using the dynamic endpoints of the management spec.
Simplicity leads to higher adoption and enables more developers to write RabbitMQ AMQP 1.0 client
library wrappers.

The AMQP Management extension also suffers from a deep level of nesting in paths.
Examples:
```
/$management/Queues/Q1/$management/entities
/$management/Queues/Q1/Bindings/Binding1
```
as well as verbs in path names: Section 7.1.4 suggests using verbs in path names,
for example “purge”, due to the dynamic operations discovery document.

 ### HTTP API v2

This commit introduces a new HTTP API v2 following best practices.
It could serve as a future API for plain HTTP clients.

This commit and RabbitMQ 4.0 will only implement a minimal set of
HTTP API v2 endpoints and only for HTTP over AMQP.
In other words, the existing HTTP API v1 Cowboy handlers will continue to be
used for all plain HTTP requests in RabbitMQ 4.0 and will remain untouched for RabbitMQ 4.0.
Over time, after 4.0 shipped, we could ship a pure HTTP API implementation for HTTP API v2.
Hence, the new HTTP API v2 endpoints for HTTP over AMQP should be designed such that they
can be re-used in the future for a pure HTTP implementation.

The minimal set of endpoints for RabbitMQ 4.0 are:

```
GET / PUT / DELETE
/vhosts/:vhost/queues/:queue
```
read, create, delete a queue

```
DELETE
/vhosts/:vhost/queues/:queue/messages
```
purges a queue

```
GET / DELETE
/vhosts/:vhost/bindings/:binding
```
read, delete bindings
where `:binding` is a binding ID of the following path segment:
```
src=e1;dstq=q2;key=my-key;args=
```
Binding arguments `args` has an empty value by default, i.e. there are no binding arguments.
If the binding includes binding arguments, `args` will be an Erlang portable term hash
provided by the server similar to what’s provided in HTTP API v1 today.
Alternatively, we could use an arguments scheme of:
```
args=k1,utf8,v1&k2,uint,3
```
However, such a scheme leads to long URIs when there are many binding arguments.
Note that it’s perfectly fine for URI producing applications to include URI
reserved characters `=` / `;` / `,` / `$` in a path segment.
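A binding ID could be composed like this (Python sketch; the `args` hash for non-empty binding arguments is produced by the server, and names containing `;` or `=` would additionally need encoding):

```python
def binding_id(src, dstq, key, args_hash=""):
    # Compose the v2 binding path segment described above.
    return f"src={src};dstq={dstq};key={key};args={args_hash}"

assert binding_id("e1", "q2", "my-key") == "src=e1;dstq=q2;key=my-key;args="
```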

To create a binding, the client therefore needs to POST to a bindings factory URI:
```
POST
/vhosts/:vhost/bindings
```

To list all bindings between a source exchange e1 and destination exchange e2 with binding key k1:
```
GET
/vhosts/:vhost/bindings?src=e1&dste=e2&key=k1
```

This endpoint will be called by the RabbitMQ AMQP 1.0 client library to unbind a
binding with non-empty binding arguments to get the binding ID before invoking a
```
DELETE
/vhosts/:vhost/bindings/:binding
```

In future, after RabbitMQ 4.0 shipped, new API endpoints could be added.
The following is up for discussion and is only meant to show the clean and simple design of HTTP API v2.

Bindings endpoint can be queried as follows:

to list all bindings for a given source exchange e1:
```
GET
/vhosts/:vhost/bindings?src=e1
```

to list all bindings for a given destination queue q1:
```
GET
/vhosts/:vhost/bindings?dstq=q1
```

to list all bindings between a source exchange e1 and destination queue q1:
```
GET
/vhosts/:vhost/bindings?src=e1&dstq=q1
```

multiple bindings between source exchange e1 and destination queue q1 could be deleted at once as follows:
```
DELETE /vhosts/:vhost/bindings?src=e1&dstq=q1
```

GET could be supported globally across all vhosts:
```
/exchanges
/queues
/bindings
```

Publish a message:
```
POST
/vhosts/:vhost/queues/:queue/messages
```

Consume or peek a message (depending on query parameters):
```
GET
/vhosts/:vhost/queues/:queue/messages
```

Note that the AMQP 1.0 client omits the `/vhost/:vhost` path prefix.
Since an AMQP connection belongs to a single vhost, there is no need to
additionally include the vhost in every HTTP request.

Pros of HTTP API v2:

1. Low level of nesting

Queues, exchanges, bindings are top level entities directly under vhosts.
Although the HTTP API doesn’t have to reflect how resources are stored in the database,
v2 does nicely reflect the Khepri tree structure.

2. Nouns instead of verbs
HTTP API v2 is very simple to read and understand as shown by
```
POST    /vhosts/:vhost/queues/:queue/messages	to post messages, i.e. publish to a queue.
GET     /vhosts/:vhost/queues/:queue/messages	to get messages, i.e. consume or peek from a queue.
DELETE  /vhosts/:vhost/queues/:queue/messages	to delete messages, i.e. purge a queue.
```

A separate new HTTP API v2 allows us to ship only handlers for HTTP over AMQP for RabbitMQ 4.0
and therefore move faster while still keeping the option on the table to re-use the new v2 API
for pure HTTP in the future.
In contrast, re-using the HTTP API v1 for HTTP over AMQP is possible but dirty, because separate handlers
(HTTP over AMQP and pure HTTP) replying differently would be needed for the same v1 endpoints.
2024-03-28 11:36:56 +01:00
Michael Klishin a04b092274
Merge pull request #10742 from rabbitmq/md-fix-vhost-recovery-queue-update
Update durable queues outside of Khepri transactions
2024-03-20 10:06:29 -04:00
Michael Klishin 3564df8998 per_node_limit_SUITE: clean up created virtual hosts 2024-03-15 19:34:04 -04:00
Alex Valiushko 72317cc1b0 address feedback 2024-03-15 19:34:04 -04:00
Alex Valiushko d7374cb244 Add consumers per channel limit 2024-03-15 19:34:04 -04:00
Michael Davis 8a03b28880
Add a unit test for rabbit_amqqueue:mark_local_durable_queues_stopped/1 2024-03-14 14:24:01 -04:00
Loïc Hoguin 74b9811a5c
Tests for shared message store file scanner 2024-03-11 13:17:16 +01:00
Karl Nilsson e7f676d5cb rabbit_stream_queue_SUITE: test reliability with khepri
Khepri projections may not be immedately updated after a queue
declare so we need to await_condition to validate the leader
pid is updated correctly.
2024-03-06 11:27:05 +00:00
Jean-Sébastien Pédron 2afffd59bd
feature_flags_SUITE: Kill dangling spammer process in registry_concurrent_reloads()
[Why]
It looks like `exit(Spammer, normal)` doesn't terminate the process.
This leaves a dangling process around and seems to cause transient
failures in the `try_to_deadlock_in_registry_reload_1` testcase that
follows it.

[How]
We use `exit(Spammer, kill)` and a monitor to wait for the process to
actually terminate.
2024-03-06 10:58:25 +01:00
Karl Nilsson 58d26cd45b QQ: add feature flag to control adding non-voting members.
Without a feature flag it is possible to add a member on a newer node
with a Ra command format that the other nodes do not yet understand
resulting in crashed nodes.
2024-03-05 09:56:37 +00:00
David Ansari 3d066315e4 Bump AMQP.NET Lite to 2.4.9 2024-03-05 10:16:41 +01:00
David Ansari 118d8460ce Fix routing key for AMQP 0.9.1 reading from a stream
when message was published to a stream via the stream protocol.

Fixes the following test:
```
./mvnw test -Dtest=AmqpInteroperabilityTest#publishToStreamConsumeFromStreamQueue
```
2024-03-04 11:32:39 -05:00
David Ansari 99dd040975 Cache exchange record
for default and pre-declared exchanges to save copying
the #exchange{} record (i.e. save an ETS lookup call) on
every received message.

The default and pre-declared exchanges are protected from deletion and
modification. Exchange routing decorators are used neither in tier 1 plugins
nor in any open source tier 2 plugin.
2024-03-04 11:32:39 -05:00
David Ansari df50e50473 Delete dead code 2024-02-28 14:15:20 +01:00
David Ansari b0142287c7 Protect receiving app from being overloaded
What?

Protect the receiving application from being overloaded with new messages
while still processing existing messages if the auto credit renewal
feature of the Erlang AMQP 1.0 client library is used.

This feature can therefore be thought of as a prefetch window equivalent
in AMQP 0.9.1 or MQTT 5.0 property Receive Maximum.

How?

The credit auto renewal feature in RabbitMQ 3.x was wrongly implemented.
This commit takes the same approach as done in the server:
The incoming_unsettled map is held in the link instead of in the session
to accurately and quickly determine the number of unsettled messages for
a receiving link.

The amqp10_client lib will grant more credits to the sender when the sum
of remaining link credits and number of unsettled deliveries falls below
the threshold RenewWhenBelow.

This avoids maintaining additional state like the `link_credit_unsettled`
map or an alternative delivery_count_settled sequence number, which would be more
complex to implement correctly.

This commit breaks the amqp10_client_session:disposition/6 API:
This commit forces the client application to only range settle for a
given link, i.e. not across multiple links on a given session at once.
The latter is allowed according to the AMQP spec.
2024-02-28 14:15:20 +01:00
David Ansari 6485948dda Terminate channels and queue collector after reader
What?

To not risk any regressions, keep the behaviour of RabbitMQ 3.x
where channel processes and connection helper processes such as
rabbit_queue_collector and rabbit_heartbeat are terminated after
rabbit_reader process.

For example, when RabbitMQ terminates with SIGTERM, we want
exclusive queues to be deleted synchronously (as in 3.x).

Prior to this commit:
1. java -jar target/perf-test.jar -x 0 -y 1
2. ./sbin/rabbitmqctl stop_app
resulted in the following crash:
```
crasher:
  initial call: rabbit_reader:init/2
  pid: <0.2389.0>
  registered_name: []
  exception exit: {noproc,
                      {gen_server,call,[<0.2391.0>,delete_all,infinity]}}
    in function  gen_server:call/3 (gen_server.erl, line 419)
    in call from rabbit_reader:close_connection/1 (rabbit_reader.erl, line 683)
    in call from rabbit_reader:send_error_on_channel0_and_close/4 (rabbit_reader.erl, line 1668)
    in call from rabbit_reader:handle_dependent_exit/3 (rabbit_reader.erl, line 710)
    in call from rabbit_reader:mainloop/4 (rabbit_reader.erl, line 530)
    in call from rabbit_reader:run/1 (rabbit_reader.erl, line 452)
    in call from rabbit_reader:start_connection/4 (rabbit_reader.erl, line 351)
```
because rabbit_queue_collector was terminated before rabbit_reader.
This commit fixes this crash.

How?

Any Erlang supervisor including the rabbit_connection_sup supervisor
terminates its children in the opposite of the start order.
Since we want channel and queue collector processes - children of
rabbit_connection_helper_sup - to be terminated after the
reader process, we must start rabbit_connection_helper_sup before the
reader process.

Since rabbit_connection_sup - the ranch_protocol implementation - does
not know yet whether it will supervise an AMQP 0.9.1 or AMQP 1.0
connection, it creates rabbit_connection_helper_sup for each AMQP protocol
version, removing the superfluous one as soon as the protocol version negotiation is
completed. Spawning and deleting this additional process has a negligible
effect on performance.

The whole problem is that the rabbit_connection_helper_sup differs in
its supervisor flags for AMQP 0.9.1 and AMQP 1.0 when it is started
because for Native AMQP 1.0 in 4.0 we remove the unnecessary
rabbit_amqp1_0_session_sup_sup supervisor level.
Therefore, we achieve our goal:
* in Native AMQP 1.0, 1 additional Erlang process is created per session
* in AMQP 1.0 in 3.x, 15 additional Erlang processes are created per session
2024-02-28 14:15:20 +01:00
David Ansari 7858532def Allow no routing key
to be set in both target address and subject.
2024-02-28 14:15:20 +01:00
David Ansari f886ab90a1 Fix crash
```
./omq amqp -t /queue --amqp-subject foo -C 1
```
crashed with
```
reason: {badarg,
            [{erlang,list_to_binary,
                 [undefined],
             {rabbit_amqp_session,ensure_terminus,5,
                 [{file,"rabbit_amqp_session.erl"},{line,2046}]},
             {rabbit_amqp_session,ensure_target,3,
                 [{file,"rabbit_amqp_session.erl"},{line,1705}]},
             {rabbit_amqp_session,handle_control,2,
                 [{file,"rabbit_amqp_session.erl"},{line,715}]},
             {rabbit_amqp_session,handle_cast,2,
                 [{file,"rabbit_amqp_session.erl"},{line,331}]},
             {gen_server,try_handle_cast,3,
                 [{file,"gen_server.erl"},{line,1121}]},
             {gen_server,handle_msg,6,
                 [{file,"gen_server.erl"},{line,1183}]},
             {proc_lib,init_p_do_apply,3,
                 [{file,"proc_lib.erl"},{line,241}]}]}
```
2024-02-28 14:15:20 +01:00
David Ansari 8cb313d5a1 Support AMQP 1.0 natively
## What

Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.

  ## Why

Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
   scalability, and resource usage for AMQP 1.0.
   See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
   See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
   this commit allows implementing more AMQP 1.0 features in the future.
   Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
   New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. there is no need anymore for separate connections for publishers and
   consumers, as currently recommended for AMQP 0.9.1, which potentially
   halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
   slow target queue won't block the entire connection.
   Publishers can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
   In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
   possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

 ## Implementation details

1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.

2. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

3. 7 processes per connection + 1 process per session in this commit instead of
   7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.

4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes before flushing the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.

5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.

7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However, it's suboptimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.
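As a sketch of that batching (a hypothetical helper in Go rather than the Erlang session code): contiguous delivery IDs collapse into ranges, and each range then maps to a single DISPOSITION frame.

```go
package main

import "fmt"

// coalesce groups an ascending list of delivery IDs into contiguous
// [first, last] ranges; each range can then be settled with a single
// DISPOSITION frame. A hypothetical sketch, not the session-process code.
func coalesce(ids []uint32) [][2]uint32 {
	var ranges [][2]uint32
	for _, id := range ids {
		if n := len(ranges); n > 0 && ranges[n-1][1]+1 == id {
			ranges[n-1][1] = id // extend the current contiguous run
		} else {
			ranges = append(ranges, [2]uint32{id, id})
		}
	}
	return ranges
}

func main() {
	// IDs 1..3 collapse into one range (one DISPOSITION); 7 stands alone.
	fmt.Println(coalesce([]uint32{1, 2, 3, 7})) // [[1 3] [7 7]]
}
```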

8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.

How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
  new credit_reply queue event:
  * `available` after the queue sends any deliveries
  * `link-credit` after the queue sends any deliveries
  * `drain` which allows us to combine the old queue events
    send_credit_reply and send_drained into a single new queue event
    credit_reply.
* Include the consumer tag into the credit_reply queue event such that
  the AMQP 1.0 session process can process any credit replies
  asynchronously.

Link flow control state `delivery-count` also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.

9. Use serial number arithmetic in quorum queues and session process.
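Serial number arithmetic here refers to RFC 1982 comparison on 32-bit values, which AMQP 1.0 prescribes for fields such as delivery-count and transfer-id. A minimal Go sketch of such a comparison (hypothetical helper, not the shared-library code):

```go
package main

import "fmt"

// serialCompare returns -1, 0, or 1 for a < b, a == b, a > b under
// RFC 1982 serial number arithmetic, so comparisons wrap around at 2^32.
func serialCompare(a, b uint32) int {
	if a == b {
		return 0
	}
	// a precedes b when the forward distance from a to b is less than
	// half the number space (exactly half is undefined by RFC 1982).
	if int32(b-a) > 0 {
		return -1
	}
	return 1
}

func main() {
	fmt.Println(serialCompare(1, 2))          // -1
	fmt.Println(serialCompare(0xFFFFFFFF, 0)) // -1: 0 follows 2^32-1 after wrap-around
}
```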

10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.

11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.

12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).

13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.

14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2

15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for consumers that do not use the Stream protocol.

16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.

17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.

18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.
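That granting rule can be sketched as follows: top up an incoming link only once its credit falls to a low-water mark, and never while too many messages on the link are unconfirmed. All names and thresholds here are hypothetical, not the values RabbitMQ uses.

```go
package main

import "fmt"

// grant decides how much link-credit to send a publisher: nothing while
// it still has plenty, or while too many of its messages are unconfirmed;
// otherwise top it back up to the target.
func grant(credit, unconfirmed, lowWater, target, maxUnconfirmed int) int {
	if credit > lowWater || unconfirmed >= maxUnconfirmed {
		return 0
	}
	return target - credit
}

func main() {
	fmt.Println(grant(10, 0, 64, 256, 512))   // 246: below low water, top up
	fmt.Println(grant(100, 0, 64, 256, 512))  // 0: publisher still has enough credit
	fmt.Println(grant(10, 600, 64, 256, 512)) // 0: too many unconfirmed messages
}
```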

19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.

20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. Whatever the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.

21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then the entire connection, being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.

22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, they are forcibly disconnected.

If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.

23. All operations apart from publishing TRANSFERs to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.

26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.

27. Requeue messages if link detached
Although the spec allows settling delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries.

28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').

29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client

30. Fix AMQP client to do serial number arithmetic.

31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.

32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit (and on RabbitMQ 3.x), consuming would halt after around
8,000 - 10,000 messages.

The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
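The corrected accounting can be sketched as: credit granted to the queue plus deliveries already in flight to the session must never exceed the consumer's target; it was the in-flight term that was previously missing. A sketch with made-up names, not the actual fix:

```go
package main

import "fmt"

// topUp computes the credit to grant a classic queue process so that
// credit held by the queue plus deliveries in flight to the session
// never exceeds the target.
func topUp(target, atQueue, inFlight int) int {
	if d := target - atQueue - inFlight; d > 0 {
		return d
	}
	return 0
}

func main() {
	fmt.Println(topUp(256, 10, 0))   // 246
	fmt.Println(topUp(256, 10, 250)) // 0: in-flight deliveries count too
}
```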

33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.

34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be held simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.

36. Remove credit extension from AMQP 0.9.1 client

37. Support maintenance mode closing AMQP 1.0 connections.

38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
    The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
    tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```

40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```

 ## Benchmarks

 ### Throughput & Latency

Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1

Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```

1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms
```

RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```

2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms
```

RabbitMQ 3.x:
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```

3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```

RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```

 ### Memory usage

Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```

```
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```

Create 50k connections with 2 sessions per connection, i.e. 100k session in total:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

This commit:
```
erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB

 ## Future work

1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2024-02-28 14:15:20 +01:00
David Ansari 8a2a7406de Fix AMQP 0.9.1 headers regression
Fixes https://github.com/rabbitmq/rabbitmq-server/discussions/10620

Up to RabbitMQ 3.12:
* When an AMQP 0.9.1 publisher sends a message with P_basic.headers
  unset, RabbitMQ will deliver an AMQP 0.9.1 message with
  P_basic.headers unset.
* When an AMQP 0.9.1 publisher sends a message with P_basic.headers
  being an empty list ([]), RabbitMQ will deliver an AMQP 0.9.1 message with
  P_basic.headers being an empty list ([]).

In 3.13 including message containers, the 1st behaviour stayed the same
while the 2nd behaviour changed to:
* When an AMQP 0.9.1 publisher sends a message with P_basic.headers
  being an empty list ([]), RabbitMQ will deliver an AMQP 0.9.1 message with
  P_basic.headers unset.

This commit fixes this regression by using the same behaviour as in
3.12.
2024-02-27 11:12:05 +01:00
David Ansari 8b151f43f7 Parse 2 bytes encoded AMQP boolean to Erlang boolean
An AMQP boolean can be encoded using 1 byte or 2 bytes:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean

Prior to this commit, our Erlang parser returned:
* Erlang terms `true` or `false` for the 1 byte AMQP encoding
* Erlang terms `{boolean, true}` or `{boolean, false}` for the 2 byte AMQP encoding

Having a serializer and parser that perform the opposite actions such
that
```
Term = parse(serialize(Term))
```
is desirable as it provides a symmetric property useful not only for
property based testing, but also for avoiding altering message hashes
when serializing and parsing the same term.

However, dealing with `{boolean, boolean()}` tuples instead of `boolean()` is very unwieldy since
all Erlang code must take care of both forms leading to subtle bugs as
occurred in:
* 4cbeab8974/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl (L155-L158)
* b8173c9d3b/deps/rabbitmq_mqtt/src/mc_mqtt.erl (L83-L88)
* b8173c9d3b/deps/rabbit/src/mc_amqpl.erl (L123-L127)

Therefore, this commit decides to take the safe approach and always
parse to an Erlang `boolean()` independent of whether the AMQP boolean
was encoded with 1 or 2 bytes.
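The resulting behaviour can be sketched in Go (a hypothetical decoder, not the Erlang parser): both AMQP 1.0 encodings, the 1-byte format codes 0x41 (true) / 0x42 (false) and the 2-byte form 0x56 followed by 0x00/0x01, decode to the same plain boolean.

```go
package main

import (
	"errors"
	"fmt"
)

// parseBool decodes an AMQP 1.0 boolean from either its 1-byte or 2-byte
// encoding, always yielding a plain bool plus the bytes consumed.
func parseBool(b []byte) (bool, int, error) {
	if len(b) == 0 {
		return false, 0, errors.New("empty input")
	}
	switch b[0] {
	case 0x41: // boolean true, 1-byte encoding
		return true, 1, nil
	case 0x42: // boolean false, 1-byte encoding
		return false, 1, nil
	case 0x56: // boolean, 2-byte encoding: value byte follows
		if len(b) < 2 {
			return false, 0, errors.New("truncated boolean")
		}
		switch b[1] {
		case 0x00:
			return false, 2, nil
		case 0x01:
			return true, 2, nil
		}
		return false, 0, errors.New("invalid boolean value byte")
	}
	return false, 0, errors.New("not a boolean")
}

func main() {
	v1, _, _ := parseBool([]byte{0x41})       // 1-byte true
	v2, _, _ := parseBool([]byte{0x56, 0x01}) // 2-byte true
	fmt.Println(v1 == v2)                     // true: both parse to the same term
}
```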
2024-02-16 17:57:28 +01:00
Jean-Sébastien Pédron 21975a5c25
mirrored_supervisor: Restore child ID format
[Why]
The format was changed to be compatible with Khepri paths. However, this
ID is used in in-memory states here and there as well. So changing its
format makes upgrades complicated because the code has to handle both
the old and new formats possibly used by the mirrored supervisor already
running on other nodes.

[How]
Instead, this patch converts the ID (in its old format) to something
compatible with a Khepri path only when we need to build a Khepri path.

This relies on the fact that the `Group` is a module and we can call it
to let it convert the opaque ID to a Khepri path.

While here, improve the type specs to document that a group is always a
module name and to document what a child ID can be.
2024-02-13 14:10:19 +01:00
Karl Nilsson 5317f958fb Streams: Soft remove policy configuration of max_segment_size_bytes
This configuration is not guaranteed to be safe to change after a stream has been
declared and thus we'll remove the ability to change it after the initial
declaration. Users should favour the x-queue argument for this config; it will still
be possible to configure it as a policy but it will only be evaluated at
declaration time.

This means that if a policy is set for a stream that re-configures the
`stream-max-segment-size-bytes` key it will show in the UI as updated but
the pre-existing stream will not use the updated configuration.

The key has been removed from the UI but for backwards compatibility it is still
settable.

NB: this PR adds a new command `update_config` to the stream coordinator state
machine. Strictly speaking this should require a new machine version but we're
bypassing that by relying on the feature flag instead, which avoids this command
being committed before all nodes have the new code version. A new machine version
can lower the availability properties during a rolling cluster upgrade so in
this case it is preferable to avoid that given the simplicity of the change.
2024-02-07 11:06:10 +00:00
Michael Klishin 9c79ad8d55 More missed license header updates #9969 2024-02-05 12:26:25 -05:00
Michael Klishin f414c2d512
More missed license header updates #9969 2024-02-05 11:53:50 -05:00
Jean-Sébastien Pédron 2a0563b30e
rabbit_stream_queue_SUITE: Wait for ra_leaderboard to return the leader
[Why]
Sometimes, `ra_leaderboard:lookup_leader/1` will return `undefined`
because it doesn't know the leader yet. This leads to a failure of the
testcase with a `badmatch` exception.

[How]
We wait for the function to return a valid leader ID, then try again and
return the result.
2024-01-31 18:48:38 +01:00
Michael Klishin 4641e66c85
Merge pull request #10426 from rabbitmq/amazon-mq-global-quorum-critical
New upgrade time QQ health check: add check_if_new_quorum_queue_replicas_have_finished_initial_sync by @illotum (plus a test)
2024-01-26 23:08:26 -05:00
Karl Nilsson b1d7037c7d
Merge pull request #10331 from rabbitmq/stream-coordinator-fixes
Stream coordinator: fixes to automatic membership changes.
2024-01-26 15:39:27 +00:00
Karl Nilsson 3c964284f4 Make find_queue_info a bit more reliable and possibly faster. 2024-01-26 12:58:25 +00:00
Karl Nilsson 31cad52d82 Try making test backwards compatible. 2024-01-26 11:50:29 +00:00
Karl Nilsson dd16b7ef18 queue_type_SUITE: wait for stream replica before consuming.
Potential flake
2024-01-25 16:40:39 +00:00
Alex Valiushko 2177d4a5c8 add ability to list queues with local promotable replicas 2024-01-24 12:25:39 -08:00
Karl Nilsson c2640c8272 Add a stream test to mixed versions incompatible
Don't do clustering with mixed versions anyway. Please.
2024-01-24 12:12:03 +00:00
Karl Nilsson 71e7c33448 Stream coordinator: fixes to automatic membership changes.
Various bug fixes to make stream coordinator membership changes
more reliable. Previously various errors could happen as well as
partially successful attempts where the membership change command
may fail but it leaves the new server running.

Also ensure that stream coordinator members are removed as part of
the forget_cluster_node command.

Add stream coordinator status command.

To show the raft status of the stream coordinator just like is done
for quorum queues.
2024-01-24 11:02:42 +00:00
Arnaud Cogoluègnes 1f89ede396
Remove rabbit_authz_backend:state_can_expire/0
Use expiry_timestamp/1 instead, which returns 'never'
if the credentials do not expire.

Fixes #10382
2024-01-24 09:58:59 +01:00
Michael Klishin 7b151a7651 More missed (c) header updates 2024-01-22 23:44:47 -05:00
Michael Klishin c1d37e3e02
Merge pull request #10364 from rabbitmq/flaky-mc-flake-flake
Reduce flakiness of certain Common Test suites
2024-01-22 16:22:24 -05:00
Karl Nilsson d15aadff5f quorum_queues_SUITE: remove assertion that isn't guaranteed
The leader_locator_balanced_random_maintenance test is effectively
using a plain random approach so we cannot assert that there
definitely would be leaders on both potential nodes only that there
aren't any leaders on the node that is in maintenance mode.
2024-01-22 17:18:05 +00:00
Karl Nilsson 61df65e256 Test leader locator local with unique queue names.
Stream deletes aren't necessarily fully complete by the time the
queue.delete command returns, as the stream coordinator does this
work asynchronously. By using unique queue names we avoid the need to do
additional polling / waiting for the delete operation to be
fully completed.
2024-01-22 16:36:00 +00:00
Karl Nilsson 3d74945abf Handle case where queue info online returns [] 2024-01-22 15:27:35 +00:00
Karl Nilsson 0a814e945a rabbit_stream_queue:recover try a flush for info 2024-01-22 15:27:35 +00:00
Arnaud Cogoluègnes 3bd0fdc316
Merge pull request #10299 from rabbitmq/token-expiration-in-stream-connections
Take expiry timestamp into account in stream connections
2024-01-22 10:23:20 +01:00
Michael Klishin fb775afcc8
More (c) source header updates #9969 2024-01-19 19:53:28 -05:00
Michael Klishin b2180a558b Make rabbitmqctl rename_cluster_node's friend, update_cluster_nodes, a no-op 2024-01-19 11:22:16 -05:00
Michael Klishin 1556fec127 Make 'rabbitmqctl rename_cluster_node' a no-op
This makes a command that renames cluster members
a no-op. This command is really complex under
the hood and is fundamentally incompatible
with a few key Raft-based features:

 * Khepri
 * Quorum queues
 * Streams

Because Khepri first ships in RabbitMQ 3.13,
now is the time to effectively eliminate this
command.

It will be permanently removed together with
other deprecated CLI commands in 4.0.

Per discussion with the team.

Closes #10367.
2024-01-19 11:22:16 -05:00
Arnaud Cogoluègnes 33c64d06ea
Add expiry_timestamp/1 callback to authz backend behavior
Backends return 'never' or the timestamp of the expiry time
of the credentials. Only the OAuth2 backend returns a timestamp,
other RabbitMQ authz backends return 'never'.

Client code uses rabbit_access_control, so it contains now
a new expiry_timestamp/1 function that returns the earliest
expiry time of the underlying backends.

Fixes #10298
2024-01-19 14:46:47 +01:00
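The combination rule described above (rabbit_access_control returning the earliest expiry across the underlying backends, with 'never' meaning the credentials do not expire) can be sketched as follows. This is an illustrative Python sketch, not the actual Erlang code; the function and constant names are hypothetical:

```python
# Illustrative sketch: combine per-backend expiry results as described above.
# 'never' means the credentials do not expire for that backend; the combined
# result is the earliest concrete timestamp, or 'never' if no backend expires.
NEVER = "never"

def earliest_expiry_timestamp(backend_results):
    """Return the earliest expiry among backend results, or NEVER."""
    timestamps = [t for t in backend_results if t != NEVER]
    return min(timestamps) if timestamps else NEVER
```

With only the OAuth2 backend returning a timestamp, a mixed list like `[NEVER, 1700000000, NEVER]` yields that single timestamp.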
Karl Nilsson 77e15dc971 Streams tests: no need to delete test queue if already deleted. 2024-01-19 13:05:43 +00:00
Karl Nilsson 5665880a2b flake: improve racy test in quorum_queue_SUITE
per_message_ttl test woudl publish a message with a short ttl
 then assert on info counters. On a slow system it is possible
that the message expires before the test could observe the counter
change.
2024-01-19 11:57:44 +00:00
Jean-Sébastien Pédron 5abeb753f3
Merge pull request #10355 from rabbitmq/fix-logging_SUITE-after-move-of-rabbitmq_prelaunch-to-deps
Fix failures in `logging_SUITE` after the move of `rabbitmq_prelaunch`
2024-01-18 16:14:23 +01:00
Karl Nilsson 9a71595b5f Test reliability
fix
2024-01-18 14:47:03 +00:00
Karl Nilsson 85dc3b1737
Merge pull request #10353 from rabbitmq/list-running-opt
Do not contact disconnected nodes in rabbit_nodes:list_running/0
2024-01-18 14:36:00 +00:00
Jean-Sébastien Pédron 9a02e3e6ef
rabbit_logger_std_h: Mock calls to io:put_chars/2 during testing
[Why]
Before `rabbitmq_prelaunch` was moved from `deps/rabbit/apps` to `deps`,
it would inherit compile flags from `deps/rabbit`. Therefore, when
`rabbit` was tested, `rabbit_logger_std_h` simply replaced the calls to
`io:put_chars/2` with an internal macro to also call `ct:log/2`.

This is not possible anymore after the move and the move broke the
console-based testcases.

[How]
`rabbit_logger_std_h` now uses an indirect internal call to a wrapper of
`io:put_chars/2`. This allows the `logging_SUITE` to mock that call and
add the additional call to `ct:log/2`.

We need to do an explicit `?MODULE:io_put_chars/2` even though a local
call would work, otherwise meck can't intercept the calls.
2024-01-18 13:58:40 +01:00
David Ansari 6a3ba6210a
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead

Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.

* Ensure durable is a boolean

Prevent the key 'durable' with value 'undefined' from being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.

* Apply feedback
2024-01-18 11:53:02 +01:00
Simon Unge 66a3cbcc94 Use infinity as config instead of 0 2024-01-17 20:19:38 +00:00
Simon Unge 30368ffe1f Use ets table size instead of count the elements 2024-01-17 13:54:41 -05:00
Simon Unge d137edc23a Add channel limit per node 2024-01-17 13:54:41 -05:00
Karl Nilsson 87664e9fcb Remove clustering test that assert on ram nodes.
Ram nodes are a deprecated feature, and the actual assertion is
quite a complicated one that isn't easy to reason about, as it
asserts on the cluster view of nodes that have their
rabbit app stopped.
2024-01-17 15:32:59 +00:00
Michael Klishin 48ce1b5ec7 Improve supported information units (Mi, Gi, Ti)
This revisits the information system conversion,
that is, support for suffixes like GiB, GB.

When configuration values like disk_free_limit.absolute,
vm_memory_high_watermark.absolute are set, the value
can contain an information unit (IU) suffix.

We now support several new suffixes, and the meaning of
a few existing ones changes.

First, the changes:

 * k, K now mean kilobytes and not kibibytes
 * m, M now mean megabytes and not mebibytes
 * g, G now mean gigabytes and not gibibytes

This is to match the system used by Kubernetes.
There is no consensus in the industry about how
"k", "m", "g", and similar single letter suffixes
should be treated. Previously it was a power of 2,
now a power of 10 to align with a very popular OSS
project that explicitly documents what suffixes it supports.

Now, the additions: the binary suffixes Mi, Gi and Ti (and their
MiB, GiB, TiB forms) are supported for power-of-2 values.

Finally, the node will now validate these suffixes
at boot time, so an unsupported value will cause
the node to stop with a rabbitmq.conf validation
error.

The message logged will look like this:

````
2024-01-15 22:11:17.829272-05:00 [error] <0.164.0> disk_free_limit.absolute invalid, supported formats: 500MB, 500MiB, 10GB, 10GiB, 2TB, 2TiB, 10000000000
2024-01-15 22:11:17.829376-05:00 [error] <0.164.0> Error preparing configuration in phase validation:
2024-01-15 22:11:17.829387-05:00 [error] <0.164.0>   - disk_free_limit.absolute invalid, supported formats: 500MB, 500MiB, 10GB, 10GiB, 2TB, 2TiB, 10000000000
````

Closes #10310
2024-01-15 22:11:57 -05:00
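The suffix semantics described in the commit above can be sketched as follows. This is an illustrative Python sketch of the convention (single letters and *B forms as powers of 10, *iB forms as powers of 2, matching Kubernetes), not the actual rabbitmq.conf parser; `parse_iu` is a hypothetical name:

```python
import re

def parse_iu(value):
    """Parse an information-unit string like '500MB', '500MiB', '2TiB',
    or a plain integer string; k/M/G/T are powers of 10, Ki/Mi/Gi/Ti
    are powers of 2."""
    m = re.fullmatch(r"(\d+)([A-Za-z]*)", value.strip())
    if not m:
        raise ValueError(
            f"{value!r} invalid, supported formats: 500MB, 500MiB, 10GB, "
            f"10GiB, 2TB, 2TiB, 10000000000")
    number, suffix = int(m.group(1)), m.group(2)
    if suffix in ("", "B"):
        return number
    unit = suffix[:-1] if suffix.endswith("B") else suffix  # drop trailing 'B'
    if unit.endswith("i"):                                  # binary: Ki, Mi, Gi, Ti
        multipliers = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
        key = unit[0].upper() + "i"
    else:                                                   # decimal: k, M, G, T
        multipliers = {"K": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}
        key = unit.upper()
    if key not in multipliers:
        raise ValueError(
            f"{value!r} invalid, supported formats: 500MB, 500MiB, 10GB, "
            f"10GiB, 2TB, 2TiB, 10000000000")
    return number * multipliers[key]
```

Under this convention `1k` and `1K` both mean 1000 bytes, while `1KiB` means 1024 bytes, mirroring the behaviour change the commit describes.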
Michael Klishin d57fde54c5
Merge pull request #10186 from rabbitmq/message-received-at-timestamp
Timestamp (annotate with a timestamp) all messages automatically
2024-01-02 11:37:25 -05:00
Michael Klishin 01092ff31f
(c) year bumps 2024-01-01 22:02:20 -05:00
Diana Parra Corbacho 8670aca85a Retrieve oldest message received timestamp 2024-01-01 21:14:50 -05:00
Diana Parra Corbacho a52c9d0c30 Set received-at-timestamp annotation (rts) for all messages 2024-01-01 21:14:50 -05:00
Diana Parra Corbacho a363737337 CLI: list_deprecated_features command
Lists all or used deprecated features
2023-12-11 12:51:13 +01:00
Michael Klishin 9d94048852 Avoid importing resources without specified virtual host
On boot first and foremost. Log a more helpful message.

 See #10068 for the background.
2023-12-08 02:35:17 -05:00
Michael Klishin 62fffb6634 Another take at #10068
Scan queues, exchanges and bindings before attempting
to import anything on boot. If they miss the virtual
host field, fail early and log a sensible message.
2023-12-08 01:39:47 -05:00
Michael Klishin e7273f6118
Merge pull request #10059 from rabbitmq/cache-oldest-timestamp
QQ: Cache oldest_entry_timestamp
2023-12-07 16:20:56 -05:00
Jean-Sébastien Pédron 2504eb7f0d
rabbit_peer_discovery: Check selected node's DB readyness
[Why]
If a node joins the selected node but the selected node's DB layer is
not ready, it will fail and the whole peer discovery process will
restart (until the selected node is ready).

That's fine, but scary messages are logged for a situation that is not
really an actual error at this point.

[How]
While querying properties of all discovered nodes, we also check if the
DB layer is ready using `rabbit_db:is_init_finished/0`. We then use this
property to determine if we can try to join or if we should wait and
retry.

This avoids a join which we know will fail eventually, and thus error
messages.
2023-12-07 15:51:54 +01:00
Jean-Sébastien Pédron 84cede17e1
rabbit_peer_discovery: Rewrite core logic
[Why]
This work started as an effort to add peer discovery support to our
Khepri integration. Indeed, as part of the task to integrate Khepri, we
missed the fact that `rabbit_peer_discovery:maybe_create_cluster/1` was
called from the Mnesia-specific code only. Even though we knew about it
because we hit many issues caused by the fact the `join_cluster` and
peer discovery use different code path to create a cluster.

To add support for Khepri, the first version of this patch was to move
the call to `rabbit_peer_discovery:maybe_create_cluster/1` from
`rabbit_db_cluster` instead of `rabbit_mnesia`. To achieve that, it made
sense to unify the code and simply call `rabbit_db_cluster:join/2`
instead of duplicating the work.

Unfortunately, doing so highlighted another issue: the way the node to
cluster with was selected. Indeed, it could cause situations where
multiple clusters are created instead of one, unless out-of-band
counter-measures are used, like the 30-second delay added in the
Kubernetes operator (rabbitmq/cluster-operator#1156). This problem was
even more frequent when we tried to unify the code path and call
`join_cluster`.

After several iterations on the patch and even more discussions with the
team, we decided to rewrite the algorithm to make node selection more
robust and still use `rabbit_db_cluster:join/2` to create the cluster.

[How]
This commit is only about the rewrite of the algorithm. Calling peer
discovery from `rabbit_db_cluster` instead of `rabbit_mnesia` (and thus
making peer discovery work with Khepri) will be done in a follow-up
commit.

We wanted the new algorithm to fulfill the following properties:

1. `rabbit_peer_discovery` should provide the ability to re-trigger it
   easily to re-evaluate the cluster. The new public API is
   `rabbit_peer_discovery:sync_desired_cluster/0`.

2. The selection of the node to join should be designed in a way that
   all nodes select the same, regardless of the order in which they
   become available. The adopted solution is to sort the list of
   discovered nodes with the following criteria (in that order):

    1. the size of the cluster a discovered node is part of; sorted from
       bigger to smaller clusters
    2. the start time of a discovered node; sorted from older to younger
       nodes
    3. the name of a discovered node; sorted alphabetically

   The first node in that list will not join anyone and simply proceed
   with its boot process. Other nodes will try to join the first node.

3. To reduce the chance of incorrectly having multiple standalone nodes
   because the discovery backend returned only a single node, we want to
   apply the following constraints to the list of nodes after it is
   filtered and sorted (see property 2 above):

    * The list must contain `node()` (i.e. the node running peer
      discovery itself).
    * If the RabbitMQ's cluster size hint is greater than 1, the list
      must have at least two nodes. The cluster size hint is the maximum
      between the configured target cluster size hint and the number of
      elements in the nodes list returned by the backend.

   If one of the constraints is not met, the entire peer discovery
   process is restarted after a delay.

4. The lock is acquired only to protect the actual join, not the
   discovery step where the backend is queried to get the list of peers.
   With the node selection described above, this will let the first node
   to start without acquiring the lock.

5. The cluster membership views queried as part of the algorithm to sort
   the list of nodes will be used to detect additional clusters or
   standalone nodes that did not cluster correctly. These nodes will be
   asked to re-evaluate peer discovery to increase the chance of forming
   a single cluster.

6. After some delay, peer discovery will be re-evaluated to further
   eliminate the chances of having multiple clusters instead of one.

This commit covers properties from point 1 to point 4. Remaining
properties will be the scope of additional pull requests after this one
works.

If there is a failure at any point during discovery, filtering/sorting,
locking or joining, the entire process is restarted after a delay. This
is configured using the following parameters:
* cluster_formation.discovery_retry_limit
* cluster_formation.discovery_retry_interval

The default parameters were bumped to 30 retries with a delay of 1
second between each.

The locking retries/interval parameters are not used by the new
algorithm anymore.

There are extra minor changes that come with the rewrite:
* The configured backend is cached in a persistent term. The goal is to
  make sure we use the same backend throughout the entire process and
  when we call `maybe_unregister/0` even if the configuration changed
  for whatever reason in between.
* `maybe_register/0` is called from `rabbit_db_cluster` instead of at
  the end of a successful peer discovery process. `rabbit_db_cluster`
  had to call `maybe_register/0` if the node was not virgin anyway. So
  make it simpler and always call it in `rabbit_db_cluster` regardless
  of the state of the node.
* `log_configured_backend/0` is gone. `maybe_init/0` can log the backend
  directly. There is no need to explicitly call another function for
  that.
* Messages are logged using `?LOG_*()` macros instead of the old
  `rabbit_log` module.
2023-12-07 15:51:54 +01:00
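The three-criteria ordering from property 2 above can be sketched as follows. This is an illustrative Python sketch, not the actual `rabbit_peer_discovery` code; all names are hypothetical:

```python
# Illustrative sketch of the node-selection ordering: larger existing
# cluster first, then earlier start time, then node name alphabetically.
# Every node sorting the same discovered list converges on the same pick.
from dataclasses import dataclass

@dataclass
class DiscoveredNode:
    name: str          # node name, e.g. "rabbit@host1"
    cluster_size: int  # size of the cluster this node is already part of
    start_time: int    # smaller means the node started earlier

def select_node_to_join(nodes):
    """Return the node all members should converge on joining."""
    ordered = sorted(nodes, key=lambda n: (-n.cluster_size, n.start_time, n.name))
    return ordered[0]
```

Because the key is deterministic, the order in which peers become available does not change the outcome: the first node in the sorted list boots standalone and everyone else joins it.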
Jean-Sébastien Pédron 9bf49311a4
peer_discovery_classic_config_SUITE: Fix `successful_discovery_with_a_subset_of_nodes_coming_online`
[Why]
The testcase was broken as part of the work on Khepri (#7206): all nodes
were started, making it an equivalent of the `successful_discovery`
testcase.

[How]
We drop the first entry in the list of nodes given to
`rabbit_ct_broker_helpers`. This way, it won't be started at all while
still being listed in the classic config parameter.
2023-12-07 15:51:54 +01:00
Karl Nilsson a5ce25eca3 QQ: Cache oldest_entry_timestamp
To avoid re-reading the same entry from the log for queues with a backlog
but no current consumers.
2023-12-06 11:09:18 +00:00
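The caching idea above (avoid re-reading the same oldest log entry when nothing has been truncated) can be sketched generically. This is an illustrative Python sketch with hypothetical names; the real state lives inside the quorum queue machine:

```python
# Illustrative cache for an expensive "read timestamp of the oldest log
# entry" call, as described above; re-read only when the log's first
# index changes (e.g. after truncation).
class OldestTimestampCache:
    def __init__(self, read_entry_timestamp):
        self._read = read_entry_timestamp  # the expensive log read
        self._cached_index = None
        self._cached_ts = None

    def oldest_entry_timestamp(self, first_index):
        if first_index != self._cached_index:
            self._cached_index = first_index
            self._cached_ts = self._read(first_index)
        return self._cached_ts
```

For a queue with a backlog but no consumers, repeated metric queries then hit the cache instead of the log.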
Karl Nilsson 3eb982c200 make test mixed versions compatible 2023-12-04 15:42:05 +00:00
Karl Nilsson ddf896e4e4 QQ: ensure members that are started late have right config.
If a quorum queue is declared whilst one or more of the selected nodes
are down, those nodes were not started with the correct config.

This change addresses that as well as adding one more parameter to
the mutable config passed to `ra:restart_server/2`
2023-12-04 12:03:08 +00:00
Karl Nilsson f67058ac6c
Merge pull request #9830 from rabbitmq/mc-refinements
Message container conversion improvements
2023-11-24 10:22:34 +00:00
Michael Klishin 1b642353ca
Update (c) according to [1]
1. https://investors.broadcom.com/news-releases/news-release-details/broadcom-and-vmware-intend-close-transaction-november-22-2023
2023-11-21 23:18:22 -05:00
David Ansari 95c5f2ec9e Some small fixes 2023-11-16 12:33:17 +01:00
Karl Nilsson c4fd947aad MC: various changes and improvements
To refine conversion behaviour, add additional tests
and ensure it matches the documentation.

mc: optionally capture source environment

And pass target environment to mc:convert

This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
2023-11-15 11:04:49 +00:00
Karl Nilsson 8d2c0a6425 Add tests for rabbit_queue_type:format/2
For streams and quorum queues, classic are tested in many other
places including mgmt tests.
2023-11-08 10:27:01 +00:00
Karl Nilsson 9cf6454973 Expose rabbit_quorum_queue:infos/2
To allow callers to specify a subset of fields they'd like.
2023-11-07 09:20:52 +00:00